TiledArray  0.7.0
reduce_task.h
Go to the documentation of this file.
1 /*
2  * This file is a part of TiledArray.
3  * Copyright (C) 2013 Virginia Tech
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  */
19 
20 #ifndef TILEDARRAY_REDUCE_TASK_H__INCLUDED
21 #define TILEDARRAY_REDUCE_TASK_H__INCLUDED
22 
23 #include <TiledArray/config.h>
24 #include <TiledArray/error.h>
25 #include <TiledArray/madness.h>
26 
27 namespace TiledArray {
28  namespace detail {
29 
30  template <typename T>
31  struct ArgumentHelper {
32  typedef Future<T> type;
33  }; // struct ArgumentHelper
34 
35  template <typename T>
36  struct ArgumentHelper<Future<T> > {
37  typedef Future<T> type;
38  }; // struct ArgumentHelper
39 
40  template <typename T, typename U>
41  struct ArgumentHelper<std::pair<Future<T>, Future<U> > > {
42  typedef std::pair<Future<T>, Future<U> > type;
43  }; // struct ArgumentHelper
44 
46 
48  template <typename opT>
50  public:
51  typedef typename opT::result_type result_type;
53  typedef typename std::remove_cv<typename std::remove_reference<
54  typename opT::first_argument_type>::type>::type first_argument_type;
56  typedef typename std::remove_cv<typename std::remove_reference<
57  typename opT::second_argument_type>::type>::type second_argument_type;
59  typedef std::pair<Future<first_argument_type>,
60  Future<second_argument_type> > argument_type;
62 
63  private:
64  opT op_;
65 
66  public:
68  ReducePairOpWrapper() : op_() { }
69 
71 
73  ReducePairOpWrapper(const opT& op) : op_(op) { }
74 
76 
79  op_(other.op_)
80  { }
81 
84 
86 
90  op_ = other.op_;
91  return *this;
92  }
93 
95  result_type operator()() const { return op_(); }
96 
97  result_type operator()(result_type& temp) const { return op_(temp); }
98 
100 
103  void operator()(result_type& result, const result_type& arg) {
104  op_(result, arg);
105  }
106 
108 
111  void operator()(result_type& result, const argument_type& arg) const {
112  op_(result, arg.first, arg.second);
113  }
114 
115  }; // class ReducePairOpWrapper
116 
117 
119 
195  template <typename opT>
196  class ReduceTask {
197  private:
198  typedef typename opT::result_type result_type;
199  typedef typename std::remove_const<typename std::remove_reference<
200  typename opT::argument_type>::type>::type argument_type;
201 
203 
207  class ReduceTaskImpl : public madness::TaskInterface {
208  public:
209 
211 
214  class ReduceObject : public madness::CallbackInterface {
215  private:
216 
217  ReduceTaskImpl* parent_;
219  madness::CallbackInterface* callback_;
220  madness::AtomicInt count_;
221 
223 
226  template <typename T>
227  void register_callbacks(Future<T>& f) {
228  if(f.probe()) {
229  parent_->ready(this);
230  } else {
231  count_ = 1;
232  f.register_callback(this);
233  }
234  }
235 
237 
241  template <typename T, typename U>
242  void register_callbacks(std::pair<Future<T>, Future<U> >& p) {
243  if(p.first.probe() && p.second.probe()) {
244  parent_->ready(this);
245  } else {
246  count_ = 2;
247  p.first.register_callback(this);
248  p.second.register_callback(this);
249  }
250  }
251 
252  public:
253 
255 
260  template <typename Arg>
261  ReduceObject(ReduceTaskImpl* parent, const Arg& arg, madness::CallbackInterface* callback) :
262  parent_(parent), arg_(arg), callback_(callback)
263  {
264  MADNESS_ASSERT(parent_);
265  register_callbacks(arg_);
266  }
267 
268  virtual ~ReduceObject() { }
269 
271  virtual void notify() { if((--count_) == 0) parent_->ready(this); }
272 
274 
276  const argument_type& arg() const { return arg_; }
277 
279 
282  static void destroy(const ReduceObject* object) {
283  static constexpr const bool trace_tasks =
284 #ifdef TILEDARRAY_ENABLE_TASK_DEBUG_TRACE
285  true
286 #else
287  false
288 #endif
289  ;
290  if(object->callback_) {
291  if (trace_tasks)
292  object->callback_->notify_debug("destroy(*ReduceObject)");
293  else
294  object->callback_->notify();
295  }
296  delete object;
297  }
298 
299  }; // class ReduceObject
300 
301  virtual void get_id(std::pair<void*,unsigned short>& id) const {
302  return PoolTaskInterface::make_id(id, *this);
303  }
304 
306 
313  void reduce(std::shared_ptr<result_type>& result) {
314  while(result) {
315  lock_.lock(); // <<< Begin critical section
316  if(ready_object_) {
317  // Get the ready argument
318  ReduceObject* ready_object = const_cast<ReduceObject*>(ready_object_);
319  ready_object_ = nullptr;
320  lock_.unlock(); // <<< End critical section
321 
322  // Reduce the argument that was held by ready_object_
323  op_(*result, ready_object->arg());
324 
325  // cleanup the argument
326  ReduceObject::destroy(ready_object);
327  this->dec();
328  } else if(ready_result_) {
329  // Get the ready result
330  std::shared_ptr<result_type> ready_result = ready_result_;
331  ready_result_.reset();
332  lock_.unlock(); // <<< End critical section
333 
334  // Reduce the result that was held by ready_result_
335  op_(*result, *ready_result);
336 
337  // cleanup the result
338  ready_result.reset();
339  } else {
340  // Nothing is ready, so place result in the ready state.
341  ready_result_ = result;
342  result.reset();
343  lock_.unlock(); // <<< End critical section
344  }
345  }
346  }
347 
349 
352  void reduce_result_object(std::shared_ptr<result_type> result, const ReduceObject* object) {
353  // Reduce the argument
354  op_(*result, object->arg());
355 
356  // Cleanup the argument
357  ReduceObject::destroy(object);
358 
359  // Check for more reductions
360  reduce(result);
361 
362  // Decrement the dependency counter for the argument. This must
363  // be done after the reduce call to avoid a race condition.
364  this->dec();
365  }
366 
368  void reduce_object_object(const ReduceObject* object1, const ReduceObject* object2) {
369  // Construct an empty result object
370  auto result = std::make_shared<result_type>(op_());
371 
372  // Reduce the two arguments
373  op_(*result, object1->arg());
374  op_(*result, object2->arg());
375 
376  // Cleanup arguments
377  ReduceObject::destroy(object1);
378  ReduceObject::destroy(object2);
379 
380  // Check for more reductions
381  reduce(result);
382 
383  // Decrement the dependency counter for the two arguments. This
384  // must be done after the reduce call to avoid a race condition.
385  this->dec();
386  this->dec();
387  }
388 
389  World& world_;
390  opT op_;
391  std::shared_ptr<result_type> ready_result_;
392  volatile ReduceObject* ready_object_;
393  Future<result_type> result_;
394  madness::Spinlock lock_;
395  madness::CallbackInterface* callback_;
396 
397  public:
398 
400 
405  ReduceTaskImpl(World& world, opT op, madness::CallbackInterface* callback) :
406  madness::TaskInterface(1, TaskAttributes::hipri()),
407  world_(world), op_(op), ready_result_(std::make_shared<result_type>(op())),
408  ready_object_(nullptr), result_(), lock_(), callback_(callback)
409  { }
410 
411  virtual ~ReduceTaskImpl() { }
412 
414  virtual void run(const madness::TaskThreadEnv&) {
415  MADNESS_ASSERT(ready_result_);
416  result_.set(op_(*ready_result_));
417  if(callback_)
418  callback_->notify();
419  }
420 
422 
427  void ready(ReduceObject* object) {
428  MADNESS_ASSERT(object);
429  lock_.lock(); // <<< Begin critical section
430  if(ready_result_) {
431  std::shared_ptr<result_type> ready_result = ready_result_;
432  ready_result_.reset();
433  lock_.unlock(); // <<< End critical section
434  MADNESS_ASSERT(ready_result);
435  world_.taskq.add(this, & ReduceTaskImpl::reduce_result_object,
436  ready_result, object, TaskAttributes::hipri());
437  } else if(ready_object_) {
438  ReduceObject* ready_object = const_cast<ReduceObject*>(ready_object_);
439  ready_object_ = nullptr;
440  lock_.unlock(); // <<< End critical section
441  MADNESS_ASSERT(ready_object);
442  world_.taskq.add(this, & ReduceTaskImpl::reduce_object_object,
443  object, ready_object, TaskAttributes::hipri());
444  } else {
445  ready_object_ = object;
446  lock_.unlock(); // <<< End critical section
447  }
448  }
449 
451 
453  const Future<result_type>& result() const { return result_; }
454 
456 
458  World& world() const { return world_; }
459 
460  }; // class ReduceTaskImpl
461 
462 
463  ReduceTaskImpl* pimpl_;
464  std::size_t count_;
465 
466  public:
467 
469  ReduceTask() : pimpl_(nullptr), count_(0ul) { }
470 
471 
473 
478  ReduceTask(World& world, const opT& op = opT(),
479  madness::CallbackInterface* callback = nullptr) :
480  pimpl_(new ReduceTaskImpl(world, op, callback)), count_(0ul)
481  { }
482 
484 
486  ReduceTask(ReduceTask<opT>&& other) noexcept :
487  pimpl_(other.pimpl_), count_(other.count_)
488  {
489  other.pimpl_ = nullptr;
490  other.count_ = 0ul;
491  }
492 
494 
497  ~ReduceTask() { delete pimpl_; }
498 
500 
503  pimpl_ = other.pimpl_;
504  count_ = other.count_;
505  other.pimpl_ = nullptr;
506  other.count_ = 0;
507  return *this;
508  }
509 
510  // Non-copyable
511  ReduceTask(const ReduceTask<opT>&) = delete;
512  ReduceTask<opT>& operator=(const ReduceTask<opT>&) = delete;
513 
515 
523  template <typename Arg>
524  int add(const Arg& arg, madness::CallbackInterface* callback = nullptr) {
525  MADNESS_ASSERT(pimpl_);
526  pimpl_->inc();
527  new typename ReduceTaskImpl::ReduceObject(pimpl_, arg, callback);
528  return ++count_;
529  }
530 
532 
534  int count() const { return count_; }
535 
537 
541  Future<result_type> submit() {
542  MADNESS_ASSERT(pimpl_);
543 
544  // Get the result before submitting/running the task, otherwise the
545  // task could run and be deleted before we are done here.
546  Future<result_type> result = pimpl_->result();
547 
548  pimpl_->dec();
549  World& world = pimpl_->world();
550  world.taskq.add(pimpl_);
551 
552  pimpl_ = nullptr;
553  return result;
554  }
555 
557 
559  operator bool() const { return pimpl_ != nullptr; }
560 
561  }; // class ReduceTask
562 
563 
564 
566 
648  template <typename opT>
649  class ReducePairTask : public ReduceTask<ReducePairOpWrapper<opT> > {
650  private:
651 
653  typedef typename op_type::first_argument_type first_argument_type;
654  typedef typename op_type::second_argument_type second_argument_type;
655  typedef typename op_type::argument_type argument_type;
657 
658  public:
659 
662 
664 
669  ReducePairTask(World& world, const opT& op = opT(), madness::CallbackInterface* callback = nullptr) :
670  ReduceTask_(world, op_type(op), callback)
671  { }
672 
674 
677  ReduceTask_(std::move(other))
678  { }
679 
681 
684  ReduceTask_::operator=(std::move(other));
685  return *this;
686  }
687 
689  ReducePairTask(const ReducePairTask<opT>&) = delete;
691 
693 
704  template <typename L, typename R>
705  void add(const L& left, const R& right, madness::CallbackInterface* callback = nullptr) {
706  ReduceTask_::add(argument_type(Future<first_argument_type>(left),
707  Future<second_argument_type>(right)), callback);
708  }
709 
710  }; // class ReducePairTask
711 
712  } // namespace detail
713 } // namespace TiledArray
714 
715 #endif // TILEDARRAY_REDUCE_TASK_H__INCLUDED
result_type operator()() const
Create an default reduction object.
Definition: reduce_task.h:95
ReduceTask(ReduceTask< opT > &&other) noexcept
Move constructor.
Definition: reduce_task.h:486
Wrapper that to convert a pair-wise reduction into a standard reduction.
Definition: reduce_task.h:49
static void destroy(const ReduceObject *object)
Destroy the object.
Definition: reduce_task.h:282
opT::result_type result_type
The result type of this reduction operation.
Definition: reduce_task.h:51
ReducePairOpWrapper< opT > & operator=(const ReducePairOpWrapper< opT > &other)
Copy assignment operator.
Definition: reduce_task.h:89
ReducePairTask(World &world, const opT &op=opT(), madness::CallbackInterface *callback=nullptr)
Constructor.
Definition: reduce_task.h:669
result_type operator()(result_type &temp) const
Definition: reduce_task.h:97
ReducePairTask< opT > & operator=(ReducePairTask< opT > &&other) noexcept
Move assignment operator.
Definition: reduce_task.h:683
STL namespace.
virtual void notify()
Callback function that is invoked when the argument is ready.
Definition: reduce_task.h:271
ReducePairTask(ReducePairTask< opT > &&other) noexcept
Move constructor.
Definition: reduce_task.h:676
const argument_type & arg() const
Argument accessor.
Definition: reduce_task.h:276
int count() const
Argument count.
Definition: reduce_task.h:534
void add(const L &left, const R &right, madness::CallbackInterface *callback=nullptr)
Add a pair of arguments to the reduction task.
Definition: reduce_task.h:705
ReducePairOpWrapper()
Default constructor.
Definition: reduce_task.h:68
void operator()(result_type &result, const argument_type &arg) const
Reduce an argument pair.
Definition: reduce_task.h:111
void operator()(result_type &result, const result_type &arg)
Reduce two result objects.
Definition: reduce_task.h:103
std::remove_cv< typename std::remove_reference< typename opT::first_argument_type >::type >::type first_argument_type
The left-hand argument type.
Definition: reduce_task.h:54
ReduceTask< opT > & operator=(ReduceTask< opT > &&other) noexcept
Move assignment operator.
Definition: reduce_task.h:502
ReducePairOpWrapper(const opT &op)
Constructor.
Definition: reduce_task.h:73
ReducePairTask()
Default constructor.
Definition: reduce_task.h:661
ReduceTask()
Default constructor.
Definition: reduce_task.h:469
ReduceObject(ReduceTaskImpl *parent, const Arg &arg, madness::CallbackInterface *callback)
Constructor.
Definition: reduce_task.h:261
std::pair< Future< first_argument_type >, Future< second_argument_type > > argument_type
The combine argument type.
Definition: reduce_task.h:60
int add(const Arg &arg, madness::CallbackInterface *callback=nullptr)
Add an argument to the reduction task.
Definition: reduce_task.h:524
std::remove_cv< typename std::remove_reference< typename opT::second_argument_type >::type >::type second_argument_type
The right-hand argument type.
Definition: reduce_task.h:57
ReduceTask(World &world, const opT &op=opT(), madness::CallbackInterface *callback=nullptr)
Constructor.
Definition: reduce_task.h:478
ReducePairOpWrapper(const ReducePairOpWrapper< opT > &other)
Copy constructor.
Definition: reduce_task.h:78
Future< result_type > submit()
Submit the reduction task to the task queue.
Definition: reduce_task.h:541