Sierra Toolkit Version of the Day
ParallelReduce.cpp
/*------------------------------------------------------------------------*/
/* Copyright 2010 Sandia Corporation.                                      */
/* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive     */
/* license for use of this work by or on behalf of the U.S. Government.    */
/* Export of this program may require a license from the                   */
/* United States Government.                                               */
/*------------------------------------------------------------------------*/

#include <stk_util/parallel/ParallelReduce.hpp>

#include <stdlib.h>
#include <stdexcept>
#include <sstream>
#include <vector>

namespace stk_classic {

#if defined( STK_HAS_MPI )

//----------------------------------------------------------------------
//----------------------------------------------------------------------

void all_write_string( ParallelMachine arg_comm ,
                       std::ostream & arg_root_os ,
                       const std::string & arg_msg )
{
  const int i_zero = 0 ;
  const int p_root = 0 ;
  const unsigned p_size = parallel_machine_size( arg_comm );
  const unsigned p_rank = parallel_machine_rank( arg_comm );

  int result ;

  // Gather the send counts on root processor

  int send_count = arg_msg.size();

  std::vector<int> recv_count( p_size , i_zero );

  int * const recv_count_ptr = & recv_count[0] ;

  result = MPI_Gather( & send_count , 1 , MPI_INT ,
                       recv_count_ptr , 1 , MPI_INT ,
                       p_root , arg_comm );

  if ( MPI_SUCCESS != result ) {
    std::ostringstream msg ;
    msg << "stk_classic::all_write FAILED: MPI_Gather = " << result ;
    throw std::runtime_error( msg.str() );
  }

  // Receive counts are only non-zero on the root processor:

  std::vector<int> recv_displ( p_size + 1 , i_zero );

  for ( unsigned i = 0 ; i < p_size ; ++i ) {
    recv_displ[i+1] = recv_displ[i] + recv_count[i] ;
  }

  const unsigned recv_size = (unsigned) recv_displ[ p_size ] ;

  std::vector<char> buffer( recv_size );

  {
    const char * const send_ptr = arg_msg.c_str();
    char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ;
    int * const recv_displ_ptr = & recv_displ[0] ;

    result = MPI_Gatherv( (void*) send_ptr, send_count, MPI_CHAR ,
                          recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR,
                          p_root, arg_comm );
  }

  if ( MPI_SUCCESS != result ) {
    std::ostringstream msg ;
    msg << "stk_classic::all_write FAILED: MPI_Gatherv = " << result ;
    throw std::runtime_error( msg.str() );
  }

  if ( p_root == (int) p_rank ) {
//  arg_root_os << std::endl ;
    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      if ( recv_count[i] ) {
        char * const ptr = & buffer[ recv_displ[i] ];
        arg_root_os.write( ptr , recv_count[i] );
        arg_root_os << std::endl ;
      }
    }
    arg_root_os.flush();
  }
}
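
// Usage sketch (not part of the original source; for illustration only),
// assuming 'comm' is an initialized ParallelMachine communicator: every rank
// contributes its own message and only the root rank's stream receives output,
// one line per contributing rank.
//
//   std::ostringstream note ;
//   note << "P" << stk_classic::parallel_machine_rank( comm ) << ": mesh loaded" ;
//   stk_classic::all_write_string( comm , std::cout , note.str() );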

//----------------------------------------------------------------------
//----------------------------------------------------------------------

void all_reduce( ParallelMachine arg_comm ,
                 ParallelReduceOp arg_op ,
                 void * arg_in ,
                 void * arg_out ,
                 unsigned arg_len )
{
  MPI_Op mpi_op = MPI_OP_NULL ;

  MPI_Op_create( arg_op , 0 , & mpi_op );

  // The SUN was buggy when combining an
  // MPI_Allreduce with a user defined operator,
  // use reduce/broadcast instead.
/*
  const int result =
    MPI_Allreduce(arg_in,arg_out,arg_len,MPI_BYTE,mpi_op,arg_comm);
*/

  const int result_reduce =
    MPI_Reduce(arg_in,arg_out,arg_len,MPI_BYTE,mpi_op,0,arg_comm);

  const int result_bcast =
    MPI_Bcast(arg_out,arg_len,MPI_BYTE,0,arg_comm);

  MPI_Op_free( & mpi_op );

  if ( MPI_SUCCESS != result_reduce || MPI_SUCCESS != result_bcast ) {
    std::ostringstream msg ;
    msg << "stk_classic::all_reduce FAILED: MPI_Reduce = " << result_reduce
        << " MPI_Bcast = " << result_bcast ;
    throw std::runtime_error( msg.str() );
  }
}
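
// Sketch of a user-defined reduction operator (an assumption for illustration,
// not from the original source). arg_op is handed straight to MPI_Op_create,
// so it must match the MPI user-function signature, and because the data is
// reduced as MPI_BYTE the 'len' argument arrives as a byte count:
//
//   void hypothetical_max_double_op( void * inv , void * outv ,
//                                    int * len , MPI_Datatype * )
//   {
//     const double * in  = reinterpret_cast<const double *>( inv );
//     double       * out = reinterpret_cast<double *>( outv );
//     const unsigned n = *len / sizeof(double); // *len is in bytes here
//     for ( unsigned i = 0 ; i < n ; ++i ) {
//       if ( out[i] < in[i] ) { out[i] = in[i] ; }
//     }
//   }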

//----------------------------------------------------------------------
//----------------------------------------------------------------------

void all_reduce_sum( ParallelMachine comm ,
                     const double * local , double * global , unsigned count )
{
  double * tmp = const_cast<double*>( local );
  MPI_Allreduce( tmp , global , count , MPI_DOUBLE , MPI_SUM , comm );
}

void all_reduce_sum( ParallelMachine comm ,
                     const float * local , float * global , unsigned count )
{
  float * tmp = const_cast<float*>( local );
  MPI_Allreduce( tmp , global , count , MPI_FLOAT , MPI_SUM , comm );
}

void all_reduce_sum( ParallelMachine comm ,
                     const int * local , int * global , unsigned count )
{
  int * tmp = const_cast<int*>( local );
  MPI_Allreduce( tmp , global , count , MPI_INT , MPI_SUM , comm );
}

void all_reduce_sum( ParallelMachine comm ,
                     const size_t * local , size_t * global , unsigned count )
{
  size_t * tmp = const_cast<size_t*>( local );

  if ( sizeof(size_t) == sizeof(unsigned) ) {
    MPI_Allreduce( tmp , global , count , MPI_UNSIGNED , MPI_SUM , comm );
  }
  else if ( sizeof(size_t) == sizeof(unsigned long) ) {
    MPI_Allreduce( tmp , global , count , MPI_UNSIGNED_LONG , MPI_SUM , comm );
  }
  else {
    unsigned long * const in  = new unsigned long[ count ];
    unsigned long * const out = new unsigned long[ count ];

    for ( unsigned i = 0 ; i < count ; ++i ) { in[i] = local[i] ; }
    MPI_Allreduce( in , out , count , MPI_UNSIGNED_LONG , MPI_SUM , comm );
    for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = out[i] ; }

    delete[] in ;
    delete[] out ;
  }
}

void all_reduce_bor( ParallelMachine comm ,
                     const unsigned * local ,
                     unsigned * global , unsigned count )
{
  unsigned * tmp = const_cast<unsigned*>( local );
  MPI_Allreduce( tmp , global , count , MPI_UNSIGNED , MPI_BOR , comm );
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

#else

void all_write_string( ParallelMachine ,
                       std::ostream & arg_root_os ,
                       const std::string & arg_msg )
{
  arg_root_os << arg_msg ;
}

void all_reduce_sum( ParallelMachine ,
                     const double * local , double * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

void all_reduce_sum( ParallelMachine ,
                     const float * local , float * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

void all_reduce_sum( ParallelMachine ,
                     const int * local , int * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

void all_reduce_sum( ParallelMachine ,
                     const size_t * local , size_t * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

void all_reduce_bor( ParallelMachine ,
                     const unsigned * local ,
                     unsigned * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

//----------------------------------------------------------------------

void all_reduce( ParallelMachine ,
                 ParallelReduceOp ,
                 void * arg_in ,
                 void * arg_out ,
                 unsigned arg_len )
{
  unsigned char * i = reinterpret_cast<unsigned char *>( arg_in );
  unsigned char * o = reinterpret_cast<unsigned char *>( arg_out );
  for ( unsigned char * const e = i + arg_len ; e != i ; ++i , ++o ) {
    *o = *i ;
  }
}

#endif

}

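A minimal usage sketch of the sum and bitwise-or helpers (not part of the file above; 'comm' and the local values are assumptions for illustration):

    #include <stk_util/parallel/ParallelReduce.hpp>

    void example( stk_classic::ParallelMachine comm )
    {
      // Every rank contributes one value; every rank receives the global sum.
      const double local_volume  = 1.0 ;   // hypothetical per-rank value
      double       global_volume = 0.0 ;
      stk_classic::all_reduce_sum( comm , & local_volume , & global_volume , 1 );

      // Combine per-rank status flags with a bitwise OR across all ranks.
      const unsigned local_flags  = 0u ;   // hypothetical per-rank flag word
      unsigned       global_flags = 0u ;
      stk_classic::all_reduce_bor( comm , & local_flags , & global_flags , 1 );
    }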