20 #ifndef TILEDARRAY_REPLICATOR_H__INCLUDED 21 #define TILEDARRAY_REPLICATOR_H__INCLUDED 35 class Replicator :
public madness::WorldObject<Replicator<A> >,
private madness::Spinlock {
38 typedef madness::WorldObject<Replicator_> wobj_type;
39 typedef std::stack<madness::CallbackInterface*, std::vector<madness::CallbackInterface*> > callback_type;
42 std::vector<typename A::size_type> indices_;
43 std::vector<Future<typename A::value_type> > data_;
44 madness::AtomicInt sent_;
46 volatile callback_type callbacks_;
47 volatile mutable bool probe_;
51 callback_type& callbacks =
const_cast<callback_type&
>(callbacks_);
52 while(! callbacks.empty()) {
53 callbacks.top()->notify();
59 class DelaySend :
public madness::TaskInterface {
67 madness::TaskInterface(madness::TaskAttributes::hipri()),
70 typename std::vector<Future<typename A::value_type> >::iterator it =
71 parent_.data_.begin();
72 typename std::vector<Future<typename A::value_type> >::iterator end =
74 for(; it != end; ++it) {
76 madness::DependencyInterface::inc();
77 it->register_callback(
this);
83 virtual ~DelaySend() { }
86 virtual void run(
const madness::TaskThreadEnv&) { parent_.send(); }
94 madness::ScopedMutex<madness::Spinlock> locker(
this);
97 typename std::vector<Future<typename A::value_type> >::const_iterator it =
99 typename std::vector<Future<typename A::value_type> >::const_iterator end =
101 for(; it != end; ++it)
105 probe_ = (it == end);
119 DelaySend* delay_send_task =
new DelaySend(*
this);
120 world_.taskq.add(delay_send_task);
126 const long sent = ++sent_;
127 const ProcessID dest = (world_.rank() + sent) % world_.size();
129 if(dest != world_.rank()) {
130 wobj_type::task(dest, & Replicator_::send_handler, indices_, data_,
131 madness::TaskAttributes::hipri());
136 void send_handler(
const std::vector<typename A::size_type>& indices,
137 const std::vector<Future<typename A::value_type> >&
data)
139 typename std::vector<typename A::size_type>::const_iterator index_it =
141 typename std::vector<Future<typename A::value_type> >::const_iterator data_it =
143 typename std::vector<Future<typename A::value_type> >::const_iterator data_end =
146 for(; data_it != data_end; ++data_it, ++index_it)
147 destination_.set(*index_it, data_it->get());
155 wobj_type(source.world()),
madness::Spinlock(),
156 destination_(destination), indices_(), data_(), sent_(),
157 world_(source.world()), callbacks_(), probe_(false)
162 typename A::pmap_interface::const_iterator end = source.pmap()->end();
163 typename A::pmap_interface::const_iterator it = source.pmap()->begin();
164 indices_.reserve(source.pmap()->local_size());
165 data_.reserve(source.pmap()->local_size());
166 if(source.is_dense()) {
168 for(; it != end; ++it) {
169 indices_.push_back(*it);
170 data_.push_back(source.find(*it));
171 destination_.set(*it, data_.back());
175 for(; it != end; ++it)
176 if(! source.is_zero(*it)) {
177 indices_.push_back(*it);
178 data_.push_back(source.find(*it));
179 destination_.set(*it, data_.back());
187 wobj_type::process_pending();
194 madness::ScopedMutex<madness::Spinlock> locker(
this);
195 return sent_ == world_.size();
206 madness::ScopedMutex<madness::Spinlock> locker(
this);
207 if(sent_ == world_.size())
210 const_cast<callback_type&
>(callbacks_).push(callback);
219 #endif // TILEDARRAY_REPLICATOR_H__INCLUDED auto data(T &t)
Container data pointer accessor.
Replicator(const A &source, const A destination)
bool done()
Check that the replication is complete.
Replicate an Array object.
void register_callback(madness::CallbackInterface *callback)
Add a callback.