Epetra_MpiDistributor.cpp
1 //@HEADER
2 // ************************************************************************
3 //
4 // Epetra: Linear Algebra Services Package
5 // Copyright 2011 Sandia Corporation
6 //
7 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
8 // the U.S. Government retains certain rights in this software.
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are
12 // met:
13 //
14 // 1. Redistributions of source code must retain the above copyright
15 // notice, this list of conditions and the following disclaimer.
16 //
17 // 2. Redistributions in binary form must reproduce the above copyright
18 // notice, this list of conditions and the following disclaimer in the
19 // documentation and/or other materials provided with the distribution.
20 //
21 // 3. Neither the name of the Corporation nor the names of the
22 // contributors may be used to endorse or promote products derived from
23 // this software without specific prior written permission.
24 //
25 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
26 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
28 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
29 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 //
37 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
38 //
39 // ************************************************************************
40 //@HEADER
41 
42 #include "Epetra_MpiDistributor.h"
43 #include "Epetra_MpiComm.h"
44 
45 #include <stdexcept>
46 #include <vector>
47 
48 //==============================================================================
49 Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiComm & Comm):
50  Epetra_Object("Epetra::MpiDistributor"),
51  lengths_to_(0),
52  procs_to_(0),
53  indices_to_(0),
54  size_indices_to_(0),
55  lengths_from_(0),
56  procs_from_(0),
57  indices_from_(0),
58  size_indices_from_(0),
59  resized_(false),
60  sizes_(0),
61  sizes_to_(0),
62  starts_to_(0),
63  starts_to_ptr_(0),
64  indices_to_ptr_(0),
65  sizes_from_(0),
66  starts_from_(0),
67  starts_from_ptr_(0),
68  indices_from_ptr_(0),
69  nrecvs_(0),
70  nsends_(0),
71  nexports_(0),
72  self_msg_(0),
73  max_send_length_(0),
74  total_recv_length_(0),
75  tag_(Comm.GetMpiTag()),
76  epComm_(&Comm),
77  comm_(Comm.GetMpiComm()),
78  request_(0),
79  status_(0),
80  no_delete_(false),
81  send_array_(0),
82  send_array_size_(0),
83  comm_plan_reverse_(0)
84 {
85 }
86 
87 //==============================================================================
88 Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiDistributor & Distributor):
89  Epetra_Object("Epetra::MpiDistributor"),
90  lengths_to_(0),
91  procs_to_(0),
92  indices_to_(0),
93  size_indices_to_(Distributor.size_indices_to_),
94  lengths_from_(0),
95  procs_from_(0),
96  indices_from_(0),
97  size_indices_from_(Distributor.size_indices_from_),
98  resized_(false),
99  sizes_(0),
100  sizes_to_(0),
101  starts_to_(0),
102  starts_to_ptr_(0),
103  indices_to_ptr_(0),
104  sizes_from_(0),
105  starts_from_(0),
106  starts_from_ptr_(0),
107  indices_from_ptr_(0),
108  nrecvs_(Distributor.nrecvs_),
109  nsends_(Distributor.nsends_),
110  nexports_(Distributor.nexports_),
111  self_msg_(Distributor.self_msg_),
112  max_send_length_(Distributor.max_send_length_),
113  total_recv_length_(Distributor.total_recv_length_),
114  tag_(Distributor.tag_),
115  epComm_(Distributor.epComm_),
116  comm_(Distributor.comm_),
117  request_(0),
118  status_(0),
119  no_delete_(Distributor.no_delete_),
120  send_array_(0),
121  send_array_size_(0),
122  comm_plan_reverse_(0)
123 {
124  //CMS
125  int i;
126  if (nsends_>0) {
127  lengths_to_ = new int[nsends_];
128  procs_to_ = new int[nsends_];
129  starts_to_ = new int[nsends_];
130  for (i=0; i<nsends_; i++) {
131  lengths_to_[i] = Distributor.lengths_to_[i];
132  procs_to_[i] = Distributor.procs_to_[i];
133  starts_to_[i] = Distributor.starts_to_[i];
134  }
135  }
136  if (size_indices_to_>0) {
137  indices_to_ = new int[size_indices_to_];
138  for (i=0; i<size_indices_to_; i++) {
139  indices_to_[i] = Distributor.indices_to_[i];
140  }
141  indices_to_ptr_ = new int[nexports_];
142  for (i=0; i<nexports_; i++) {
143  indices_to_ptr_[i] = Distributor.indices_to_ptr_[i];
144  }
145  }
146 
147  if( nsends_+self_msg_ > 0 ) {
148  if(Distributor.sizes_to_) sizes_to_ = new int[nsends_+self_msg_];
149  if(Distributor.starts_to_ptr_) starts_to_ptr_ = new int[nsends_+self_msg_];
150  for(i=0;i<nsends_+self_msg_; i++) {
151  if(Distributor.sizes_to_) sizes_to_[i] = Distributor.sizes_to_[i];
152  if(Distributor.starts_to_ptr_) starts_to_ptr_[i] = Distributor.starts_to_ptr_[i];
153  }
154  }
155 
156  if (nrecvs_>0) {
157  lengths_from_ = new int[nrecvs_];
158  procs_from_ = new int[nrecvs_];
159  starts_from_ = new int[nrecvs_];
160  request_ = new MPI_Request[ nrecvs_ ];
161  status_ = new MPI_Status[ nrecvs_ ];
162  for (i=0; i<nrecvs_; i++) {
163  lengths_from_[i] = Distributor.lengths_from_[i];
164  procs_from_[i] = Distributor.procs_from_[i];
165  starts_from_[i] = Distributor.starts_from_[i];
166  }
167  }
168 
169  if (size_indices_from_>0) {
170  indices_from_ = new int[size_indices_from_];
171  for (i=0; i<size_indices_from_; i++) {
172  indices_from_[i] = Distributor.indices_from_[i];
173  }
174  }
175 
176  if( nrecvs_+self_msg_ > 0 ) {
177  if(Distributor.sizes_from_) sizes_from_ = new int [nrecvs_+self_msg_];
178  if(Distributor.starts_from_ptr_) starts_from_ptr_ = new int [nrecvs_+self_msg_];
179  for(i=0;i<nrecvs_+self_msg_; i++) {
180  if(Distributor.sizes_from_) sizes_from_[i] = Distributor.sizes_from_[i];
181  if(Distributor.starts_from_ptr_) starts_from_ptr_[i] = Distributor.starts_from_ptr_[i];
182  }
183  }
184 
185  // Note: indices_from_ptr_ is always length zero...
186 
187 }
188 
189 //==============================================================================
190 Epetra_MpiDistributor::~Epetra_MpiDistributor()
191 {
192  if( !no_delete_ )
193  {
194  if( lengths_to_ != 0 ) delete [] lengths_to_;
195  if( procs_to_ != 0 ) delete [] procs_to_;
196  if( indices_to_ != 0 ) delete [] indices_to_;
197  if( lengths_from_ != 0 ) delete [] lengths_from_;
198  if( procs_from_ != 0 ) delete [] procs_from_;
199  if( indices_from_ != 0 ) delete [] indices_from_;
200  if( starts_to_ != 0 ) delete [] starts_to_;
201  if( starts_from_ != 0 ) delete [] starts_from_;
202  }
203 
204  if( sizes_ != 0 ) delete [] sizes_;
205  if( sizes_to_ != 0 ) delete [] sizes_to_;
206  if( sizes_from_ != 0 ) delete [] sizes_from_;
207  if( starts_to_ptr_ != 0 ) delete [] starts_to_ptr_;
208  if( starts_from_ptr_ != 0 ) delete [] starts_from_ptr_;
209  if( indices_to_ptr_ != 0 ) delete [] indices_to_ptr_;
210  if( indices_from_ptr_ != 0 ) delete [] indices_from_ptr_;
211 
212  if( request_ != 0 ) delete [] request_;
213  if( status_ != 0 ) delete [] status_;
214 
215  if( send_array_size_ != 0 ) { delete [] send_array_; send_array_size_ = 0; }
216 
217  if( comm_plan_reverse_ != 0 ) delete comm_plan_reverse_;
218 }
219 
220 
221 //==============================================================================
222 int Epetra_MpiDistributor::CreateFromSends( const int & NumExportIDs,
223  const int * ExportPIDs,
224  bool Deterministic,
225  int & NumRemoteIDs )
226 {
227  (void)Deterministic; // Prevent compiler warnings for unused argument.
228  int my_proc;
229  MPI_Comm_rank( comm_, &my_proc );
230 
231  int nprocs;
232  MPI_Comm_size( comm_, &nprocs );
233 
234  // Do the forward map component
235  CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
236 
237  //Invert map to see what msgs are received and what length
238  EPETRA_CHK_ERR( ComputeRecvs_( my_proc, nprocs ) );
239 
240  if (nrecvs_>0) {
241  if( !request_ ) {
242  request_ = new MPI_Request[ nrecvs_ ];
243  status_ = new MPI_Status[ nrecvs_ ];
244  }
245  }
246 
247  NumRemoteIDs = total_recv_length_;
248 
249  return 0;
250 }
251 
252 //==============================================================================
253 int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
254  const int * RemoteGIDs,
255  const int * RemotePIDs,
256  bool Deterministic,
257  int & NumExportIDs,
258  int *& ExportGIDs,
259  int *& ExportPIDs )
260 {
261  int my_proc;
262  MPI_Comm_rank( comm_, &my_proc );
263 
264  int nprocs;
265  MPI_Comm_size( comm_, &nprocs );
266 
267  EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
268  ExportGIDs, ExportPIDs, my_proc) );
269 
270  int testNumRemoteIDs;
271  EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
272  Deterministic, testNumRemoteIDs ) );
273 
274  return(0);
275 }
276 
277 //==============================================================================
278 //---------------------------------------------------------------------------
279 //CreateFromRecvs Method
280 // - create communication plan given a known list of procs to recv from
281 //---------------------------------------------------------------------------
282 #ifndef EPETRA_NO_64BIT_GLOBAL_INDICES
283 int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
284  const long long * RemoteGIDs,
285  const int * RemotePIDs,
286  bool Deterministic,
287  int & NumExportIDs,
288  long long *& ExportGIDs,
289  int *& ExportPIDs )
290 {
291  int my_proc;
292  MPI_Comm_rank( comm_, &my_proc );
293 
294  int nprocs;
295  MPI_Comm_size( comm_, &nprocs );
296 
297  EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
298  ExportGIDs, ExportPIDs, my_proc) );
299 
300  int testNumRemoteIDs;
301  EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
302  Deterministic, testNumRemoteIDs ) );
303 
304  return(0);
305 }
306 #endif
307 
308 
309 
310 //==============================================================================
311 //---------------------------------------------------------------------------
312 //CreateFromSendsAndRecvs Method
313 //---------------------------------------------------------------------------
314 int Epetra_MpiDistributor::CreateFromSendsAndRecvs( const int & NumExportIDs,
315  const int * ExportPIDs,
316  const int & NumRemoteIDs,
317  const int * RemoteGIDs,
318  const int * RemotePIDs,
319  bool Deterministic)
320 {
321  (void)RemoteGIDs;
322  (void)Deterministic; // Prevent compiler warnings for unused argument.
323  nexports_ = NumExportIDs;
324 
325  int my_proc;
326  MPI_Comm_rank( comm_, &my_proc );
327  int nprocs;
328  MPI_Comm_size( comm_, &nprocs );
329 
330  // Do the forward map component
331  CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
332 
333  // Do the reverse map component
334  CreateRecvStructures_(NumRemoteIDs,RemotePIDs);
335 
336  return 0;
337 }
338 #ifndef EPETRA_NO_64BIT_GLOBAL_INDICES
339 int Epetra_MpiDistributor::CreateFromSendsAndRecvs( const int & NumExportIDs,
340  const int * ExportPIDs,
341  const int & NumRemoteIDs,
342  const long long * RemoteGIDs,
343  const int * RemotePIDs,
344  bool Deterministic)
345 {
346  (void)RemoteGIDs;
347  (void)Deterministic; // Prevent compiler warnings for unused argument.
348  nexports_ = NumExportIDs;
349 
350  int my_proc;
351  MPI_Comm_rank( comm_, &my_proc );
352  int nprocs;
353  MPI_Comm_size( comm_, &nprocs );
354 
355  // Do the forward map component
356  CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
357 
358  // Do the reverse map component
359  CreateRecvStructures_(NumRemoteIDs,RemotePIDs);
360 
361  return 0;
362 
363 
364 }
365 #endif
366 
367 
368 
369 //==============================================================================
370 int Epetra_MpiDistributor::CreateSendStructures_(int my_proc,
371  int nprocs,
372  const int & NumExportIDs,
373  const int * ExportPIDs)
374 {
375  nexports_ = NumExportIDs;
376 
377  int i;
378 
379  // Check to see if items are grouped by processor w/o gaps
380  // If so, indices_to -> 0
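 // ("Grouped" here means the PIDs are non-decreasing: e.g. ExportPIDs = {0,0,2,2,3}
 // keeps no_send_buff true below, while {0,2,0,3} forces the send-buffer path.)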
381 
382  // Setup data structures for quick traversal of arrays
383  int * starts = new int[ nprocs + 1 ];
384  for( i = 0; i < nprocs; i++ )
385  starts[i] = 0;
386 
387  int nactive = 0;
388  bool no_send_buff = true;
389  int numDeadIndices = 0; // In some cases the GIDs will not be owned by any processors and the PID will be -1
390 
391  for( i = 0; i < NumExportIDs; i++ )
392  {
393  if( no_send_buff && i && (ExportPIDs[i] < ExportPIDs[i-1]) )
394  no_send_buff = false;
395  if( ExportPIDs[i] >= 0 )
396  {
397  ++starts[ ExportPIDs[i] ];
398  ++nactive;
399  }
400  else numDeadIndices++; // Increase the number of dead indices. Used below to leave these out of the analysis
401  }
402 
403  self_msg_ = ( starts[my_proc] != 0 ) ? 1 : 0;
404 
405  nsends_ = 0;
406 
407  if( no_send_buff ) //grouped by processor, no send buffer or indices_to_ needed
408  {
409  for( i = 0; i < nprocs; ++i )
410  if( starts[i] ) ++nsends_;
411 
412  if( nsends_ )
413  {
414  procs_to_ = new int[nsends_];
415  starts_to_ = new int[nsends_];
416  lengths_to_ = new int[nsends_];
417  }
418 
419  int index = numDeadIndices; // Leave off the dead indices (PID = -1)
420  int proc;
421  for( i = 0; i < nsends_; ++i )
422  {
423  starts_to_[i] = index;
424  proc = ExportPIDs[index];
425  procs_to_[i] = proc;
426  index += starts[proc];
427  }
428 
429  if( nsends_ )
430  Sort_ints_( procs_to_, starts_to_, nsends_ );
431 
432  max_send_length_ = 0;
433 
434  for( i = 0; i < nsends_; ++i )
435  {
436  proc = procs_to_[i];
437  lengths_to_[i] = starts[proc];
438  if( (proc != my_proc) && (lengths_to_[i] > max_send_length_) )
439  max_send_length_ = lengths_to_[i];
440  }
441  }
442  else //not grouped by processor, need send buffer and indices_to_
443  {
444  if( starts[0] != 0 ) nsends_ = 1;
445 
446  for( i = 1; i < nprocs; i++ )
447  {
448  if( starts[i] != 0 ) ++nsends_;
449  starts[i] += starts[i-1];
450  }
451 
452  for( i = nprocs-1; i != 0; i-- )
453  starts[i] = starts[i-1];
454 
455  starts[0] = 0;
456 
457  if (nactive>0) {
458  indices_to_ = new int[ nactive ];
459  size_indices_to_ = nactive;
460  }
461 
462  for( i = 0; i < NumExportIDs; i++ )
463  if( ExportPIDs[i] >= 0 )
464  {
465  indices_to_[ starts[ ExportPIDs[i] ] ] = i;
466  ++starts[ ExportPIDs[i] ];
467  }
468 
469  //Reconstruct starts array to index into indices_to.
470 
471  for( i = nprocs-1; i != 0; i-- )
472  starts[i] = starts[i-1];
473  starts[0] = 0;
474  starts[nprocs] = nactive;
475 
476  if (nsends_>0) {
477  lengths_to_ = new int[ nsends_ ];
478  procs_to_ = new int[ nsends_ ];
479  starts_to_ = new int[ nsends_ ];
480  }
481 
482  int j = 0;
483  max_send_length_ = 0;
484 
485  for( i = 0; i < nprocs; i++ )
486  if( starts[i+1] != starts[i] )
487  {
488  lengths_to_[j] = starts[i+1] - starts[i];
489  starts_to_[j] = starts[i];
490  if( ( i != my_proc ) && ( lengths_to_[j] > max_send_length_ ) )
491  max_send_length_ = lengths_to_[j];
492  procs_to_[j] = i;
493  j++;
494  }
495  }
496 
497  delete [] starts;
498 
499  nsends_ -= self_msg_;
500 
501  return 0;
502 }
503 
504 //==============================================================================
505 int Epetra_MpiDistributor::CreateRecvStructures_(const int & NumRemoteIDs,
506  const int * RemotePIDs)
507 {
508  int i, j;
509 
510  // Since the RemotePIDs should be sorted, counting the total number of recvs should be easy...
511  // use nsends as an initial guess for space.
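 // (For example, RemotePIDs = {0,0,3,3,3,5} yields recv_list = {0,3,5}, so
 // nrecvs_ = 3, lengths_from_ = {2,3,1} and starts_from_ = {0,2,5} below.)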
512  std::vector<int> recv_list;
513  recv_list.reserve(nsends_);
514 
515  int last_pid=-2;
516  for(i=0; i<NumRemoteIDs; i++) {
517  if(RemotePIDs[i]>last_pid) {
518  recv_list.push_back(RemotePIDs[i]);
519  last_pid = RemotePIDs[i];
520  }
521  else if (RemotePIDs[i]<last_pid)
522  throw std::runtime_error("Epetra_MpiDistributor::CreateRecvStructures_ expected RemotePIDs to be in sorted order");
523  }
524  nrecvs_=recv_list.size();
525 
526  if (nrecvs_>0) {
527  starts_from_ = new int[nrecvs_];
528  procs_from_ = new int[nrecvs_];
529  lengths_from_ = new int[nrecvs_];
530  request_ = new MPI_Request[ nrecvs_ ];
531  status_ = new MPI_Status[ nrecvs_ ];
532  }
533 
534  for(i=0,j=0; i<nrecvs_; ++i) {
535  int jlast=j;
536  procs_from_[i] = recv_list[i];
537  starts_from_[i] = j;
538  for( ; j<NumRemoteIDs && RemotePIDs[jlast]==RemotePIDs[j] ; j++){;}
539  lengths_from_[i]=j-jlast;
540  }
541  total_recv_length_=NumRemoteIDs;
542 
543  nrecvs_ -= self_msg_;
544 
545  return 0;
546 }
547 
548 
549 //==============================================================================
550 //---------------------------------------------------------------------------
551 //ComputeRecvs Method
552 //---------------------------------------------------------------------------
553 int Epetra_MpiDistributor::ComputeRecvs_( int my_proc,
554  int nprocs )
555 {
556  int * msg_count = new int[ nprocs ];
557  int * counts = new int[ nprocs ];
558 
559  int i;
560  MPI_Status status;
561 
562  for( i = 0; i < nprocs; i++ )
563  {
564  msg_count[i] = 0;
565  counts[i] = 1;
566  }
567 
568  for( i = 0; i < nsends_+self_msg_; i++ )
569  msg_count[ procs_to_[i] ] = 1;
570 
571 #if defined(REDUCE_SCATTER_BUG)
572 // the bug is found in mpich on linux platforms
573  MPI_Reduce(msg_count, counts, nprocs, MPI_INT, MPI_SUM, 0, comm_);
574  MPI_Scatter(counts, 1, MPI_INT, &nrecvs_, 1, MPI_INT, 0, comm_);
575 #else
576  MPI_Reduce_scatter( msg_count, &nrecvs_, counts, MPI_INT, MPI_SUM, comm_ );
577 #endif
578 
579  delete [] msg_count;
580  delete [] counts;
581 
582  if (nrecvs_>0) {
583  lengths_from_ = new int[nrecvs_];
584  procs_from_ = new int[nrecvs_];
585  for(i=0; i<nrecvs_; ++i) {
586  lengths_from_[i] = 0;
587  procs_from_[i] = 0;
588  }
589  }
590 
591 #ifndef NEW_COMM_PATTERN
592  for( i = 0; i < (nsends_+self_msg_); i++ )
593  if( procs_to_[i] != my_proc ) {
594  MPI_Send( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
595  }
596  else
597  {
598  //set self_msg_ to end block of recv arrays
599  lengths_from_[nrecvs_-1] = lengths_to_[i];
600  procs_from_[nrecvs_-1] = my_proc;
601  }
602 
603  for( i = 0; i < (nrecvs_-self_msg_); i++ )
604  {
605  MPI_Recv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &status );
606  procs_from_[i] = status.MPI_SOURCE;
607  }
608 
609  MPI_Barrier( comm_ );
610 #else
611  if (nrecvs_>0) {
612  if( !request_ ) {
613  request_ = new MPI_Request[nrecvs_-self_msg_];
614  status_ = new MPI_Status[nrecvs_-self_msg_];
615  }
616  }
617 
618  for( i = 0; i < (nrecvs_-self_msg_); i++ )
619  MPI_Irecv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &(request_[i]) );
620 
621  MPI_Barrier( comm_ );
622 
623  for( i = 0; i < (nsends_+self_msg_); i++ )
624  if( procs_to_[i] != my_proc ) {
625  MPI_Rsend( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
626  }
627  else
628  {
629  //set self_msg_ to end block of recv arrays
630  lengths_from_[nrecvs_-1] = lengths_to_[i];
631  procs_from_[nrecvs_-1] = my_proc;
632  }
633 
634  if( (nrecvs_-self_msg_) > 0 ) MPI_Waitall( (nrecvs_-self_msg_), request_, status_ );
635 
636  for( i = 0; i < (nrecvs_-self_msg_); i++ )
637  procs_from_[i] = status_[i].MPI_SOURCE;
638 #endif
639 
641 
642  // Compute indices_from_
643  // Seems to break some rvs communication
644 /* Not necessary since rvs communication is always blocked
645  size_indices_from_ = 0;
646  if( nrecvs_ > 0 )
647  {
648  for( i = 0; i < nrecvs_; i++ ) size_indices_from_ += lengths_from_[i];
649  indices_from_ = new int[ size_indices_from_ ];
650 
651  for (i=0; i<size_indices_from_; i++) indices_from_[i] = i;
652  }
653 */
654 
655  if (nrecvs_>0) starts_from_ = new int[nrecvs_];
656  int j = 0;
657  for( i=0; i<nrecvs_; ++i )
658  {
659  starts_from_[i] = j;
660  j += lengths_from_[i];
661  }
662 
663  total_recv_length_ = 0;
664  for( i = 0; i < nrecvs_; i++ )
665  total_recv_length_ += lengths_from_[i];
666 
667  nrecvs_ -= self_msg_;
668 
669  MPI_Barrier( comm_ );
670 
671  return false;
672 }
673 
674 //==============================================================================
675 //---------------------------------------------------------------------------
676 //ComputeSends Method
677 //---------------------------------------------------------------------------
678 template<typename id_type>
679 int Epetra_MpiDistributor::ComputeSends_( int num_imports,
680  const id_type *& import_ids,
681  const int *& import_procs,
682  int & num_exports,
683  id_type *& export_ids,
684  int *& export_procs,
685  int my_proc ) {
686 
687  Epetra_MpiDistributor tmp_plan(*epComm_);
688  int i;
689 
690  int * proc_list = 0;
691  int * import_objs = 0;
692  char * c_export_objs = 0;
693  const int pack_size = (1 + sizeof(id_type)/sizeof(int));
694 
695  if( num_imports > 0 )
696  {
697  proc_list = new int[ num_imports ];
698  import_objs = new int[ num_imports * pack_size];
699 
700  for( i = 0; i < num_imports; i++ )
701  {
702  proc_list[i] = import_procs[i];
703 
704  *(id_type*)(import_objs + pack_size*i) = import_ids[i];
705  *(import_objs + pack_size*i + (pack_size-1)) = my_proc;
706  }
707  }
708 
709  EPETRA_CHK_ERR(tmp_plan.CreateFromSends( num_imports, proc_list,
710  true, num_exports) );
711  if( num_exports > 0 )
712  {
713  //export_objs = new int[ 2 * num_exports ];
714  export_ids = new id_type[ num_exports ];
715  export_procs = new int[ num_exports ];
716  }
717  else
718  {
719  export_ids = 0;
720  export_procs = 0;
721  }
722 
723  int len_c_export_objs = 0;
724  EPETRA_CHK_ERR( tmp_plan.Do(reinterpret_cast<char *> (import_objs),
725  pack_size * (int)sizeof( int ),
726  len_c_export_objs,
727  c_export_objs) );
728  int * export_objs = reinterpret_cast<int *>(c_export_objs);
729 
730  for( i = 0; i < num_exports; i++ ) {
731  export_ids[i] = *(id_type*)(export_objs + pack_size*i);
732  export_procs[i] = *(export_objs + pack_size*i + (pack_size-1));
733  }
734 
735  if( proc_list != 0 ) delete [] proc_list;
736  if( import_objs != 0 ) delete [] import_objs;
737  if( len_c_export_objs != 0 ) delete [] c_export_objs;
738 
739  return(0);
740 
741 }
742 
743 //==============================================================================
744 int Epetra_MpiDistributor::Do( char * export_objs,
745  int obj_size,
746  int & len_import_objs,
747  char *& import_objs )
748 {
749  EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, len_import_objs, import_objs) );
750  EPETRA_CHK_ERR( DoWaits() );
751 
752 
753  lastRoundBytesSend_ = 0;
754  for(int i=0; i<NumSends(); i++)
755  lastRoundBytesSend_ += lengths_to_[i]*obj_size;
756  lastRoundBytesRecv_ =len_import_objs;
757 
758  return(0);
759 }
760 
761 //==============================================================================
762 int Epetra_MpiDistributor::DoReverse( char * export_objs,
763  int obj_size,
764  int & len_import_objs,
765  char *& import_objs )
766 {
767  EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size,
768  len_import_objs, import_objs) );
769  EPETRA_CHK_ERR( DoReverseWaits() );
770 
771  // This is slightly weird, since DoReversePosts() actually calls DoPosts() on
772  // a reversed distributor
773  lastRoundBytesRecv_ = 0;
774  for(int i=0; i<NumReceives(); i++)
775  lastRoundBytesRecv_ += lengths_from_[i]*obj_size;
776  lastRoundBytesSend_ =len_import_objs;
777 
778  return(0);
779 }
780 
781 //==============================================================================
782 int Epetra_MpiDistributor::DoPosts( char * export_objs,
783  int obj_size,
784  int & len_import_objs,
785  char *& import_objs )
786 {
787  int i, j, k;
788 
789  int my_proc = 0;
790  int self_recv_address = 0;
791 
792  MPI_Comm_rank( comm_, &my_proc );
793 
794  if( len_import_objs < (total_recv_length_*obj_size) )
795  {
796  if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
797  len_import_objs = total_recv_length_*obj_size;
798  if (len_import_objs>0) import_objs = new char[len_import_objs];
799  for( i=0; i<len_import_objs; ++i ) import_objs[i]=0;
800  }
801 
802  k = 0;
803 
804  j = 0;
805  for( i = 0; i < (nrecvs_+self_msg_); i++ )
806  {
807  if( procs_from_[i] != my_proc )
808  {
809  MPI_Irecv( &(import_objs[j]),
810  lengths_from_[i] * obj_size,
811  MPI_CHAR, procs_from_[i],
812  tag_, comm_,
813  &(request_[k]) );
814  k++;
815  }
816  else
817  self_recv_address = j;
818 
819  j += lengths_from_[i] * obj_size;
820  }
821 
822 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
823  // NOTE (mfh 19 Mar 2012):
824  //
825  // The ready-sends below require that each ready-send's matching
826  // receive (see above) has already been posted. We ensure this with
827  // a barrier. (Otherwise, some process that doesn't need to post
828  // receives might post its ready-send before the receiving process
829  // gets to post its receive.) If you want to remove the barrier,
830  // you'll have to replace the ready-sends below with standard sends
831  // or Isends.
832  MPI_Barrier( comm_ );
833 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
834 
835  //setup scan through procs_to list starting w/ higher numbered procs
836  //Should help balance msg traffic
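 //(e.g. with procs_to_ = {0,1,3,7} on rank 2, the sends below are issued to
 // 3, 7, 0, 1 in that order)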
837  int nblocks = nsends_ + self_msg_;
838  int proc_index = 0;
839  while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
840  ++proc_index;
841  if( proc_index == nblocks ) proc_index = 0;
842 
843  int self_num = 0, self_index = 0;
844  int p;
845 
846  if( !indices_to_ ) //data already blocked by processor
847  {
848  for( i = 0; i < nblocks; ++i )
849  {
850  p = i + proc_index;
851  if( p > (nblocks-1) ) p -= nblocks;
852 
853  if( procs_to_[p] != my_proc ) {
854 
855 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
856  MPI_Rsend( &export_objs[starts_to_[p]*obj_size],
857  lengths_to_[p]*obj_size,
858  MPI_CHAR,
859  procs_to_[p],
860  tag_,
861  comm_ );
862 #else
863  MPI_Send( &export_objs[starts_to_[p]*obj_size],
864  lengths_to_[p]*obj_size,
865  MPI_CHAR,
866  procs_to_[p],
867  tag_,
868  comm_ );
869 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
870  }
871  else {
872  self_num = p;
873  }
874  }
875 
876  if( self_msg_ )
877  memcpy( &import_objs[self_recv_address],
878  &export_objs[starts_to_[self_num]*obj_size],
879  lengths_to_[self_num]*obj_size );
880  }
881  else //data not blocked by proc, use send buffer
882  {
883  if( send_array_size_ < (max_send_length_*obj_size) )
884  {
885  if( send_array_!=0 ) {delete [] send_array_; send_array_ = 0;}
886  send_array_size_ = max_send_length_*obj_size;
887  if (send_array_size_>0) send_array_ = new char[send_array_size_];
888  }
889 
890  j = 0;
891  for( i = 0; i < nblocks; i++ )
892  {
893  p = i + proc_index;
894  if( p > (nblocks-1) ) p -= nblocks;
895  if( procs_to_[p] != my_proc )
896  {
897  int offset = 0;
898  j = starts_to_[p];
899  for( k = 0; k < lengths_to_[p]; k++ )
900  {
901  memcpy( &(send_array_[offset]),
902  &(export_objs[indices_to_[j]*obj_size]),
903  obj_size );
904  ++j;
905  offset += obj_size;
906  }
907 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
908  MPI_Rsend( send_array_,
909  lengths_to_[p] * obj_size,
910  MPI_CHAR,
911  procs_to_[p],
912  tag_, comm_ );
913 #else
914  MPI_Send( send_array_,
915  lengths_to_[p] * obj_size,
916  MPI_CHAR,
917  procs_to_[p],
918  tag_, comm_ );
919 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
920  }
921  else
922  {
923  self_num = p;
924  self_index = starts_to_[p];
925  }
926  }
927 
928  if( self_msg_ )
929  for( k = 0; k < lengths_to_[self_num]; k++ )
930  {
931  memcpy( &(import_objs[self_recv_address]),
932  &(export_objs[indices_to_[self_index]*obj_size]),
933  obj_size );
934  self_index++;
935  self_recv_address += obj_size;
936  }
937  }
938  return(0);
939 }
940 
941 //==============================================================================
942 int Epetra_MpiDistributor::DoWaits()
943 {
944  if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );
945 
946  return(0);
947 }
948 
949 //==============================================================================
950 int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
951  int obj_size,
952  int & len_import_objs,
953  char *& import_objs )
954 {
955  assert(indices_to_==0); //Can only do reverse comm when original data
956  // is blocked by processor
957 
958  // If we don't have a reverse distributor, make one.
959  if( comm_plan_reverse_ == 0 )
960  CreateReverseDistributor();
961 
962  int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, len_import_objs, import_objs);
963 
964  return(comm_flag);
965 }
966 
967 //==============================================================================
968 int Epetra_MpiDistributor::DoReverseWaits()
969 {
970  if( comm_plan_reverse_ == 0 ) return (-1);
971 
972  int comm_flag = comm_plan_reverse_->DoWaits();
973 
974  return(comm_flag);
975 }
976 
977 //==============================================================================
978 //---------------------------------------------------------------------------
979 //Resize Method (Heaphy)
980 //---------------------------------------------------------------------------
981 int Epetra_MpiDistributor::Resize_( int * sizes )
982 {
983  int i, j, k; // loop counters
984  int sum;
985 
986  //if (sizes == 0) return 0;
987 
988  int my_proc;
989  MPI_Comm_rank (comm_, &my_proc);
990  int nprocs;
991  MPI_Comm_size( comm_, &nprocs );
992 
993  if( resized_ )
994  {
995  //test and see if we are already setup for these sizes
996  bool match = true;
997  for( i = 0; i < nexports_; ++i )
998  match = match && (sizes_[i]==sizes[i]);
999  int matched = match?1:0;
1000  int match_count = 0;
1001  MPI_Allreduce( &matched, &match_count, 1, MPI_INT, MPI_SUM, comm_ );
1002  if( match_count == nprocs )
1003  return 0;
1004  else //reset existing sizing arrays
1005  max_send_length_ = 0;
1006  }
1007 
1008  if( !sizes_ && nexports_ ) sizes_ = new int[nexports_];
1009  for (i = 0; i < nexports_; i++)
1010  sizes_[i] = sizes[i];
1011 
1012  if( !sizes_to_ && (nsends_+self_msg_) ) sizes_to_ = new int[nsends_+self_msg_];
1013  for (i = 0; i < (nsends_+self_msg_); ++i)
1014  sizes_to_[i] = 0;
1015 
1016  if( !starts_to_ptr_ && (nsends_+self_msg_) ) starts_to_ptr_ = new int[nsends_+self_msg_];
1017 
1018  if( !indices_to_ ) //blocked sends
1019  {
1020 
1021  int * index = 0;
1022  int * sort_val = 0;
1023  if (nsends_+self_msg_>0) {
1024  index = new int[nsends_+self_msg_];
1025  sort_val = new int[nsends_+self_msg_];
1026  }
1027  for( i = 0; i < (nsends_+self_msg_); ++i )
1028  {
1029  j = starts_to_[i];
1030  for( k = 0; k < lengths_to_[i]; ++k )
1031  sizes_to_[i] += sizes[j++];
1032  if( (sizes_to_[i] > max_send_length_) && (procs_to_[i] != my_proc) )
1033  max_send_length_ = sizes_to_[i];
1034  }
1035 
1036  for( i = 0; i < (nsends_+self_msg_); ++i )
1037  {
1038  sort_val[i] = starts_to_[i];
1039  index[i] = i;
1040  }
1041 
1042  if( nsends_+self_msg_ )
1043  Sort_ints_( sort_val, index, (nsends_+self_msg_) );
1044 
1045  sum = 0;
1046  for( i = 0; i < (nsends_+self_msg_); ++i )
1047  {
1048  starts_to_ptr_[ index[i] ] = sum;
1049  sum += sizes_to_[ index[i] ];
1050  }
1051 
1052  if (index!=0) {delete [] index; index = 0;}
1053  if (sort_val!=0) {delete [] sort_val; sort_val = 0;}
1054  }
1055  else //Sends not blocked, so have to do more work
1056  {
1057  if( !indices_to_ptr_ && nexports_ ) indices_to_ptr_ = new int[nexports_];
1058  int * offset = 0;
1059  if( nexports_ ) offset = new int[nexports_];
1060 
1061  //Compute address for every item in send array
1062  sum = 0;
1063  for( i = 0; i < nexports_; ++i )
1064  {
1065  offset[i] = sum;
1066  sum += sizes_[i];
1067  }
1068 
1069  sum = 0;
1070  max_send_length_ = 0;
1071  for( i = 0; i < (nsends_+self_msg_); ++i )
1072  {
1073  starts_to_ptr_[i] = sum;
1074  for( j = starts_to_[i]; j < (starts_to_[i]+lengths_to_[i]); ++j )
1075  {
1076  indices_to_ptr_[j] = offset[ indices_to_[j] ];
1077  sizes_to_[i] += sizes_[ indices_to_[j] ];
1078  }
1079  if( sizes_to_[i] > max_send_length_ && procs_to_[i] != my_proc )
1080  max_send_length_ = sizes_to_[i];
1081  sum += sizes_to_[i];
1082  }
1083 
1084  if (offset!=0) {delete [] offset; offset = 0;}
1085  }
1086 
1087  // Exchange sizes routine inserted here:
1088  int self_index_to = -1;
1089  total_recv_length_ = 0;
1090  if( !sizes_from_ && (nrecvs_+self_msg_) ) sizes_from_ = new int [nrecvs_+self_msg_];
1091 
1092 #ifndef EPETRA_NEW_COMM_PATTERN
1093  for (i = 0; i < (nsends_+self_msg_); i++)
1094  {
1095  if(procs_to_[i] != my_proc)
1096  MPI_Send ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
1097  else
1098  self_index_to = i;
1099  }
1100 
1101  MPI_Status status;
1102  for (i = 0; i < (nrecvs_+self_msg_); ++i)
1103  {
1104  sizes_from_[i] = 0;
1105  if (procs_from_[i] != my_proc)
1106  MPI_Recv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &status);
1107  else
1108  sizes_from_[i] = sizes_to_[self_index_to];
1109  total_recv_length_ += sizes_from_[i];
1110  }
1111 #else
1112  if (nrecvs_>0 && !request_) {
1113  request_ = new MPI_Request[ nrecvs_-self_msg_ ];
1114  status_ = new MPI_Status[ nrecvs_-self_msg_ ];
1115  }
1116 
1117  for (i = 0; i < (nsends_+self_msg_); i++)
1118  {
1119  if(procs_to_[i] == my_proc)
1120  self_index_to = i;
1121  }
1122 
1123  for (i = 0; i < (nrecvs_+self_msg_); ++i)
1124  {
1125  sizes_from_[i] = 0;
1126  if (procs_from_[i] != my_proc)
1127  MPI_Irecv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &(request_[i]));
1128  else
1129  {
1130  sizes_from_[i] = sizes_to_[self_index_to];
1131  total_recv_length_ += sizes_from_[i];
1132  }
1133  }
1134 
1135  MPI_Barrier( comm_ );
1136 
1137  for (i = 0; i < (nsends_+self_msg_); i++)
1138  {
1139  if(procs_to_[i] != my_proc)
1140  MPI_Rsend ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
1141  }
1142 
1143  if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );
1144 
1145  for (i = 0; i < (nrecvs_+self_msg_); ++i)
1146  {
1147  if (procs_from_[i] != my_proc)
1148  total_recv_length_ += sizes_from_[i];
1149  }
1150 #endif
1151  // end of exchanges sizes insert
1152 
1153  sum = 0;
1154  if( !starts_from_ptr_ ) starts_from_ptr_ = new int[nrecvs_+self_msg_];
1155  for (i = 0; i < (nrecvs_+self_msg_); ++i)
1156  {
1157  starts_from_ptr_[i] = sum;
1158  sum += sizes_from_[i];
1159  }
1160 
1161  resized_ = true;
1162 
1163  return 0;
1164 }
1165 
1166 //==============================================================================
1167 int Epetra_MpiDistributor::Do( char * export_objs,
1168  int obj_size,
1169  int *& sizes,
1170  int & len_import_objs,
1171  char *& import_objs )
1172 {
1173  EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, sizes,
1174  len_import_objs, import_objs) );
1175  EPETRA_CHK_ERR( DoWaits() );
1176 
1177  return(0);
1178 }
1179 
1180 //==============================================================================
1181 int Epetra_MpiDistributor::DoReverse( char * export_objs,
1182  int obj_size,
1183  int *& sizes,
1184  int & len_import_objs,
1185  char *& import_objs )
1186 {
1187  EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size, sizes,
1188  len_import_objs, import_objs) );
1189  EPETRA_CHK_ERR( DoReverseWaits() );
1190 
1191  return(0);
1192 }
1193 
1194 //==============================================================================
1195 int Epetra_MpiDistributor::DoPosts( char * export_objs,
1196  int obj_size,
1197  int *& sizes,
1198  int & len_import_objs,
1199  char *& import_objs )
1200 {
1201  int ierr = Resize_(sizes);
1202  if (ierr != 0) {
1203  return(ierr);
1204  }
1205 
1206  MPI_Barrier( comm_ );
1207 
1208  int i, j, k;
1209 
1210  int my_proc = 0;
1211  int self_recv_address = 0;
1212 
1213  MPI_Comm_rank( comm_, &my_proc );
1214 
1215  if( len_import_objs < (total_recv_length_*obj_size) )
1216  {
1217  if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
1218  len_import_objs = total_recv_length_*obj_size;
1219  if (len_import_objs>0) import_objs = new char[len_import_objs];
1220  }
1221 
1222  k = 0;
1223 
1224  for( i = 0; i < (nrecvs_+self_msg_); ++i )
1225  {
1226  if( procs_from_[i] != my_proc )
1227  {
1228  MPI_Irecv( &(import_objs[starts_from_ptr_[i] * obj_size]),
1229  sizes_from_[i] * obj_size,
1230  MPI_CHAR, procs_from_[i], tag_, comm_, &(request_[k]) );
1231  k++;
1232  }
1233  else
1234  self_recv_address = starts_from_ptr_[i] * obj_size;
1235  }
1236 
1237  MPI_Barrier( comm_ );
1238 
1239  //setup scan through procs_to list starting w/ higher numbered procs
1240  //Should help balance msg traffic
1241  int nblocks = nsends_ + self_msg_;
1242  int proc_index = 0;
1243  while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
1244  ++proc_index;
1245  if( proc_index == nblocks ) proc_index = 0;
1246 
1247  int self_num = 0;
1248  int p;
1249 
1250  if( !indices_to_ ) //data already blocked by processor
1251  {
1252  for( i = 0; i < nblocks; ++i )
1253  {
1254  p = i + proc_index;
1255  if( p > (nblocks-1) ) p -= nblocks;
1256 
1257  if( procs_to_[p] != my_proc )
1258  MPI_Rsend( &export_objs[starts_to_ptr_[p]*obj_size],
1259  sizes_to_[p]*obj_size,
1260  MPI_CHAR,
1261  procs_to_[p],
1262  tag_,
1263  comm_ );
1264  else
1265  self_num = p;
1266  }
1267 
1268  if( self_msg_ )
1269  memcpy( &import_objs[self_recv_address],
1270  &export_objs[starts_to_ptr_[self_num]*obj_size],
1271  sizes_to_[self_num]*obj_size );
1272  }
1273  else //data not blocked by proc, need to copy to buffer
1274  {
1275  if( send_array_size_ && send_array_size_ < (max_send_length_*obj_size) )
1276  {
1277  if (send_array_!=0) {delete [] send_array_; send_array_ = 0;}
1278  send_array_ = 0;
1279  send_array_size_ = 0;
1280  }
1281  if( !send_array_size_ )
1282  {
1283  send_array_size_ = max_send_length_*obj_size;
1284  if (send_array_size_>0) send_array_ = new char[send_array_size_];
1285  }
1286 
1287  for( i=0; i<nblocks; ++i )
1288  {
1289  p = i + proc_index;
1290  if( p > (nblocks-1) ) p -= nblocks;
1291 
1292  if( procs_to_[p] != my_proc )
1293  {
1294  int offset = 0;
1295  j = starts_to_[p];
1296  for( k=0; k<lengths_to_[p]; ++k )
1297  {
1298  memcpy( &send_array_[offset],
1299  &export_objs[indices_to_ptr_[j]*obj_size],
1300  sizes_[indices_to_[j]]*obj_size );
1301  offset += sizes_[indices_to_[j]]*obj_size;
1302  ++j;
1303  }
1304  MPI_Rsend( send_array_, sizes_to_[p]*obj_size,
1305  MPI_CHAR, procs_to_[p], tag_, comm_ );
1306  }
1307  else
1308  self_num = p;
1309  }
1310 
1311  if( self_msg_ )
1312  {
1313  int jj;
1314  j = starts_to_[self_num];
1315  for( k=0; k<lengths_to_[self_num]; ++k )
1316  {
1317  jj = indices_to_ptr_[j];
1318  memcpy( &import_objs[self_recv_address],
1319  &export_objs[jj*obj_size],
1320  sizes_[indices_to_[j]]*obj_size );
1321  self_recv_address += (obj_size*sizes_[indices_to_[j]]);
1322  ++j;
1323  }
1324  }
1325  }
1326 
1327  return(0);
1328 }
1329 
1330 //==============================================================================
1331 int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
1332  int obj_size,
1333  int *& sizes,
1334  int & len_import_objs,
1335  char *& import_objs )
1336 {
1337  assert(indices_to_==0); //Can only do reverse comm when original data
1338  // is blocked by processor
1339 
1340  // If we don't have a reverse distributor, make one.
1341  if( comm_plan_reverse_ == 0 )
1342  CreateReverseDistributor();
1343 
1344  int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, sizes, len_import_objs, import_objs);
1345 
1346  return(comm_flag);
1347 }
1348 
1349 //==============================================================================
1350 void Epetra_MpiDistributor::Print(std::ostream & os) const
1351 {
1352  using std::endl;
1353 
1354  int myRank = 0, numProcs = 1;
1355  MPI_Comm_rank (comm_, &myRank);
1356  MPI_Comm_size (comm_, &numProcs);
1357 
1358  if (myRank == 0) {
1359  os << "Epetra_MpiDistributor (implements Epetra_Distributor)" << std::endl;
1360  }
1361  // Let each MPI process print its data. We assume that all
1362  // processes can print to the given output stream, and execute
1363  // barriers to make it more likely that the output will be in the
1364  // right order.
1365  for (int p = 0; p < numProcs; ++p) {
1366  if (myRank == p) {
1367  os << "[Node " << p << " of " << numProcs << "]" << std::endl;
1368  os << " selfMessage: " << self_msg_ << std::endl;
1369  os << " numSends: " << nsends_ << std::endl;
1370 
1371  os << " imagesTo: [";
1372  for (int i = 0; i < nsends_; ++i) {
1373  os << procs_to_[i];
1374  if (i < nsends_ - 1) {
1375  os << " ";
1376  }
1377  }
1378  os << "]" << std::endl;
1379 
1380  os << " lengthsTo: [";
1381  for (int i = 0; i < nsends_; ++i) {
1382  os << lengths_to_[i];
1383  if (i < nsends_ - 1) {
1384  os << " ";
1385  }
1386  }
1387  os << "]" << std::endl;
1388 
1389  os << " maxSendLength: " << max_send_length_ << std::endl;
1390 
1391  os << " startsTo: ";
1392  if (starts_to_ == NULL) {
1393  os << "(NULL)" << std::endl;
1394  } else {
1395  os << "[";
1396  for (int i = 0; i < nsends_; ++i) {
1397  os << starts_to_[i];
1398  if (i < nsends_ - 1) {
1399  os << " ";
1400  }
1401  }
1402  os << "]" << std::endl;
1403  }
1404 
1405  os << " indicesTo: ";
1406  if (indices_to_ == NULL) {
1407  os << "(NULL)" << std::endl;
1408  } else {
1409  os << "[";
1410  int k = 0;
1411  for (int i = 0; i < nsends_; ++i) {
1412  for (int j = 0; j < lengths_to_[i]; ++j) {
1413  os << " " << indices_to_[j+k];
1414  }
1415  k += lengths_to_[i];
1416  }
1417  os << "]" << std::endl;
1418  }
1419 
1420  os << " numReceives: " << nrecvs_ << std::endl;
1421  os << " totalReceiveLength: " << total_recv_length_ << std::endl;
1422 
1423  os << " lengthsFrom: [";
1424  for (int i = 0; i < nrecvs_; ++i) {
1425  os << lengths_from_[i];
1426  if (i < nrecvs_ - 1) {
1427  os << " ";
1428  }
1429  }
1430  os << "]" << std::endl;
1431 
1432  os << " startsFrom: [";
1433  for (int i = 0; i < nrecvs_; ++i) {
1434  os << starts_from_[i];
1435  if (i < nrecvs_ - 1) {
1436  os << " ";
1437  }
1438  }
1439  os << "]" << std::endl;
1440 
1441  os << " imagesFrom: [";
1442  for (int i = 0; i < nrecvs_; ++i) {
1443  os << procs_from_[i];
1444  if (i < nrecvs_ - 1) {
1445  os << " ";
1446  }
1447  }
1448  os << "]" << std::endl;
1449 
1450  // mfh 16 Dec 2011: I found this commented out here; not sure if
1451  // we want to print this, so I'm leaving it commented out.
1452  /*
1453  os << "indices_from: ";
1454  k = 0;
1455  for( i = 0; i < nrecvs_; i++ )
1456  {
1457  for( j = 0; j < lengths_from_[i]; j++ )
1458  os << " " << indices_from_[j+k];
1459  k += lengths_from_[i];
1460  }
1461  */
1462 
1463  // Last output is a flush; it leaves a space and also
1464  // helps synchronize output.
1465  os << std::flush;
1466  } // if it's my process' turn to print
1467 
1468  // Execute barriers to give output time to synchronize.
1469  // One barrier generally isn't enough.
1470  MPI_Barrier (comm_);
1471  MPI_Barrier (comm_);
1472  MPI_Barrier (comm_);
1473  }
1474 }
1475 
1476 //---------------------------------------------------------------------------
1477 int Epetra_MpiDistributor::Sort_ints_(
1478  int *vals_sort, // values to be sorted
1479  int *vals_other, // other array to be reordered with sort
1480  int nvals) // length of these two arrays
1481 {
1482 // It is primarily used to sort messages to improve communication flow.
1483 // This routine will also ensure that the ordering produced by the invert_map
1484 // routines is deterministic. This should make bugs more reproducible. This
1485 // is accomplished by sorting the message lists by processor ID.
1486 // This is a distribution count sort algorithm (see Knuth)
1487 // This version assumes non negative integers.
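// Example: vals_sort = {3,1,3,0} with vals_other = {10,11,12,13} is reordered
// (stably) to vals_sort = {0,1,3,3} and vals_other = {13,11,10,12}.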
1488 
1489  if (nvals <= 1) return 0;
1490 
1491  int i; // loop counter
1492 
1493  // find largest int, n, to size sorting array, then allocate and clear it
1494  int n = 0;
1495  for (i = 0; i < nvals; i++)
1496  if (n < vals_sort[i]) n = vals_sort[i];
1497  int *pos = new int [n+2];
1498  for (i = 0; i < n+2; i++) pos[i] = 0;
1499 
1500  // copy input arrays into temporary copies to allow sorting original arrays
1501  int *copy_sort = new int [nvals];
1502  int *copy_other = new int [nvals];
1503  for (i = 0; i < nvals; i++)
1504  {
1505  copy_sort[i] = vals_sort[i];
1506  copy_other[i] = vals_other[i];
1507  }
1508 
1509  // count the occurrences of integers ("distribution count")
1510  int *p = pos+1;
1511  for (i = 0; i < nvals; i++) p[copy_sort[i]]++;
1512 
1513  // create the partial sum of distribution counts
1514  for (i = 1; i < n; i++) p[i] += p[i-1];
1515 
1516 // the shifted partial sum is the index to store the data in sort order
1517  p = pos;
1518  for (i = 0; i < nvals; i++)
1519  {
1520  vals_sort [p[copy_sort [i]]] = copy_sort[i];
1521  vals_other [p[copy_sort [i]]++] = copy_other[i];
1522  }
1523 
1524  delete [] copy_sort;
1525  delete [] copy_other;
1526  delete [] pos;
1527 
1528  return 0;
1529 }
1530 
1531 //-------------------------------------------------------------------------
1532 Epetra_MpiDistributor& Epetra_MpiDistributor::operator=(const Epetra_MpiDistributor & src)
1533 {
1534  (void)src;
1535  //not currently supported
1536  bool throw_error = true;
1537  if (throw_error) {
1538  throw ReportError("Epetra_MpiDistributor::operator= not supported.",-1);
1539  }
1540  return( *this );
1541 }
1542 
1543 
1544 //-------------------------------------------------------------------------
1545 void Epetra_MpiDistributor::CreateReverseDistributor() {
1546  int i;
1547  int my_proc = 0;
1548 
1549  MPI_Comm_rank( comm_, &my_proc );
1550 
1551  if( comm_plan_reverse_ == 0 ) {
1552  int total_send_length = 0;
1553  for( i = 0; i < nsends_+self_msg_; i++ )
1554  total_send_length += lengths_to_[i];
1555 
1556  int max_recv_length = 0;
1557  for( i = 0; i < nrecvs_; i++ )
1558  if( procs_from_[i] != my_proc )
1559  if( lengths_from_[i] > max_recv_length )
1560  max_recv_length = lengths_from_[i];
1561 
1561 
1562  comm_plan_reverse_ = new Epetra_MpiDistributor(*epComm_);
1563 
1564  comm_plan_reverse_->lengths_to_ = lengths_from_;
1565  comm_plan_reverse_->procs_to_ = procs_from_;
1566  comm_plan_reverse_->indices_to_ = indices_from_;
1567  comm_plan_reverse_->starts_to_ = starts_from_;
1568 
1569  comm_plan_reverse_->lengths_from_ = lengths_to_;
1570  comm_plan_reverse_->procs_from_ = procs_to_;
1571  comm_plan_reverse_->indices_from_ = indices_to_;
1572  comm_plan_reverse_->starts_from_ = starts_to_;
1573 
1574  comm_plan_reverse_->nsends_ = nrecvs_;
1575  comm_plan_reverse_->nrecvs_ = nsends_;
1576  comm_plan_reverse_->self_msg_ = self_msg_;
1577 
1578  comm_plan_reverse_->max_send_length_ = max_recv_length;
1579  comm_plan_reverse_->total_recv_length_ = total_send_length;
1580 
1581  comm_plan_reverse_->request_ = new MPI_Request[ comm_plan_reverse_->nrecvs_ ];
1582  comm_plan_reverse_->status_ = new MPI_Status[ comm_plan_reverse_->nrecvs_ ];
1583 
1584  comm_plan_reverse_->no_delete_ = true;
1585  }
1586 
1587 }
1588 
1589 //-------------------------------------------------------------------------
1590 Epetra_Distributor * Epetra_MpiDistributor::ReverseClone() {
1591  if(comm_plan_reverse_==0)
1592  CreateReverseDistributor();
1593 
1594  return(dynamic_cast<Epetra_Distributor *>(new Epetra_MpiDistributor(*comm_plan_reverse_)));
1595 }
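
The listing above only shows the implementation. For orientation, a typical client builds a plan from a send list and then executes it; the sketch below is illustrative only (it is not part of Epetra_MpiDistributor.cpp), assumes an MPI build of Epetra, and uses made-up variable names; error checking is omitted.

#include <mpi.h>
#include "Epetra_MpiComm.h"
#include "Epetra_MpiDistributor.h"

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  {
    Epetra_MpiComm comm(MPI_COMM_WORLD);
    Epetra_MpiDistributor plan(comm);

    // Each process exports one int to the next higher rank (wrapping around).
    int exportPIDs[1] = { (comm.MyPID() + 1) % comm.NumProc() };
    int numRemote = 0;
    plan.CreateFromSends(1, exportPIDs, true, numRemote);

    int exportVals[1] = { comm.MyPID() };
    int lenImport = 0;
    char* importVals = 0;              // allocated inside Do()
    plan.Do(reinterpret_cast<char*>(exportVals), (int)sizeof(int),
            lenImport, importVals);
    // importVals now holds numRemote ints, one per posted receive.
    delete [] importVals;
  }
  MPI_Finalize();
  return 0;
}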