25 #ifndef _chemistry_qc_lmp2_parallel_h
26 #define _chemistry_qc_lmp2_parallel_h
29 #define OMPI_SKIP_MPICXX
30 #define MPICH_SKIP_MPICXX
36 #include <mpqc_config.h>
37 #include <util/misc/scint.h>
38 #include <util/misc/regtime.h>
40 #include <chemistry/qc/lmp2/collective.h>
// NOTE(review): incomplete fragment — the enclosing function signature and the
// branch structure selecting among these three strategies are not visible here.
// Strategy 1 (presumably): sum every node's contribution onto node 0 ...
58 grp->sum(data_->data(), data_->ndata(), 0, 0);
// ... then broadcast the summed buffer back to all nodes.
61 grp->bcast(data_->data(), data_->ndata());
// Alternative: one collective all-reduce (see collective.h) that sums and
// redistributes in a single step — TODO confirm which path is chosen when.
64 allsum(data_->data(), data_->ndata());
// NOTE(review): incomplete fragment of what appears to be
// Array<N,C>::parallel_union() — the signature and several interior lines
// (declarations of iarray/me/nproc/status/bi, loop closings, the #ifdef HAVE_MPI
// structure) are not visible in this view. Comments only; code unchanged.
81 tim.
enter(
"parallel_union");
// Count of blocks this node currently owns.
82 int nblock_local = n_block();
// Upper bound on any node's block count; presumably a global max-reduction
// happens on an elided line — TODO confirm, otherwise remote_blocks below
// could be too small for a peer owning more blocks than we do.
83 int nblock_max = n_block();
// Wire format: [ block count, then N indices per block ].
85 int *local_blocks =
new int[1 + nblock_local*N];
86 int *remote_blocks =
new int[1 + nblock_max*N];
// Serialize this node's block index list.
88 local_blocks[iarray++] = nblock_local;
89 for (
typename blockmap_t::iterator i = blocks_.begin();
92 for (
int j=0; j<N; j++) {
93 local_blocks[iarray++] = i->first.block(j);
// Ring exchange: in pass k, receive from (me-k) and send to (me+k) so that
// every node eventually sees every other node's block list.
96 int source = (me + nproc - 1)%nproc;
97 for (
int next_node = (me+1)%nproc;
99 next_node = (next_node+1)%nproc) {
101 MPI_Request send_req, recv_req;
// Receive is posted before the send to avoid deadlock with unbuffered sends.
102 MPI_Irecv(remote_blocks,1+nblock_max*N,MPI_INT,source,
103 100+source,MPI_COMM_WORLD,&recv_req);
104 MPI_Isend(local_blocks,1+nblock_local*N,MPI_INT,next_node,
105 100+me,MPI_COMM_WORLD,&send_req);
107 MPI_Wait(&recv_req,&status);
// Register every remote block locally (data storage is allocated later).
109 int nblock = remote_blocks[iarray++];
110 for (
int i=0; i<nblock; i++) {
112 for (
int j=0; j<N; j++) bi.block(j) = remote_blocks[iarray++];
113 add_unallocated_block(bi);
// local_blocks is reused on the next pass, so the send must complete first.
115 MPI_Wait(&send_req,&status);
116 source = (source + nproc - 1)%nproc;
// Non-MPI builds cannot perform the union (presumably an #else branch).
118 throw std::runtime_error(
"Array<N,C>::replicated_from_distributed requires MPI");
121 delete[] local_blocks;
122 delete[] remote_blocks;
123 tim.
exit(
"parallel_union");
// NOTE(review): incomplete fragment of class PairMapping — the class header,
// constructor signatures, member declarations (me_, nproc_) and several
// interior lines are not visible here. Comments only; code unchanged.
//
// Maps an (i,j) pair of block indices to the rank of the node that owns it.
139 std::map<std::pair<int,int>,
int> nodemap_;
// Presumably a constructor: assign the pairs in indexset to nodes round-robin,
// optionally shuffling the assignment — TODO confirm full signature.
146 const std::set<std::pair<int,int> > &indexset,
147 bool randomize =
false) {
150 std::vector<int> nodemap(indexset.size());
// NOTE(review): int vs. vector::size_type comparison below — harmless at
// realistic sizes but triggers signed/unsigned warnings.
151 for (
int i=0; i<nodemap.size(); i++) nodemap[i] = i%nproc_;
// NOTE(review): std::random_shuffle was deprecated in C++14 and removed in
// C++17; a modernization would use std::shuffle with an explicit engine.
153 std::random_shuffle(nodemap.begin(), nodemap.end());
156 for (std::set<std::pair<int,int> >::const_iterator i = indexset.begin();
159 nodemap_[*i] = nodemap[node++];
// Presumably a second constructor taking a map of i -> {j...} index sets.
165 const std::map<
int,std::set<int> > &indexmap) {
169 for (
std::map<
int,std::set<int> >::const_iterator i = indexmap.begin();
172 for (std::set<int>::const_iterator j = i->second.begin();
173 j != i->second.end();
// Deterministic round-robin assignment (no randomize option on this path).
175 nodemap_[std::make_pair(i->first,*j)] = node++%nproc_;
// Owner lookup for pair (i,j); throws if the pair was never registered.
179 int pair_to_node(
int i,
int j)
const {
180 std::map<std::pair<int,int>,
int>::const_iterator
181 found = nodemap_.find(std::make_pair(i,j));
182 if (found == nodemap_.end()) {
183 throw std::runtime_error(
"PairMapping: requested non-existent block");
185 return found->second;
187 int me()
const {
return me_; }
188 int nproc()
const {
return nproc_; }
// Collect into `pairs` every pair assigned to this node.
189 void local_pairs(std::set<std::pair<int,int> > &pairs)
const {
191 for (
std::map<std::pair<int,int>,
int>::const_iterator i=nodemap_.begin();
194 if (i->second == me_) pairs.insert(i->first);
// Presumably a print/debug routine streaming "i j -> node" lines.
198 for (
std::map<std::pair<int,int>,
int>::const_iterator i=nodemap_.begin();
202 <<
" " << i->first.second
203 <<
" -> " << i->second
// NOTE(review): incomplete fragment — appears to be a pair-based block
// distribution policy that delegates ownership decisions to a shared
// PairMapping; the class header, member declarations (mapping_, i0_, i1_) and
// constructor signatures are not visible. Comments only; code unchanged.
218 const std::set<std::pair<int,int> > &indexset,
219 bool randomize =
false) {
// Ownership/lifetime of mapping_ is not visible here — the raw `new` suggests
// a reference-counted or externally managed pointer; verify against the class.
221 mapping_ =
new PairMapping(grp,indexset,randomize);
// Second constructor variant taking an i -> {j...} index map (body elided).
225 const std::map<
int,std::set<int> > &indexmap) {
// Owner of a block is decided by the index pair at positions i0_ and i1_.
237 return mapping_->pair_to_node(b.block(i0_),b.block(i1_));
239 void local_pairs(std::set<std::pair<int,int> > &pairs)
const {
240 mapping_->local_pairs(pairs);
// NOTE(review): incomplete fragment of a block-distribution class that maps a
// composite block index to a node through a mixed-radix linearization; the
// class header and several lines (including size_[0] initialization and the
// function tails) are not visible. Comments only; code unchanged.
251 sc::sc_uint64_t size_[N];
// Precompute strides (size_) and per-index extents (nindex_i_) from the
// array's index dimensions.
255 void init_sizes(
const Array<N> &a) {
257 for (
int i=1; i<N; i++) {
258 size_[i] = size_[i-1] * a.
index(i).nindex();
260 for (
int i=0; i<N; i++) {
261 nindex_i_[i] = a.
index(i).nindex();
// Constructor variants selecting 1, 2 or 3 distributed indices.
266 int i0): nindex_(1) {
273 int i0,
int i1): nindex_(2) {
275 indices_[0]=i0; indices_[1]=i1;
280 int i0,
int i1,
int i2): nindex_(3) {
282 indices_[0]=i0; indices_[1]=i1; indices_[2]=i2;
// Linearize the selected block indices into a single 64-bit location
// (presumably reduced modulo nproc_ on an elided line to pick the owner).
287 sc::sc_uint64_t loc = 0;
288 for (
int i=0; i<nindex_; i++) {
289 loc += size_[i] * b.block(indices_[i]);
// Number of distributed-index values assigned to `node` (default: this node).
294 int nlocalindex(
int node = -1) {
295 if (node == -1) node = me_;
296 int r = nindex_i_[0] / nproc_;
// NOTE(review): likely bug — this tests me_ rather than the `node` argument,
// so nlocalindex(node) returns this node's count for every node; confirm.
297 if (me_ < nindex_i_[0] % nproc_) r++;
// Map a local ordinal `i` to the global index owned by `node`.
301 int localindex(
int i,
int node = -1) {
302 if (node == -1) node = me_;
// NOTE(review): likely bug — uses me_ instead of `node`, leaving the `node`
// parameter dead here as well; confirm against callers before fixing.
303 return me_ + nproc_*i;
// NOTE(review): incomplete fragment of Array<N,C>::replicated_from_distributed
// — the signature and several interior lines (the ndat computation, loop
// closings, the non-matching-block branch) are not visible. Comments only.
313 tim.
enter(
"repl_from_dist");
316 int nproc = msg->
n();
// Copy scalar metadata from the distributed source array.
329 tolerance_ = d.tolerance_;
// Copy each locally held source block into the matching replicated block;
// blocks this node does not hold are presumably zeroed and filled in by the
// accumulate below — TODO confirm against the elided allocation code.
338 for (
typename blockmap_t::const_iterator i = d.blocks_.begin();
339 i != d.blocks_.end();
342 typename blockmap_t::iterator loc = blocks_.find(i->first);
345 if (loc != blocks_.end()) {
346 memcpy(loc->second, i->second, ndat*
sizeof(
double));
// Global sum replicates every block's data onto every node.
352 parallel_accumulate(msg);
354 tim.
exit(
"repl_from_dist");
// NOTE(review): incomplete fragment of
// Array<N,C>::distributed_from_distributed(..., clear_source_array,
// ignore_block_distrib_throws) — the signature head, the #ifdef HAVE_MPI
// structure and many interior lines (e.g. the data_send_buffers allocation,
// the per-block destination computation, loop closings) are not visible.
// Comments only; code unchanged.
362 bool clear_source_array,
363 bool ignore_block_distrib_throws)
366 int nproc = msg->
n();
// Fast path (presumably nproc == 1): nothing needs redistribution.
373 if (clear_source_array) d.
clear();
// Non-MPI builds cannot redistribute (presumably an #else branch).
377 throw std::runtime_error(
"Array<N,C>::distributed_from_distributed requires MPI");
// ---- Phase 1: count how many blocks go to each destination node. ----
391 int *nblock_send =
new int[nproc];
392 int *nblock_recv =
new int[nproc];
393 for (
int i=0; i<nproc; i++) nblock_send[i] = 0;
394 for (
typename blockmap_t::const_iterator i = d.blocks_.begin();
395 i != d.blocks_.end();
// A throwing distribution policy may be tolerated per the function's flag.
398 if (ignore_block_distrib_throws) {
// Everyone learns how many blocks it will receive from each peer.
413 MPI_Alltoall(nblock_send, 1, MPI_INT,
414 nblock_recv, 1, MPI_INT,
// ---- Phase 2: exchange the block index lists (N ints per block). ----
429 int *block_send_offset =
new int[nproc];
430 int *block_recv_offset =
new int[nproc];
431 int *block_send_size =
new int[nproc];
432 int *block_recv_size =
new int[nproc];
433 for (
int i=0; i<nproc; i++) {
434 block_send_size[i] = N*nblock_send[i];
435 block_recv_size[i] = N*nblock_recv[i];
437 block_send_offset[i] = 0;
438 block_recv_offset[i] = 0;
// Exclusive prefix sums give each peer's offset into the packed buffers.
441 block_send_offset[i] = block_send_size[i-1]
442 + block_send_offset[i-1];
443 block_recv_offset[i] = block_recv_size[i-1]
444 + block_recv_offset[i-1];
447 int block_send_size_total = block_send_size[nproc-1]
448 + block_send_offset[nproc-1];
449 int block_recv_size_total = block_recv_size[nproc-1]
450 + block_recv_offset[nproc-1];
// Pack each outgoing block's N indices at its destination's offset.
455 int *block_send_buffers =
new int[block_send_size_total];
456 std::vector<int> current_send_offset(nproc);
457 std::fill(current_send_offset.begin(), current_send_offset.end(), 0);
458 for (
typename blockmap_t::const_iterator i = d.blocks_.begin();
459 i != d.blocks_.end();
462 if (ignore_block_distrib_throws) {
473 for (
int j=0; j<N; j++)
474 block_send_buffers[block_send_offset[node]
475 + current_send_offset[node]++]
480 int *block_recv_buffers =
new int[block_recv_size_total];
481 custom_alltoallv(block_send_buffers, block_send_size,
482 block_send_offset, MPI_INT,
483 block_recv_buffers, block_recv_size,
484 block_recv_offset, MPI_INT,
// ---- Phase 3: register incoming blocks and size the data exchange. ----
489 long ndata_recv_total = 0;
490 int *data_recv_offset =
new int[nproc];
491 int *data_recv_size =
new int[nproc];
492 for (
int i=0; i<nproc; i++) {
493 int nblock = nblock_recv[i];
495 for (
int j=0; j<nblock; j++) {
497 for (
int k=0; k<N; k++) {
498 bi.block(k) = block_recv_buffers[irecv++];
500 ndata += block_size(bi);
// Storage for incoming blocks is allocated later; register them now.
501 add_unallocated_block(bi);
503 ndata_recv_total += ndata;
504 data_recv_size[i] = ndata;
506 data_recv_offset[i] = 0;
508 data_recv_offset[i] = data_recv_offset[i-1] + data_recv_size[i-1];
// ---- Phase 4: pack outgoing block data (doubles), peer by peer. ----
// NOTE(review): the allocation of data_send_buffers is on an elided line;
// presumably sized by the send-side analogue of ndata_recv_total — confirm.
514 int *data_send_offset =
new int[nproc];
515 int *data_send_size =
new int[nproc];
517 double *current_send_buffer = data_send_buffers;
518 for (
int i=0; i<nproc; i++) {
519 int nblock = nblock_send[i];
520 data_send_size[i] = 0;
521 for (
int j=0; j<nblock; j++) {
523 for (
int k=0; k<N; k++) {
524 bi.block(k) = block_send_buffers[isend++];
526 int ndata_in_block = block_size(bi);
527 memcpy(current_send_buffer, d.blocks_.find(bi)->second,
528 ndata_in_block*
sizeof(double));
529 current_send_buffer += ndata_in_block;
530 data_send_size[i] += ndata_in_block;
533 data_send_offset[i] = 0;
535 data_send_offset[i] = data_send_offset[i-1] + data_send_size[i-1];
537 delete[] block_send_buffers;
538 delete[] block_send_offset;
539 delete[] block_send_size;
// Safe to clear the source here: its data already lives in the send buffers.
540 if (clear_source_array) d.
clear();
// ---- Phase 5: exchange the block data. ----
543 double *data_recv_buffers =
new double[ndata_recv_total];
544 custom_alltoallv(data_send_buffers, data_send_size,
545 data_send_offset, MPI_DOUBLE,
546 data_recv_buffers, data_recv_size,
547 data_recv_offset, MPI_DOUBLE,
549 delete[] data_send_buffers;
550 delete[] data_send_offset;
551 delete[] data_send_size;
// ---- Phase 6: copy received data into the now-registered local blocks. ----
556 double *current_recv_buffer = data_recv_buffers;
557 for (
int i=0; i<nproc; i++) {
558 int nblock = nblock_recv[i];
559 for (
int j=0; j<nblock; j++) {
561 for (
int k=0; k<N; k++) {
562 bi.block(k) = block_recv_buffers[irecv++];
564 typename blockmap_t::iterator iblock = blocks_.find(bi);
565 double *data = iblock->second;
566 int ndata_in_block = block_size(bi);
567 memcpy(data, current_recv_buffer, ndata_in_block*
sizeof(
double));
568 current_recv_buffer += ndata_in_block;
572 delete[] block_recv_buffers;
573 delete[] block_recv_offset;
574 delete[] block_recv_size;
576 delete[] data_recv_buffers;
577 delete[] data_recv_offset;
578 delete[] data_recv_size;
580 delete[] nblock_send;
581 delete[] nblock_recv;
// NOTE(review): incomplete fragment — appears to be an alternative
// point-to-point redistribution path; the enclosing signature, the #ifdef
// structure and many interior lines are not visible. Comments only.
// ---- Count blocks destined for each node. ----
590 std::vector<int> nblock(nproc);
591 std::fill(nblock.begin(), nblock.end(), 0);
592 for (
typename blockmap_t::const_iterator i = d.blocks_.begin();
593 i != d.blocks_.end();
// Per-destination buffers: [ block count, then N indices per block ].
600 std::vector<int*> buffer(nproc);
601 for (
int i=0; i<nproc; i++) {
603 buffer[i] =
new int[1 + N*nblock[i]];
604 buffer[i][0] = nblock[i];
// Serialize each block's indices into its destination buffer; blocks that
// stay on this node are registered directly.
613 std::vector<int> ndata(nproc);
614 std::fill(ndata.begin(), ndata.end(), 1);
615 for (
typename blockmap_t::const_iterator i = d.blocks_.begin();
616 i != d.blocks_.end();
619 int &counter = ndata[node];
620 for (
int j=0; j<N; j++) {
622 add_unallocated_block(i->first);
625 buffer[node][counter++] = i->first.block(j);
// Receive buffer sized for the largest block list on any node (global max).
631 int max_nblock = *std::max_element(nblock.begin(), nblock.end());
632 msg->max(max_nblock);
633 int *remote_blocks =
new int[1 + max_nblock*N];
// Ring exchange of block index lists (same pattern as parallel_union above).
634 int source = (me + nproc - 1)%nproc;
635 for (
int next_node = (me+1)%nproc;
637 next_node = (next_node+1)%nproc) {
639 MPI_Request send_req, recv_req;
640 MPI_Irecv(remote_blocks,1+max_nblock*N,MPI_INT,source,
641 100+source,MPI_COMM_WORLD,&recv_req);
642 MPI_Isend(buffer[next_node],1+nblock[next_node]*N,MPI_INT,next_node,
643 100+me,MPI_COMM_WORLD,&send_req);
645 MPI_Wait(&recv_req,&status);
647 int nblock = remote_blocks[iarray++];
648 for (
int i=0; i<nblock; i++) {
650 for (
int j=0; j<N; j++) bi.block(j) = remote_blocks[iarray++];
651 add_unallocated_block(bi);
653 MPI_Wait(&send_req,&status);
654 source = (source + nproc - 1)%nproc;
656 throw std::runtime_error(
"Array<N,C>::distributed_from_distributed requires MPI");
659 delete[] remote_blocks;
660 for (
int i=0; i<nproc; i++)
delete[] buffer[i];
// ---- Size the pack buffer: the largest single block plus N packed ints. ----
668 for (
typename blockmap_t::const_iterator i = d.blocks_.begin();
669 i != d.blocks_.end();
671 int sz = block_size(i->first);
672 if (sz > maxsz) maxsz = sz;
676 MPI_Pack_size(maxsz, MPI_DOUBLE, MPI_COMM_WORLD, &maxalloc);
677 for (
int i=0; i<N; i++) {
679 MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &isz);
682 char *outgoing_buffer =
new char[maxalloc];
// Every block we do not already own arrives as a message from its holder.
687 int n_to_be_received = blocks_.size() - nblock[me];
// Keep a window of nrecv_req outstanding wildcard receives.
689 const int nrecv_req = 20;
690 std::vector<char *> incoming_buffer(nrecv_req);
691 for (
int i=0; i<nrecv_req; i++) {
692 incoming_buffer[i] =
new char[maxalloc];
694 MPI_Request recv_req[nrecv_req];
695 for (
int i=0; i<nrecv_req; i++) {
696 if (i<n_to_be_received) {
697 MPI_Irecv(incoming_buffer[i], maxalloc, MPI_BYTE, MPI_ANY_SOURCE,
698 99, MPI_COMM_WORLD, &recv_req[i]);
// Unused slots are parked on MPI_REQUEST_NULL so MPI_Test skips them.
701 recv_req[i] = MPI_REQUEST_NULL;
// ---- Main loop: interleave sending local blocks with draining receives. ----
705 for (
typename blockmap_t::const_iterator i = d.blocks_.begin();
706 i != d.blocks_.end() || n_to_be_received > 0;) {
708 MPI_Request send_req;
714 bool need_wait_for_send =
false;
715 if (i != d.blocks_.end()) {
// Block stays on this node: copy it straight into the destination array.
721 typename blockmap_t::iterator loc = blocks_.find(i->first);
722 memcpy(loc->second, i->second, ndat*
sizeof(
double));
// Remote block: pack the N indices followed by the data, then Isend.
729 int outgoing_loc = 0;
730 for (
int j=0; j<N; j++) {
731 int jval = i->first.block(j);
732 MPI_Pack(&jval, 1, MPI_INT, outgoing_buffer, maxalloc,
733 &outgoing_loc, MPI_COMM_WORLD);
735 MPI_Pack(i->second, ndat, MPI_DOUBLE, outgoing_buffer, maxalloc,
736 &outgoing_loc, MPI_COMM_WORLD);
737 MPI_Isend(outgoing_buffer,outgoing_loc,MPI_BYTE,node,
738 99,MPI_COMM_WORLD,&send_req);
739 need_wait_for_send =
true;
741 throw std::runtime_error(
"LMP2:: cannot send remote block w/o MPI");
// Drain any completed receives: unpack the indices, then the block data.
749 for (
int irecv=0; irecv<nrecv_req; irecv++) {
750 if (recv_req[irecv] == MPI_REQUEST_NULL)
continue;
753 MPI_Test(&recv_req[irecv],&flag,&status);
759 for (
int j=0; j<N; j++) {
761 MPI_Unpack(incoming_buffer[irecv], maxalloc, &position, &index,
762 1, MPI_INT, MPI_COMM_WORLD);
765 typename blockmap_t::iterator loc = blocks_.find(bi);
766 int block_sz = block_size(bi);
767 MPI_Unpack(incoming_buffer[irecv], maxalloc, &position, loc->second,
768 block_sz, MPI_DOUBLE, MPI_COMM_WORLD);
// Repost the receive slot only while more messages are still expected.
771 if (n_to_be_received >= nrecv_req) {
772 MPI_Irecv(incoming_buffer[irecv], maxalloc, MPI_BYTE, MPI_ANY_SOURCE,
773 99, MPI_COMM_WORLD, &recv_req[irecv]);
// outgoing_buffer is reused next iteration, so the send must finish first.
779 if (need_wait_for_send) {
781 MPI_Wait(&send_req,&status);
786 for (
int i=0; i<nrecv_req; i++) {
787 delete[] incoming_buffer[i];
789 delete[] outgoing_buffer;
791 if (clear_source_array) d.
clear();