TiledArray  0.7.0
replicator.h
Go to the documentation of this file.
1 /*
2  * This file is a part of TiledArray.
3  * Copyright (C) 2013 Virginia Tech
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  */
19 
20 #ifndef TILEDARRAY_REPLICATOR_H__INCLUDED
21 #define TILEDARRAY_REPLICATOR_H__INCLUDED
22 
23 #include <TiledArray/madness.h>
24 
25 namespace TiledArray {
26  namespace detail {
27 
29 
34  template <typename A>
35  class Replicator : public madness::WorldObject<Replicator<A> >, private madness::Spinlock {
36  private:
37  typedef Replicator<A> Replicator_;
38  typedef madness::WorldObject<Replicator_> wobj_type;
39  typedef std::stack<madness::CallbackInterface*, std::vector<madness::CallbackInterface*> > callback_type;
40 
41  A destination_;
42  std::vector<typename A::size_type> indices_;
43  std::vector<Future<typename A::value_type> > data_;
44  madness::AtomicInt sent_;
45  World& world_;
46  volatile callback_type callbacks_;
47  volatile mutable bool probe_;
48 
50  void do_callbacks() {
51  callback_type& callbacks = const_cast<callback_type&>(callbacks_);
52  while(! callbacks.empty()) {
53  callbacks.top()->notify();
54  callbacks.pop();
55  }
56  }
57 
59  class DelaySend : public madness::TaskInterface {
60  private:
61  Replicator_& parent_;
62 
63  public:
64 
66  DelaySend(Replicator_& parent) :
67  madness::TaskInterface(madness::TaskAttributes::hipri()),
68  parent_(parent)
69  {
70  typename std::vector<Future<typename A::value_type> >::iterator it =
71  parent_.data_.begin();
72  typename std::vector<Future<typename A::value_type> >::iterator end =
73  parent_.data_.end();
74  for(; it != end; ++it) {
75  if(! it->probe()) {
76  madness::DependencyInterface::inc();
77  it->register_callback(this);
78  }
79  }
80  }
81 
83  virtual ~DelaySend() { }
84 
86  virtual void run(const madness::TaskThreadEnv&) { parent_.send(); }
87 
88  }; // class DelaySend
89 
91 
93  bool probe() const {
94  madness::ScopedMutex<madness::Spinlock> locker(this);
95 
96  if(! probe_) {
97  typename std::vector<Future<typename A::value_type> >::const_iterator it =
98  data_.begin();
99  typename std::vector<Future<typename A::value_type> >::const_iterator end =
100  data_.end();
101  for(; it != end; ++it)
102  if(! it->probe())
103  break;
104 
105  probe_ = (it == end);
106  }
107 
108  return probe_;
109  }
110 
112  void delay_send() {
113  if(probe()) {
114  // The data is ready so send it now.
115  send(); // Replication is done
116  } else {
117  // The local data is not ready to be sent, so create a task that will
118  // send it when it is ready.
119  DelaySend* delay_send_task = new DelaySend(*this);
120  world_.taskq.add(delay_send_task);
121  }
122  }
123 
125  void send() {
126  const long sent = ++sent_;
127  const ProcessID dest = (world_.rank() + sent) % world_.size();
128 
129  if(dest != world_.rank()) {
130  wobj_type::task(dest, & Replicator_::send_handler, indices_, data_,
131  madness::TaskAttributes::hipri());
132  } else
133  do_callbacks(); // Replication is done
134  }
135 
136  void send_handler(const std::vector<typename A::size_type>& indices,
137  const std::vector<Future<typename A::value_type> >& data)
138  {
139  typename std::vector<typename A::size_type>::const_iterator index_it =
140  indices.begin();
141  typename std::vector<Future<typename A::value_type> >::const_iterator data_it =
142  data.begin();
143  typename std::vector<Future<typename A::value_type> >::const_iterator data_end =
144  data.end();
145 
146  for(; data_it != data_end; ++data_it, ++index_it)
147  destination_.set(*index_it, data_it->get());
148 
149  delay_send();
150  }
151 
152  public:
153 
154  Replicator(const A& source, const A destination) :
155  wobj_type(source.world()), madness::Spinlock(),
156  destination_(destination), indices_(), data_(), sent_(),
157  world_(source.world()), callbacks_(), probe_(false)
158  {
159  sent_ = 0;
160 
161  // Generate a list of local tiles from other.
162  typename A::pmap_interface::const_iterator end = source.pmap()->end();
163  typename A::pmap_interface::const_iterator it = source.pmap()->begin();
164  indices_.reserve(source.pmap()->local_size());
165  data_.reserve(source.pmap()->local_size());
166  if(source.is_dense()) {
167  // When dense, all tiles are present
168  for(; it != end; ++it) {
169  indices_.push_back(*it);
170  data_.push_back(source.find(*it));
171  destination_.set(*it, data_.back());
172  }
173  } else {
174  // When sparse, we need to generate a list
175  for(; it != end; ++it)
176  if(! source.is_zero(*it)) {
177  indices_.push_back(*it);
178  data_.push_back(source.find(*it));
179  destination_.set(*it, data_.back());
180  }
181  }
182 
184  delay_send();
185 
186  // Process any pending messages
187  wobj_type::process_pending();
188  }
189 
191 
193  bool done() {
194  madness::ScopedMutex<madness::Spinlock> locker(this);
195  return sent_ == world_.size();
196  }
197 
198 
200 
205  void register_callback(madness::CallbackInterface* callback) {
206  madness::ScopedMutex<madness::Spinlock> locker(this);
207  if(sent_ == world_.size())
208  callback->notify();
209  else
210  const_cast<callback_type&>(callbacks_).push(callback);
211  }
212 
213  }; // class Replicator
214 
215  } // namespace detail
216 } // namespace TiledArray
217 
218 
219 #endif // TILEDARRAY_REPLICATOR_H__INCLUDED
auto data(T &t)
Container data pointer accessor.
Definition: utility.h:89
Replicator(const A &source, const A destination)
Definition: replicator.h:154
bool done()
Check that the replication is complete.
Definition: replicator.h:193
Replicate an Array object.
Definition: replicator.h:35
void register_callback(madness::CallbackInterface *callback)
Add a callback.
Definition: replicator.h:205