distributed_storage.h
Go to the documentation of this file.
1 /*
2  * This file is a part of TiledArray.
3  * Copyright (C) 2013 Virginia Tech
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  */
19 
20 #ifndef TILEDARRAY_DISTRIBUTED_STORAGE_H__INCLUDED
21 #define TILEDARRAY_DISTRIBUTED_STORAGE_H__INCLUDED
22 
23 #include <TiledArray/pmap/pmap.h>
24 
25 namespace TiledArray {
26 namespace detail {
27 
29 
43 template <typename T>
44 class DistributedStorage : public madness::WorldObject<DistributedStorage<T> > {
45  public:
47  typedef madness::WorldObject<DistributedStorage_>
49 
50  typedef std::size_t size_type;
51  typedef size_type key_type;
52  typedef T value_type;
54  typedef Pmap pmap_interface;
55  typedef madness::ConcurrentHashMap<key_type, future>
57  typedef typename container_type::accessor
59  typedef typename container_type::const_accessor
61 
62  private:
63  const size_type max_size_;
64  std::shared_ptr<pmap_interface>
66  pmap_;
67  mutable container_type data_;
68  madness::AtomicInt num_live_ds_;
69 
70  // not allowed
72  DistributedStorage_& operator=(const DistributedStorage_&);
73 
74  void set_handler(const size_type i, const value_type& value) {
75  future& f = get_local(i);
76 
77  // Check that the future has not been set already.
78  TA_ASSERT(!f.probe() && "Tile has already been assigned.");
79 
80  f.set(value);
81  }
82 
83  void get_handler(const size_type i,
84  const typename future::remote_refT& ref) const {
85  const future& f = get_local(i);
86  future remote_f(ref);
87  remote_f.set(f);
88  }
89 
90  void set_remote(const size_type i, const value_type& value) {
91  WorldObject_::task(owner(i), &DistributedStorage_::set_handler, i, value,
92  madness::TaskAttributes::hipri());
93  }
94 
95  struct DelayedSet : public madness::CallbackInterface {
96  private:
97  DistributedStorage_& ds_;
98  size_type index_;
99  future future_;
100 
101  public:
103  : ds_(ds), index_(i), future_(f) {
104  ++ds_.num_live_ds_;
105  }
106 
107  virtual ~DelayedSet() { --ds_.num_live_ds_; }
108 
109  virtual void notify() {
110  ds_.set_remote(index_, future_);
111  delete this;
112  }
113  }; // struct DelayedSet
114  friend struct DelayedSet;
115 
116  public:
119 
128  const std::shared_ptr<pmap_interface>& pmap)
129  : WorldObject_(world),
130  max_size_(max_size),
131  pmap_(pmap),
132  data_((max_size / world.size()) + 11) {
133  // Check that the process map is appropriate for this storage object
134  TA_ASSERT(pmap_);
135  TA_ASSERT(pmap_->size() == max_size);
136  TA_ASSERT(pmap_->rank() == pmap_interface::size_type(world.rank()));
137  TA_ASSERT(pmap_->procs() == pmap_interface::size_type(world.size()));
138  num_live_ds_ = 0;
139  WorldObject_::process_pending();
140  }
141 
143  if (num_live_ds_ != 0) {
144  madness::print_error(
145  "DistributedStorage (object id=\", id(), \") destroyed while "
146  "outstanding tasks exist. Add a fence() to extend the lifetime of "
147  "this object.");
148  abort();
149  }
150  }
151 
152  using WorldObject_::get_world;
153 
155 
158  const std::shared_ptr<pmap_interface>& pmap() const { return pmap_; }
159 
161 
163  ProcessID owner(size_type i) const {
164  TA_ASSERT(i < max_size_);
165  TA_ASSERT(pmap_);
166  return pmap_->owner(i);
167  }
168 
170 
175  bool is_local(size_type i) const {
176  TA_ASSERT(i < max_size_);
177  TA_ASSERT(pmap_);
178  return pmap_->is_local(i);
179  }
180 
182 
186  size_type size() const { return data_.size(); }
187 
189 
194  size_type max_size() const { return max_size_; }
195 
197 
202  future get(size_type i) const {
203  TA_ASSERT(i < max_size_);
204  if (is_local(i)) {
205  return get_local(i);
206  } else {
207  // Send a request to the owner of i for the element.
208  future result;
209  WorldObject_::task(owner(i), &DistributedStorage_::get_handler, i,
210  result.remote_ref(get_world()),
211  madness::TaskAttributes::hipri());
212 
213  return result;
214  }
215  }
216 
218 
223  const future& get_local(const size_type i) const {
224  TA_ASSERT(pmap_->is_local(i));
225 
226  // Return the local element.
227  const_accessor acc;
228  [[maybe_unused]] const bool inserted = data_.insert(acc, i);
229  return acc->second;
230  }
231 
233 
239  TA_ASSERT(pmap_->is_local(i));
240 
241  // Return the local element.
242  accessor acc;
243  [[maybe_unused]] const bool inserted = data_.insert(acc, i);
244  return acc->second;
245  }
246 
248 
254  void set(size_type i, const value_type& value) {
255  TA_ASSERT(i < max_size_);
256  if (is_local(i))
257  set_handler(i, value);
258  else
259  set_remote(i, value);
260  }
261 
263 
272  void set(size_type i, const future& f) {
273  TA_ASSERT(i < max_size_);
274  if (is_local(i)) {
275  const_accessor acc;
276  if (!data_.insert(acc, typename container_type::datumT(i, f))) {
277  // The element was already in the container, so set it with f.
278  future existing_f = acc->second;
279  acc.release();
280 
281  // Check that the future has not been set already.
282  TA_ASSERT(!existing_f.probe() && "Tile has already been assigned.");
283  // Set the future
284  existing_f.set(f);
285  }
286  } else {
287  if (f.probe()) {
288  set_remote(i, f);
289  } else {
290  DelayedSet* set_callback = new DelayedSet(*this, i, f);
291  const_cast<future&>(f).register_callback(set_callback);
292  }
293  }
294  }
295 
296 }; // class DistributedStorage
297 
298 } // namespace detail
299 } // namespace TiledArray
300 
301 #endif // TILEDARRAY_DISTRIBUTED_STORAGE_H__INCLUDED
container_type::accessor accessor
Local element accessor type.
const future & get_local(const size_type i) const
Get local element.
DistributedStorage< T > DistributedStorage_
This object type.
size_type max_size() const
Max size accessor.
void set(size_type i, const value_type &value)
Set element i with value.
void set(size_type i, const future &f)
Set element i with a Future f.
virtual size_type owner(const size_type tile) const =0
Maps tile to the processor that owns it.
Future< value_type > future
Future type that holds a single element.
container_type::const_accessor const_accessor
Local element const accessor type.
size_type rank() const
Process rank accessor.
Definition: pmap.h:107
future & get_local(const size_type i)
Get local element.
#define TA_ASSERT(EXPR,...)
Definition: error.h:39
future get(size_type i) const
Get local or remote element.
Distributed storage container.
size_type size() const
Size accessor.
Definition: pmap.h:102
DistributedStorage(World &world, size_type max_size, const std::shared_ptr< pmap_interface > &pmap)
ProcessID owner(size_type i) const
Element owner.
madness::ConcurrentHashMap< key_type, future > container_type
Local container type.
madness::WorldObject< DistributedStorage_ > WorldObject_
Base object type.
bool is_local(size_type i) const
Local element query.
virtual bool is_local(const size_type tile) const =0
Check that the tile is owned by this process.
std::size_t size_type
Size type.
Definition: pmap.h:57
Process map.
Definition: pmap.h:55
Pmap pmap_interface
Process map interface type.
const std::shared_ptr< pmap_interface > & pmap() const
Process map accessor.
size_type procs() const
Process count accessor.
Definition: pmap.h:112
size_type size() const
Number of local elements.