ParallelComm.cpp
/*------------------------------------------------------------------------*/
/*                 Copyright 2010 Sandia Corporation.                      */
/*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive    */
/*  license for use of this work by or on behalf of the U.S. Government.   */
/*  Export of this program may require a license from the                  */
/*  United States Government.                                              */
/*------------------------------------------------------------------------*/

#include <stdlib.h>
#include <stdexcept>
#include <sstream>
#include <vector>

#include <stk_util/parallel/ParallelComm.hpp>
#include <stk_util/parallel/ParallelReduce.hpp>

namespace stk_classic {

//-----------------------------------------------------------------------

#if defined( STK_HAS_MPI )

enum { STK_MPI_TAG_SIZING = 0 , STK_MPI_TAG_DATA = 1 };

// Communicate in sparse or dense mode, as directed during allocation

namespace {

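// all_to_all_dense: every processor exchanges a (possibly empty) message
// with every other processor.  The per-processor buffers were carved out
// of one contiguous allocation (see CommBuffer::allocate below), so byte
// displacements into that block can be handed directly to MPI_Alltoallv.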
bool all_to_all_dense( ParallelMachine p_comm ,
                       const CommBuffer * const send ,
                       const CommBuffer * const recv ,
                       std::ostream & msg )
{
  typedef unsigned char * ucharp ;

  static const char method[] = "stk_classic::CommAll::communicate" ;

  int result ;

  {
    const unsigned p_size = parallel_machine_size( p_comm );

    std::vector<int> tmp( p_size * 4 );

    int * const send_counts = (tmp.empty() ? NULL : & tmp[0]) ;
    int * const send_displs = send_counts + p_size ;
    int * const recv_counts = send_displs + p_size ;
    int * const recv_displs = recv_counts + p_size ;

    unsigned char * const ps = static_cast<ucharp>(send[0].buffer());
    unsigned char * const pr = static_cast<ucharp>(recv[0].buffer());

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      const CommBuffer & send_buf = send[i] ;
      const CommBuffer & recv_buf = recv[i] ;

      send_counts[i] = send_buf.capacity();
      recv_counts[i] = recv_buf.capacity();

      send_displs[i] = static_cast<ucharp>(send_buf.buffer()) - ps ;
      recv_displs[i] = static_cast<ucharp>(recv_buf.buffer()) - pr ;
    }

    result = MPI_Alltoallv( ps , send_counts , send_displs , MPI_BYTE ,
                            pr , recv_counts , recv_displs , MPI_BYTE ,
                            p_comm );

    if ( MPI_SUCCESS != result ) {
      msg << method << " GLOBAL ERROR: " << result << " == MPI_Alltoallv" ;
    }
  }

  return MPI_SUCCESS == result ;
}

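// all_to_all_sparse: only nonzero messages are exchanged.  Receives are
// posted first, an MPI_Allreduce then guarantees every receive is posted
// (and propagates any local error) before the matching ready-sends are
// issued, and each completed receive is finally checked against the
// planned size.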
bool all_to_all_sparse( ParallelMachine p_comm ,
                        const CommBuffer * const send ,
                        const CommBuffer * const recv ,
                        std::ostream & msg )
{
  static const char method[] = "stk_classic::CommAll::communicate" ;
  static const int mpi_tag = STK_MPI_TAG_DATA ;

  int result = MPI_SUCCESS ;

  {
    const unsigned p_size = parallel_machine_size( p_comm );
    const unsigned p_rank = parallel_machine_rank( p_comm );

    //------------------------------
    // Receive count

    unsigned num_recv = 0 ;

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      if ( recv[i].capacity() ) { ++num_recv ; }
    }

    //------------------------------
    // Post receives for specific processors with specific sizes

    MPI_Request request_null = MPI_REQUEST_NULL ;
    std::vector<MPI_Request> request( num_recv , request_null );
    std::vector<MPI_Status>  status(  num_recv );

    unsigned count = 0 ;

    for ( unsigned i = 0 ; result == MPI_SUCCESS && i < p_size ; ++i ) {
      const unsigned recv_size = recv[i].capacity();
      void * const   recv_buf  = recv[i].buffer();
      if ( recv_size ) {
        result = MPI_Irecv( recv_buf , recv_size , MPI_BYTE ,
                            i , mpi_tag , p_comm , & request[count] );
        ++count ;
      }
    }

    if ( MPI_SUCCESS != result ) {
      msg << method << " LOCAL[" << p_rank << "] ERROR: "
          << result << " == MPI_Irecv , " ;
    }

    //------------------------------
    // Sync to allow ready sends and for a potential error

    int local_error = MPI_SUCCESS == result ? 0 : 1 ;
    int global_error = 0 ;

    result = MPI_Allreduce( & local_error , & global_error ,
                            1 , MPI_INT , MPI_SUM , p_comm );

    if ( MPI_SUCCESS != result ) {
      msg << method << " GLOBAL ERROR: " << result << " == MPI_Allreduce" ;
    }
    else if ( global_error ) {
      result = MPI_ERR_UNKNOWN ;
    }
    else {
      // Everything is local from here on out, no more syncs

      //------------------------------
      // Ready-send the buffers, rotate the send processor
      // in a simple attempt to smooth out the communication traffic.

      for ( unsigned i = 0 ; MPI_SUCCESS == result && i < p_size ; ++i ) {
        const int dst = ( i + p_rank ) % p_size ;
        const unsigned send_size = send[dst].capacity();
        void * const   send_buf  = send[dst].buffer();
        if ( send_size ) {
          result = MPI_Rsend( send_buf , send_size , MPI_BYTE ,
                              dst , mpi_tag , p_comm );
        }
      }

      if ( MPI_SUCCESS != result ) {
        msg << method << " LOCAL ERROR: " << result << " == MPI_Rsend , " ;
      }
      else {
        MPI_Request * const p_request = (request.empty() ? NULL : & request[0]) ;
        MPI_Status  * const p_status  = (status.empty()  ? NULL : & status[0]) ;

        result = MPI_Waitall( num_recv , p_request , p_status );
      }

      if ( MPI_SUCCESS != result ) {
        msg << method << " LOCAL[" << p_rank << "] ERROR: "
            << result << " == MPI_Waitall , " ;
      }
      else {

        for ( unsigned i = 0 ; i < num_recv ; ++i ) {
          MPI_Status * const recv_status = & status[i] ;
          const int recv_proc = recv_status->MPI_SOURCE ;
          const int recv_tag  = recv_status->MPI_TAG ;
          const int recv_plan = recv[recv_proc].capacity();
          int recv_count = 0 ;

          MPI_Get_count( recv_status , MPI_BYTE , & recv_count );

          if ( recv_tag != mpi_tag || recv_count != recv_plan ) {
            msg << method << " LOCAL[" << p_rank << "] ERROR: Recv["
                << recv_proc << "] Size( "
                << recv_count << " != " << recv_plan << " ) , " ;
            result = MPI_ERR_UNKNOWN ;
          }
        }
      }
    }
  }

  return MPI_SUCCESS == result ;
}

}

#else

// Not parallel

namespace {

bool all_to_all_dense( ParallelMachine ,
                       const CommBuffer * const send ,
                       const CommBuffer * const recv ,
                       std::ostream & )
{ return send == recv ; }

bool all_to_all_sparse( ParallelMachine ,
                        const CommBuffer * const send ,
                        const CommBuffer * const recv ,
                        std::ostream & )
{ return send == recv ; }

}

#endif

//----------------------------------------------------------------------

namespace {

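// align_quad rounds a byte count up to a multiple of 4*sizeof(int), so
// each region carved out of the shared allocation below starts on a
// quad-int (typically 16-byte) boundary.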
inline
size_t align_quad( size_t n )
{
  enum { Size = 4 * sizeof(int) };
  return n + CommBufferAlign<Size>::align(n);
}

}

//----------------------------------------------------------------------

void CommBuffer::pack_overflow() const
{
  std::ostringstream os ;
  os << "stk_classic::CommBuffer::pack<T>(...){ overflow by " ;
  os << remaining() ;
  os << " bytes. }" ;
  throw std::overflow_error( os.str() );
}

void CommBuffer::unpack_overflow() const
{
  std::ostringstream os ;
  os << "stk_classic::CommBuffer::unpack<T>(...){ overflow by " ;
  os << remaining();
  os << " bytes. }" ;
  throw std::overflow_error( os.str() );
}

void CommAll::rank_error( const char * method , unsigned p ) const
{
  std::ostringstream os ;
  os << "stk_classic::CommAll::" << method
     << "(" << p << ") ERROR: Not in [0:" << m_size << ")" ;
  throw std::range_error( os.str() );
}

//----------------------------------------------------------------------

CommBuffer::CommBuffer()
  : m_beg(NULL), m_ptr(NULL), m_end(NULL)
{ }

CommBuffer::~CommBuffer()
{ }

void CommBuffer::deallocate( const unsigned number , CommBuffer * buffers )
{
  if ( NULL != buffers ) {
    for ( unsigned i = 0 ; i < number ; ++i ) {
      ( buffers + i )->~CommBuffer();
    }
    free( buffers );
  }
}

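// allocate: a single malloc holds the CommBuffer array itself followed by
// the payload of each buffer, each payload padded to a quad-aligned size.
// deallocate (above) therefore releases the whole block with one free().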
CommBuffer * CommBuffer::allocate(
  const unsigned number , const unsigned * const size )
{
  const size_t n_base = align_quad( number * sizeof(CommBuffer) );
  size_t n_size = n_base ;

  if ( NULL != size ) {
    for ( unsigned i = 0 ; i < number ; ++i ) {
      n_size += align_quad( size[i] );
    }
  }

  // Allocate space for buffers

  void * const p_malloc = malloc( n_size );

  CommBuffer * const b_base =
    p_malloc != NULL ? reinterpret_cast<CommBuffer*>(p_malloc)
                     : reinterpret_cast<CommBuffer*>( NULL );

  if ( p_malloc != NULL ) {

    for ( unsigned i = 0 ; i < number ; ++i ) {
      new( b_base + i ) CommBuffer();
    }

    if ( NULL != size ) {

      ucharp ptr = reinterpret_cast<ucharp>( p_malloc );

      ptr += n_base ;

      for ( unsigned i = 0 ; i < number ; ++i ) {
        CommBuffer & b = b_base[i] ;
        b.m_beg = ptr ;
        b.m_ptr = ptr ;
        b.m_end = ptr + size[i] ;
        ptr += align_quad( size[i] );
      }
    }
  }

  return b_base ;
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

CommAll::~CommAll()
{
  try {
    CommBuffer::deallocate( m_size , m_send );
    if ( 1 < m_size ) { CommBuffer::deallocate( m_size , m_recv ); }
  } catch(...){}
  m_comm = parallel_machine_null();
  m_size = 0 ;
  m_rank = 0 ;
  m_send = NULL ;
  m_recv = NULL ;
}

CommAll::CommAll()
  : m_comm( parallel_machine_null() ),
    m_size( 0 ), m_rank( 0 ),
    m_bound( 0 ),
    m_max( 0 ),
    m_send(NULL),
    m_recv(NULL)
{}

CommAll::CommAll( ParallelMachine comm )
  : m_comm( comm ),
    m_size( parallel_machine_size( comm ) ),
    m_rank( parallel_machine_rank( comm ) ),
    m_bound( 0 ),
    m_max( 0 ),
    m_send(NULL),
    m_recv(NULL)
{
  m_send = CommBuffer::allocate( m_size , NULL );

  if ( NULL == m_send ) {
    std::string msg("stk_classic::CommAll::CommAll FAILED malloc");
    throw std::runtime_error(msg);
  }
}

bool CommAll::allocate_buffers( const unsigned num_msg_bounds ,
                                const bool symmetric ,
                                const bool local_flag )
{
  const unsigned zero = 0 ;
  std::vector<unsigned> tmp( m_size , zero );

  for ( unsigned i = 0 ; i < m_size ; ++i ) {
    tmp[i] = m_send[i].size();
  }

  const unsigned * const send_size = (tmp.empty() ? NULL : & tmp[0]) ;
  const unsigned * const recv_size = symmetric ? (tmp.empty() ? NULL : & tmp[0]) : NULL ;

  return allocate_buffers( m_comm, num_msg_bounds,
                           send_size, recv_size, local_flag );
}

//----------------------------------------------------------------------

void CommAll::reset_buffers()
{
  if ( m_send ) {
    CommBuffer * m = m_send ;
    CommBuffer * const me = m + m_size ;
    for ( ; m != me ; ++m ) { m->reset(); }
  }
  if ( m_recv && 1 < m_size ) {
    CommBuffer * m = m_recv ;
    CommBuffer * const me = m + m_size ;
    for ( ; m != me ; ++m ) { m->reset(); }
  }
}

//----------------------------------------------------------------------

void CommAll::swap_send_recv()
{
  if ( m_recv == NULL ) {
    // ERROR
    std::string
      msg("stk_classic::CommAll::swap_send_recv(){ NULL recv buffers }" );
    throw std::logic_error( msg );
  }

  CommBuffer * tmp_msg = m_send ;
  m_send = m_recv ;
  m_recv = tmp_msg ;
}

//----------------------------------------------------------------------

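// allocate_buffers: (re)allocates the send and receive buffers.  If the
// receive sizes are not supplied they are discovered with comm_sizes(),
// which also sets m_max and thereby decides whether communicate() later
// uses the sparse or dense all-to-all.  A cheap parallel checksum of
// message counts and sizes guards against inconsistent send/receive plans.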
bool CommAll::allocate_buffers( ParallelMachine comm ,
                                const unsigned num_msg_bounds ,
                                const unsigned * const send_size ,
                                const unsigned * const recv_size ,
                                const bool local_flag )
{
  static const char method[] = "stk_classic::CommAll::allocate_buffers" ;
  const unsigned uzero = 0 ;

  CommBuffer::deallocate( m_size , m_send );
  CommBuffer::deallocate( m_size , m_recv );

  m_comm = comm ;
  m_size = parallel_machine_size( comm );
  m_rank = parallel_machine_rank( comm );
  m_bound = num_msg_bounds ;

  std::ostringstream msg ;

  //--------------------------------
  // Buffer allocation

  {
    const bool send_none = NULL == send_size ;

    std::vector<unsigned> tmp_send ;

    if ( send_none ) { tmp_send.resize( m_size , uzero ); }

    const unsigned * const send = send_none ? (tmp_send.empty() ? NULL : & tmp_send[0]) : send_size ;

    m_send = CommBuffer::allocate( m_size , send );

    if ( 1 < m_size ) {

      std::vector<unsigned> tmp_recv ;

      const bool recv_tbd = NULL == recv_size ;

      if ( recv_tbd ) { // Had better be globally consistent.

        tmp_recv.resize( m_size , uzero );

        unsigned * const r = (tmp_recv.empty() ? NULL : & tmp_recv[0]) ;

        comm_sizes( m_comm , m_bound , m_max , send , r );
      }

      const unsigned * const recv = recv_tbd ? (tmp_recv.empty() ? NULL : & tmp_recv[0]) : recv_size ;

      m_recv = CommBuffer::allocate( m_size , recv );
    }
    else {
      m_recv = m_send ;
    }
  }

  bool error_alloc = m_send == NULL || m_recv == NULL ;

  //--------------------------------
  // Propagation of error flag, input flag, and quick/cheap/approximate
  // verification of send and receive messages.
  // Is the number and total size of messages consistent?
  // Sum message counts and sizes for grouped processors.
  // Sent are positive and received are negative.
  // Should finish with all total counts of zero.

  enum { NPSum  = 7 };
  enum { Length = 2 + 2 * NPSum };

  int local_result[ Length ];
  int global_result[ Length ];

  Copy<Length>( local_result , 0 );

  local_result[ Length - 2 ] = error_alloc ;
  local_result[ Length - 1 ] = local_flag ;

  if ( ! error_alloc ) {

    const unsigned r = 2 * ( m_rank % NPSum );

    for ( unsigned i = 0 ; i < m_size ; ++i ) {
      const unsigned n_send = m_send[i].capacity();
      const unsigned n_recv = m_recv[i].capacity();

      const unsigned s = 2 * ( i % NPSum );

      local_result[s]   += n_send ? 1 : 0 ;
      local_result[s+1] += n_send ;

      local_result[r]   -= n_recv ? 1 : 0 ;
      local_result[r+1] -= n_recv ;
    }
  }

  if (m_size > 1) {
    all_reduce_sum( m_comm , local_result , global_result , Length );
  }
  else {
    Copy<Length>(global_result, local_result);
  }

  bool global_flag ;

  error_alloc = global_result[ Length - 2 ] ;
  global_flag = global_result[ Length - 1 ] ;

  bool ok = true ;

  for ( unsigned i = 0 ; ok && i < 2 * NPSum ; ++i ) {
    ok = 0 == global_result[i] ;
  }

  if ( error_alloc || ! ok ) {
    msg << method << " ERROR:" ;
    if ( error_alloc ) { msg << " Failed memory allocation ," ; }
    if ( ! ok )        { msg << " Parallel inconsistent send/receive ," ; }
    throw std::runtime_error( msg.str() );
  }

  return global_flag ;
}

//----------------------------------------------------------------------

void CommAll::communicate()
{
  static const char method[] = "stk_classic::CommAll::communicate" ;

  std::ostringstream msg ;

  // Verify the send buffers have been filled, reset the buffer pointers

  for ( unsigned i = 0 ; i < m_size ; ++i ) {

    if ( m_send[i].remaining() ) {
      msg << method << " LOCAL[" << m_rank << "] ERROR: Send[" << i
          << "] Buffer not filled." ;
      throw std::underflow_error( msg.str() );
    }
/*
    m_send[i].reset();
*/
    m_recv[i].reset();
  }

  if ( 1 < m_size ) {
    bool ok ;

    if ( m_bound < m_max ) {
      ok = all_to_all_dense( m_comm , m_send , m_recv , msg );
    }
    else {
      ok = all_to_all_sparse( m_comm , m_send , m_recv , msg );
    }

    if ( ! ok ) { throw std::runtime_error( msg.str() ); }
  }
}

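// Typical CommAll usage is a two-pass, size-then-pack pattern.  A minimal
// sketch only, assuming the send_buffer()/recv_buffer(), pack<T>()/
// unpack<T>() and skip<T>() members declared in ParallelComm.hpp:
//
//   stk_classic::CommAll comm_all( communicator );
//   // Pass 1: record message sizes into the not-yet-allocated send buffers.
//   comm_all.send_buffer( dest_proc ).skip<double>( num_values );
//   comm_all.allocate_buffers( comm_all.parallel_size() / 4 );
//   // Pass 2: pack, exchange, and unpack.
//   comm_all.send_buffer( dest_proc ).pack<double>( values , num_values );
//   comm_all.communicate();
//   comm_all.recv_buffer( src_proc ).unpack<double>( values , num_values );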
//----------------------------------------------------------------------
//----------------------------------------------------------------------

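// CommBroadcast: the root rank packs a buffer that is broadcast to all
// ranks.  allocate_buffer() max-reduces the root's buffer size to every
// rank so that each can allocate identical storage before the MPI_Bcast
// in communicate(); the local flags are OR-reduced and returned.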
CommBroadcast::CommBroadcast( ParallelMachine comm , unsigned root_rank )
  : m_comm( comm ),
    m_size( parallel_machine_size( comm ) ),
    m_rank( parallel_machine_rank( comm ) ),
    m_root_rank( root_rank ),
    m_buffer()
{}

bool CommBroadcast::allocate_buffer( const bool local_flag )
{
  static const char method[] = "stk_classic::CommBroadcast::allocate_buffer" ;

  unsigned root_rank_min = m_root_rank ;
  unsigned root_rank_max = m_root_rank ;
  unsigned root_send_size = m_root_rank == m_rank ? m_buffer.size() : 0 ;
  unsigned flag = local_flag ;

  all_reduce( m_comm , ReduceMin<1>( & root_rank_min ) &
                       ReduceMax<1>( & root_rank_max ) &
                       ReduceMax<1>( & root_send_size ) &
                       ReduceBitOr<1>( & flag ) );

  if ( root_rank_min != root_rank_max ) {
    std::string msg ;
    msg.append( method );
    msg.append( " FAILED: inconsistent root processor" );
    throw std::runtime_error( msg );
  }

  m_buffer.m_beg = static_cast<CommBuffer::ucharp>( malloc( root_send_size ) );
  m_buffer.m_ptr = m_buffer.m_beg ;
  m_buffer.m_end = m_buffer.m_beg + root_send_size ;

  return flag ;
}

CommBroadcast::~CommBroadcast()
{
  try {
    if ( m_buffer.m_beg ) { free( static_cast<void*>( m_buffer.m_beg ) ); }
  } catch(...) {}
  m_buffer.m_beg = NULL ;
  m_buffer.m_ptr = NULL ;
  m_buffer.m_end = NULL ;
}

CommBuffer & CommBroadcast::recv_buffer()
{
  return m_buffer ;
}

CommBuffer & CommBroadcast::send_buffer()
{
  static const char method[] = "stk_classic::CommBroadcast::send_buffer" ;

  if ( m_root_rank != m_rank ) {
    std::string msg ;
    msg.append( method );
    msg.append( " FAILED: is not root processor" );
    throw std::runtime_error( msg );
  }

  return m_buffer ;
}

void CommBroadcast::communicate()
{
#if defined( STK_HAS_MPI )
  {
    const int count = m_buffer.capacity();
    void * const buf = m_buffer.buffer();

    const int result = MPI_Bcast( buf, count, MPI_BYTE, m_root_rank, m_comm);

    if ( MPI_SUCCESS != result ) {
      std::ostringstream msg ;
      msg << "stk_classic::CommBroadcast::communicate ERROR : "
          << result << " == MPI_Bcast" ;
      throw std::runtime_error( msg.str() );
    }
  }
#endif

  m_buffer.reset();
}

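// Note: CommBroadcast::communicate() resets the buffer pointer after the
// MPI_Bcast, so every rank (the root included) can unpack the broadcast
// data from recv_buffer() immediately afterwards.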
666 
667 //----------------------------------------------------------------------
668 //----------------------------------------------------------------------
669 
670 CommGather::~CommGather()
671 {
672  try {
673  free( static_cast<void*>( m_send.m_beg ) );
674 
675  if ( NULL != m_recv_count ) { free( static_cast<void*>( m_recv_count ) ); }
676 
677  if ( NULL != m_recv ) { CommBuffer::deallocate( m_size , m_recv ); }
678  } catch(...){}
679 }
680 
681 void CommGather::reset()
682 {
683  m_send.reset();
684 
685  if ( NULL != m_recv ) {
686  for ( unsigned i = 0 ; i < m_size ; ++i ) { m_recv[i].reset(); }
687  }
688 }
689 
690 CommBuffer & CommGather::recv_buffer( unsigned p )
691 {
692  static CommBuffer empty ;
693 
694  return m_size <= p ? empty : (
695  m_size <= 1 ? m_send : m_recv[p] );
696 }
697 
698 //----------------------------------------------------------------------
699 
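// CommGather: every rank packs a fixed-size send buffer; the root gathers
// the per-rank sizes with MPI_Gather, allocates matching receive buffers,
// and the payloads are collected with MPI_Gatherv in communicate().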
CommGather::CommGather( ParallelMachine comm ,
                        unsigned root_rank , unsigned send_size )
  : m_comm( comm ),
    m_size( parallel_machine_size( comm ) ),
    m_rank( parallel_machine_rank( comm ) ),
    m_root_rank( root_rank ),
    m_send(),
    m_recv(NULL),
    m_recv_count(NULL),
    m_recv_displ(NULL)
{
  m_send.m_beg = static_cast<CommBuffer::ucharp>( malloc( send_size ) );
  m_send.m_ptr = m_send.m_beg ;
  m_send.m_end = m_send.m_beg + send_size ;

#if defined( STK_HAS_MPI )

  if ( 1 < m_size ) {

    const bool is_root = m_rank == m_root_rank ;

    if ( is_root ) {
      m_recv_count = static_cast<int*>( malloc(2*m_size*sizeof(int)) );
      m_recv_displ = m_recv_count + m_size ;
    }

    MPI_Gather( & send_size , 1 , MPI_INT ,
                m_recv_count , 1 , MPI_INT ,
                m_root_rank , m_comm );

    if ( is_root ) {
      m_recv = CommBuffer::allocate( m_size ,
                 reinterpret_cast<unsigned*>( m_recv_count ) );

      for ( unsigned i = 0 ; i < m_size ; ++i ) {
        m_recv_displ[i] = m_recv[i].m_beg - m_recv[0].m_beg ;
      }
    }
  }

#endif

}

void CommGather::communicate()
{
#if defined( STK_HAS_MPI )

  if ( 1 < m_size ) {

    const int send_count = m_send.capacity();

    void * const send_buf = m_send.buffer();
    void * const recv_buf = m_rank == m_root_rank ? m_recv->buffer() : NULL ;

    MPI_Gatherv( send_buf , send_count , MPI_BYTE ,
                 recv_buf , m_recv_count , m_recv_displ , MPI_BYTE ,
                 m_root_rank , m_comm );
  }

#endif

  reset();
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

#if defined( STK_HAS_MPI )

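// comm_dense_sizes: exchange the per-processor send sizes (and a local
// flag) with a dense MPI_Alltoall; the returned flag is true if any
// processor passed local_flag as true.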
bool comm_dense_sizes( ParallelMachine comm ,
                       const unsigned * const send_size ,
                       unsigned * const recv_size ,
                       bool local_flag )
{
  static const char method[] = "stk_classic::comm_dense_sizes" ;

  const unsigned zero = 0 ;
  const unsigned p_size = parallel_machine_size( comm );

  std::vector<unsigned> send_buf( p_size * 2 , zero );
  std::vector<unsigned> recv_buf( p_size * 2 , zero );

  for ( unsigned i = 0 ; i < p_size ; ++i ) {
    const unsigned i2 = i * 2 ;
    send_buf[i2]   = send_size[i] ;
    send_buf[i2+1] = local_flag ;
  }

  {
    unsigned * const ps = (send_buf.empty() ? NULL : & send_buf[0]) ;
    unsigned * const pr = (recv_buf.empty() ? NULL : & recv_buf[0]) ;
    const int result =
      MPI_Alltoall( ps , 2 , MPI_UNSIGNED , pr , 2 , MPI_UNSIGNED , comm );

    if ( MPI_SUCCESS != result ) {
      std::string msg ;
      msg.append( method );
      msg.append( " FAILED: MPI_SUCCESS != MPI_Alltoall" );
      throw std::runtime_error( msg );
    }
  }

  bool global_flag = false ;

  for ( unsigned i = 0 ; i < p_size ; ++i ) {
    const unsigned i2 = i * 2 ;
    recv_size[i] = recv_buf[i2] ;
    if ( recv_buf[i2+1] ) { global_flag = true ; }
  }

  return global_flag ;
}

//----------------------------------------------------------------------

namespace {

extern "C" {

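// User-defined MPI reduction for comm_sizes below: entry-wise sum of the
// first len-2 values, maximum of the final two (the maximum send count
// and the flag).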
void sum_np_max_2_op(
  void * inv , void * outv , int * len , ParallelDatatype * )
{
  const int np = *len - 2 ;
  unsigned * ind  = (unsigned *) inv ;
  unsigned * outd = (unsigned *) outv ;

  // Sum all but the last two
  // the last two are maximum

  for ( int i = 0 ; i < np ; ++i ) {
    *outd += *ind ;
    ++outd ;
    ++ind ;
  }
  if ( outd[0] < ind[0] ) { outd[0] = ind[0] ; }
  if ( outd[1] < ind[1] ) { outd[1] = ind[1] ; }
}

}

}

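// comm_sizes: discover the receive sizes for a sparse communication plan.
// A single custom MPI_Allreduce yields, for each rank, the number of
// incoming messages plus the global maximum message count and flag.  If
// that maximum exceeds num_msg_bound the sizes are exchanged densely with
// MPI_Alltoall; otherwise they are sent point-to-point.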
bool comm_sizes( ParallelMachine comm ,
                 const unsigned num_msg_bound ,
                 unsigned & num_msg_maximum ,
                 const unsigned * const send_size ,
                 unsigned * const recv_size ,
                 bool local_flag )
{
  static const char method[] = "stk_classic::comm_unknown_sizes" ;
  const unsigned uzero = 0 ;

  static MPI_Op mpi_op = MPI_OP_NULL ;

  if ( mpi_op == MPI_OP_NULL ) {
    // Is fully commutative
    MPI_Op_create( sum_np_max_2_op , 1 , & mpi_op );
  }

  const unsigned p_size = parallel_machine_size( comm );
  const unsigned p_rank = parallel_machine_rank( comm );

  int result ;

  std::ostringstream msg ;

  num_msg_maximum = 0 ;

  unsigned num_recv = 0 ;
  unsigned max_msg  = 0 ;
  bool     global_flag = false ;

  {
    std::vector<unsigned> send_buf( p_size + 2 , uzero );
    std::vector<unsigned> recv_buf( p_size + 2 , uzero );

    unsigned * const p_send = (send_buf.empty() ? NULL : & send_buf[0]) ;
    unsigned * const p_recv = (recv_buf.empty() ? NULL : & recv_buf[0]) ;

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      recv_size[i] = 0 ; // Zero output
      if ( send_size[i] ) {
        send_buf[i] = 1 ;
        ++max_msg ;
      }
    }
    send_buf[p_size]   = max_msg ;
    send_buf[p_size+1] = local_flag ;

    result = MPI_Allreduce(p_send,p_recv,p_size+2,MPI_UNSIGNED,mpi_op,comm);

    if ( result != MPI_SUCCESS ) {
      // PARALLEL ERROR
      msg << method << " ERROR: " << result << " == MPI_AllReduce" ;
      throw std::runtime_error( msg.str() );
    }

    num_recv    = recv_buf[ p_rank ] ;
    max_msg     = recv_buf[ p_size ] ;
    global_flag = recv_buf[ p_size + 1 ] ;

    // max_msg is now the maximum send count,
    // Loop over receive counts to determine
    // if a receive count is larger.

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      if ( max_msg < recv_buf[i] ) { max_msg = recv_buf[i] ; }
    }
  }

  num_msg_maximum = max_msg ;

  if ( num_msg_bound < max_msg ) {
    // Dense, pay for an all-to-all

    result =
      MPI_Alltoall( (void*) send_size , 1 , MPI_UNSIGNED ,
                    recv_size , 1 , MPI_UNSIGNED , comm );

    if ( MPI_SUCCESS != result ) {
      // LOCAL ERROR ?
      msg << method << " ERROR: " << result << " == MPI_Alltoall" ;
      throw std::runtime_error( msg.str() );
    }
  }
  else if ( max_msg ) {
    // Sparse, just do point-to-point

    const int mpi_tag = STK_MPI_TAG_SIZING ;

    MPI_Request request_null = MPI_REQUEST_NULL ;
    std::vector<MPI_Request> request( num_recv , request_null );
    std::vector<MPI_Status>  status(  num_recv );
    std::vector<unsigned>    buf(     num_recv );

    // Post receives for point-to-point message sizes

    for ( unsigned i = 0 ; i < num_recv ; ++i ) {
      unsigned    * const p_buf     = & buf[i] ;
      MPI_Request * const p_request = & request[i] ;
      result = MPI_Irecv( p_buf , 1 , MPI_UNSIGNED ,
                          MPI_ANY_SOURCE , mpi_tag , comm , p_request );
      if ( MPI_SUCCESS != result ) {
        // LOCAL ERROR
        msg << method << " ERROR: " << result << " == MPI_Irecv" ;
        throw std::runtime_error( msg.str() );
      }
    }

    // Send the point-to-point message sizes,
    // rotate the sends in an attempt to balance the message traffic.

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      int      dst   = ( i + p_rank ) % p_size ;
      unsigned value = send_size[dst] ;
      if ( value ) {
        result = MPI_Send( & value , 1 , MPI_UNSIGNED , dst , mpi_tag , comm );
        if ( MPI_SUCCESS != result ) {
          // LOCAL ERROR
          msg << method << " ERROR: " << result << " == MPI_Send" ;
          throw std::runtime_error( msg.str() );
        }
      }
    }

    // Wait for all receives

    {
      MPI_Request * const p_request = (request.empty() ? NULL : & request[0]) ;
      MPI_Status  * const p_status  = (status.empty()  ? NULL : & status[0]) ;
      result = MPI_Waitall( num_recv , p_request , p_status );
    }
    if ( MPI_SUCCESS != result ) {
      // LOCAL ERROR ?
      msg << method << " ERROR: " << result << " == MPI_Waitall" ;
      throw std::runtime_error( msg.str() );
    }

    // Set the receive message sizes

    for ( unsigned i = 0 ; i < num_recv ; ++i ) {
      MPI_Status * const recv_status = & status[i] ;
      const int recv_proc = recv_status->MPI_SOURCE ;
      const int recv_tag  = recv_status->MPI_TAG ;
      int recv_count = 0 ;

      MPI_Get_count( recv_status , MPI_UNSIGNED , & recv_count );

      if ( recv_tag != mpi_tag || recv_count != 1 ) {
        msg << method << " ERROR: Received buffer mismatch " ;
        msg << "P" << p_rank << " <- P" << recv_proc ;
        msg << " " << 1 << " != " << recv_count ;
        throw std::runtime_error( msg.str() );
      }

      const unsigned r_size = buf[i] ;
      recv_size[ recv_proc ] = r_size ;
    }
  }

  return global_flag ;
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

#else

bool comm_sizes( ParallelMachine ,
                 const unsigned ,
                 unsigned & num_msg_maximum ,
                 const unsigned * const send_size ,
                 unsigned * const recv_size ,
                 bool local_flag )
{
  num_msg_maximum = send_size[0] ? 1 : 0 ;

  recv_size[0] = send_size[0] ;

  return local_flag ;
}

bool comm_dense_sizes( ParallelMachine ,
                       const unsigned * const send_size ,
                       unsigned * const recv_size ,
                       bool local_flag )
{
  recv_size[0] = send_size[0] ;

  return local_flag ;
}

//----------------------------------------------------------------------

#endif

}
