20 #ifndef TILEDARRAY_REDUCE_TASK_H__INCLUDED 21 #define TILEDARRAY_REDUCE_TASK_H__INCLUDED 23 #include <TiledArray/config.h> 40 template <
typename T,
typename U>
42 typedef std::pair<Future<T>, Future<U> >
type;
48 template <
typename opT>
53 typedef typename std::remove_cv<
typename std::remove_reference<
56 typedef typename std::remove_cv<
typename std::remove_reference<
59 typedef std::pair<Future<first_argument_type>,
112 op_(result, arg.first, arg.second);
195 template <
typename opT>
198 typedef typename opT::result_type result_type;
199 typedef typename std::remove_const<
typename std::remove_reference<
200 typename opT::argument_type>::type>::type argument_type;
207 class ReduceTaskImpl :
public madness::TaskInterface {
217 ReduceTaskImpl* parent_;
219 madness::CallbackInterface* callback_;
220 madness::AtomicInt count_;
226 template <
typename T>
227 void register_callbacks(Future<T>& f) {
229 parent_->ready(
this);
232 f.register_callback(
this);
241 template <
typename T,
typename U>
242 void register_callbacks(std::pair<Future<T>, Future<U> >& p) {
243 if(p.first.probe() && p.second.probe()) {
244 parent_->ready(
this);
247 p.first.register_callback(
this);
248 p.second.register_callback(
this);
260 template <
typename Arg>
261 ReduceObject(ReduceTaskImpl* parent,
const Arg&
arg, madness::CallbackInterface* callback) :
262 parent_(parent), arg_(
arg), callback_(callback)
264 MADNESS_ASSERT(parent_);
265 register_callbacks(arg_);
/// Callback invoked when an argument this object is waiting on becomes ready.
///
/// Decrements `count_`; when the decremented value reaches zero, this object
/// is handed to the parent task via `parent_->ready(this)`.
271 virtual void notify() {
if((--count_) == 0) parent_->ready(
this); }
/// Argument accessor.
///
/// \return A const reference to the stored argument, `arg_`.
276 const argument_type&
arg()
const {
return arg_; }
283 static constexpr
const bool trace_tasks =
284 #ifdef TILEDARRAY_ENABLE_TASK_DEBUG_TRACE 290 if(object->callback_) {
292 object->callback_->notify_debug(
"destroy(*ReduceObject)");
294 object->callback_->notify();
301 virtual void get_id(std::pair<void*,unsigned short>&
id)
const {
302 return PoolTaskInterface::make_id(
id, *
this);
313 void reduce(std::shared_ptr<result_type>& result) {
318 ReduceObject* ready_object =
const_cast<ReduceObject*
>(ready_object_);
319 ready_object_ =
nullptr;
323 op_(*result, ready_object->arg());
328 }
else if(ready_result_) {
330 std::shared_ptr<result_type> ready_result = ready_result_;
331 ready_result_.reset();
335 op_(*result, *ready_result);
338 ready_result.reset();
341 ready_result_ = result;
352 void reduce_result_object(std::shared_ptr<result_type> result,
const ReduceObject*
object) {
354 op_(*result, object->arg());
368 void reduce_object_object(
const ReduceObject* object1,
const ReduceObject* object2) {
370 auto result = std::make_shared<result_type>(op_());
373 op_(*result, object1->arg());
374 op_(*result, object2->arg());
391 std::shared_ptr<result_type> ready_result_;
392 volatile ReduceObject* ready_object_;
393 Future<result_type> result_;
394 madness::Spinlock lock_;
395 madness::CallbackInterface* callback_;
405 ReduceTaskImpl(World& world, opT op, madness::CallbackInterface* callback) :
406 madness::TaskInterface(1, TaskAttributes::hipri()),
407 world_(world), op_(op), ready_result_(
std::make_shared<result_type>(op())),
408 ready_object_(nullptr), result_(), lock_(), callback_(callback)
/// Virtual destructor; the body is empty.
411 virtual ~ReduceTaskImpl() { }
414 virtual void run(
const madness::TaskThreadEnv&) {
415 MADNESS_ASSERT(ready_result_);
416 result_.set(op_(*ready_result_));
427 void ready(ReduceObject*
object) {
428 MADNESS_ASSERT(
object);
431 std::shared_ptr<result_type> ready_result = ready_result_;
432 ready_result_.reset();
434 MADNESS_ASSERT(ready_result);
435 world_.taskq.add(
this, & ReduceTaskImpl::reduce_result_object,
436 ready_result,
object, TaskAttributes::hipri());
437 }
else if(ready_object_) {
438 ReduceObject* ready_object =
const_cast<ReduceObject*
>(ready_object_);
439 ready_object_ =
nullptr;
441 MADNESS_ASSERT(ready_object);
442 world_.taskq.add(
this, & ReduceTaskImpl::reduce_object_object,
443 object, ready_object, TaskAttributes::hipri());
445 ready_object_ = object;
/// Result accessor.
///
/// \return A const reference to the `result_` future held by this task.
453 const Future<result_type>& result()
const {
return result_; }
/// World accessor.
///
/// \return A reference to the `world_` object this task was constructed with.
458 World& world()
const {
return world_; }
463 ReduceTaskImpl* pimpl_;
479 madness::CallbackInterface* callback =
nullptr) :
480 pimpl_(new ReduceTaskImpl(world, op, callback)), count_(0ul)
487 pimpl_(other.pimpl_), count_(other.count_)
489 other.pimpl_ =
nullptr;
503 pimpl_ = other.pimpl_;
504 count_ = other.count_;
505 other.pimpl_ =
nullptr;
523 template <
typename Arg>
524 int add(
const Arg& arg, madness::CallbackInterface* callback =
nullptr) {
525 MADNESS_ASSERT(pimpl_);
527 new typename ReduceTaskImpl::ReduceObject(pimpl_, arg, callback);
/// Argument count.
///
/// \return The current value of `count_`.
534 int count()
const {
return count_; }
542 MADNESS_ASSERT(pimpl_);
546 Future<result_type> result = pimpl_->result();
549 World& world = pimpl_->world();
550 world.taskq.add(pimpl_);
/// Validity check.
///
/// \return `true` if `pimpl_` is not null, `false` otherwise.
559 operator bool()
const {
return pimpl_ !=
nullptr; }
648 template <
typename opT>
669 ReducePairTask(World& world,
const opT& op = opT(), madness::CallbackInterface* callback =
nullptr) :
704 template <
typename L,
typename R>
705 void add(
const L& left,
const R& right, madness::CallbackInterface* callback =
nullptr) {
707 Future<second_argument_type>(right)), callback);
715 #endif // TILEDARRAY_REDUCE_TASK_H__INCLUDED
result_type operator()() const
Create a default reduction object.
ReduceTask(ReduceTask< opT > &&other) noexcept
Move constructor.
Wrapper that converts a pair-wise reduction into a standard reduction.
static void destroy(const ReduceObject *object)
Destroy the object.
opT::result_type result_type
The result type of this reduction operation.
ReducePairOpWrapper< opT > & operator=(const ReducePairOpWrapper< opT > &other)
Copy assignment operator.
ReducePairTask(World &world, const opT &op=opT(), madness::CallbackInterface *callback=nullptr)
Constructor.
result_type operator()(result_type &temp) const
ReducePairTask< opT > & operator=(ReducePairTask< opT > &&other) noexcept
Move assignment operator.
virtual void notify()
Callback function that is invoked when the argument is ready.
ReducePairTask(ReducePairTask< opT > &&other) noexcept
Move constructor.
const argument_type & arg() const
Argument accessor.
int count() const
Argument count.
void add(const L &left, const R &right, madness::CallbackInterface *callback=nullptr)
Add a pair of arguments to the reduction task.
ReducePairOpWrapper()
Default constructor.
void operator()(result_type &result, const argument_type &arg) const
Reduce an argument pair.
~ReducePairOpWrapper()
Destructor.
void operator()(result_type &result, const result_type &arg)
Reduce two result objects.
std::remove_cv< typename std::remove_reference< typename opT::first_argument_type >::type >::type first_argument_type
The left-hand argument type.
Reduction argument container.
ReduceTask< opT > & operator=(ReduceTask< opT > &&other) noexcept
Move assignment operator.
ReducePairOpWrapper(const opT &op)
Constructor.
ReducePairTask()
Default constructor.
ReduceTask()
Default constructor.
ReduceObject(ReduceTaskImpl *parent, const Arg &arg, madness::CallbackInterface *callback)
Constructor.
std::pair< Future< first_argument_type >, Future< second_argument_type > > argument_type
The combine argument type.
int add(const Arg &arg, madness::CallbackInterface *callback=nullptr)
Add an argument to the reduction task.
std::pair< Future< T >, Future< U > > type
std::remove_cv< typename std::remove_reference< typename opT::second_argument_type >::type >::type second_argument_type
The right-hand argument type.
ReduceTask(World &world, const opT &op=opT(), madness::CallbackInterface *callback=nullptr)
Constructor.
ReducePairOpWrapper(const ReducePairOpWrapper< opT > &other)
Copy constructor.
Future< result_type > submit()
Submit the reduction task to the task queue.