20 #ifndef TILEDARRAY_REPLICATOR_H__INCLUDED
21 #define TILEDARRAY_REPLICATOR_H__INCLUDED
/// Replicator: a MADNESS world object that copies the locally held elements
/// of a `source` array into a `destination` array and forwards them to the
/// other ranks of the world.
///
/// Inherits publicly from madness::WorldObject (for remote task dispatch) and
/// privately from madness::Spinlock; the Spinlock base is what the
/// ScopedMutex guards below lock via `this`.
// NOTE(review): this listing of the class is incomplete (several declarations,
// signatures and closing braces are not visible); the comments below describe
// only the statements that are visible.
35 class Replicator :
public madness::WorldObject<Replicator<A> >,
36 private madness::Spinlock {
// Alias for the WorldObject base class (referred to as `wobj_type` further down).
39 typedef madness::WorldObject<Replicator_>
// Stack of callback pointers backed by a std::vector
// (presumably the `callback_type` used for `callbacks_` -- TODO confirm).
41 typedef std::stack<madness::CallbackInterface*,
42 std::vector<madness::CallbackInterface*> >
// Vector of element ordinals (presumably the `indices_` member -- TODO confirm).
46 std::vector<typename A::ordinal_type>
// One future per local element collected by the constructor.
48 std::vector<Future<typename A::value_type> > data_;
// Counter incremented once per send() call and compared against world_.size().
49 madness::AtomicInt sent_;
// Callbacks waiting to be notified; accessed through const_cast to drop the
// volatile qualifier.
// NOTE(review): volatile is not a synchronization primitive in C++; confirm
// that every access is made with the Spinlock held.
51 volatile callback_type callbacks_;
// Cached result of the "all futures ready" scan below.
52 volatile mutable bool probe_;
// --- Callback draining ---
// Strips volatile from the callback stack and calls notify() on the top
// entry while the stack is non-empty.
// NOTE(review): the pop() that must follow notify() is not visible here.
56 callback_type& callbacks =
const_cast<callback_type&
>(callbacks_);
57 while (!callbacks.empty()) {
58 callbacks.top()->notify();
/// DelaySend: a high-priority task that registers itself as a callback on the
/// parent's data futures and, when run, calls parent_.send().
64 class DelaySend :
public madness::TaskInterface {
// Constructor: the task is created with hipri attributes.
71 : madness::TaskInterface(madness::TaskAttributes::hipri()),
73 typename std::vector<Future<typename A::value_type> >::iterator it =
74 parent_.data_.begin();
75 typename std::vector<Future<typename A::value_type> >::iterator end =
// For each future visited: add one dependency to this task, then register
// this task as the future's callback.
// NOTE(review): any condition guarding these two calls is not visible here.
77 for (; it != end; ++it) {
79 madness::DependencyInterface::inc();
80 it->register_callback(
this);
86 virtual ~DelaySend() {}
// Task body: forwards to the parent's send().
89 virtual void run(
const madness::TaskThreadEnv&) { parent_.send(); }
// --- Readiness probe ---
// Takes the Spinlock, scans the futures and stops at the first one whose
// probe() is false.
97 madness::ScopedMutex<madness::Spinlock> locker(
this);
100 typename std::vector<Future<typename A::value_type> >::const_iterator it =
102 typename std::vector<Future<typename A::value_type> >::const_iterator
104 for (; it != end; ++it)
105 if (!it->probe())
break;
// True only when the scan reached `end`, i.e. no future failed probe().
107 probe_ = (it == end);
// --- Deferred send ---
// Allocates a DelaySend task for this object and submits it to the world's
// task queue.
// NOTE(review): presumably the task queue takes ownership of the raw pointer
// -- confirm against the MADNESS task queue contract.
121 DelaySend* delay_send_task =
new DelaySend(*
this);
122 world_.taskq.add(delay_send_task);
// --- send ---
// Bumps the sent counter and derives the destination rank by offsetting this
// rank by the new count, modulo the world size.
128 const long sent = ++sent_;
129 const ProcessID dest = (world_.rank() + sent) % world_.size();
// Only dispatch when the destination is a different rank: schedule
// send_handler on `dest` with the local indices and data, at high priority.
131 if (dest != world_.rank()) {
132 wobj_type::task(dest, &Replicator_::send_handler, indices_, data_,
133 madness::TaskAttributes::hipri());
/// send_handler: stores each received (index, value) pair in destination_.
/// \param indices ordinals of the received elements
138 void send_handler(
const std::vector<typename A::ordinal_type>& indices,
140 typename std::vector<typename A::ordinal_type>::const_iterator index_it =
142 typename std::vector<Future<typename A::value_type> >::const_iterator
143 data_it = data.begin();
144 typename std::vector<Future<typename A::value_type> >::const_iterator
145 data_end = data.end();
// Walk indices and data in lockstep; the loop bound is taken from `data` only.
147 for (; data_it != data_end; ++data_it, ++index_it)
148 destination_.set(*index_it, data_it->get());
// --- Constructor ---
// The world object base and world_ are both initialised from source.world().
155 : wobj_type(source.world()),
157 destination_(destination),
161 world_(source.world()),
// Iterate over the source process map and reserve room for its local_size()
// entries up front.
167 typename A::pmap_interface::const_iterator end = source.pmap()->end();
168 typename A::pmap_interface::const_iterator it = source.pmap()->begin();
169 indices_.reserve(source.pmap()->local_size());
170 data_.reserve(source.pmap()->local_size());
171 if (source.is_dense()) {
// Dense source: record every index, fetch its element with find(), and set
// the same future in destination_.
173 for (; it != end; ++it) {
174 indices_.push_back(*it);
175 data_.push_back(source.find(*it));
176 destination_.set(*it, data_.back());
// Non-dense source: same as above, but skip indices for which is_zero() is true.
180 for (; it != end; ++it)
181 if (!source.is_zero(*it)) {
182 indices_.push_back(*it);
183 data_.push_back(source.find(*it));
184 destination_.set(*it, data_.back());
// Let the world object handle messages queued before construction finished.
192 wobj_type::process_pending();
// --- Completion query ---
// Under the Spinlock: complete when the sent counter equals the world size.
199 madness::ScopedMutex<madness::Spinlock> locker(
this);
200 return sent_ == world_.size();
// --- Callback registration ---
// Under the Spinlock: checks for completion, and pushes `callback` onto the
// pending stack (volatile stripped via const_cast).
// NOTE(review): the statement executed when the counter already equals the
// world size is not visible here; presumably the push is the "not yet done"
// path -- confirm.
210 madness::ScopedMutex<madness::Spinlock> locker(
this);
211 if (sent_ == world_.size())
214 const_cast<callback_type&
>(callbacks_).push(callback);
222 #endif // TILEDARRAY_REPLICATOR_H__INCLUDED