Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
flow_graph.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_flow_graph_H
22 #define __TBB_flow_graph_H
23 
24 #include "tbb_stddef.h"
25 #include "atomic.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "task.h"
32 #include "tbb_exception.h"
35 #include "tbb_profiling.h"
36 #include "task_arena.h"
37 
38 #if __TBB_PREVIEW_ASYNC_MSG
39 #include <vector> // std::vector in internal::async_storage
40 #include <memory> // std::shared_ptr in async_msg
41 #endif
42 
43 #if __TBB_PREVIEW_STREAMING_NODE
44 // For streaming_node
45 #include <array> // std::array
46 #include <unordered_map> // std::unordered_map
47 #include <type_traits> // std::decay, std::true_type, std::false_type
48 #endif // __TBB_PREVIEW_STREAMING_NODE
49 
50 #if TBB_DEPRECATED_FLOW_ENQUEUE
51 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
52 #else
53 #define FLOW_SPAWN(a) tbb::task::spawn((a))
54 #endif
55 
56 // use the VC10 or gcc version of tuple if it is available.
57 #if __TBB_CPP11_TUPLE_PRESENT
58  #include <tuple>
59 namespace tbb {
60  namespace flow {
61  using std::tuple;
62  using std::tuple_size;
63  using std::tuple_element;
64  using std::get;
65  }
66 }
67 #else
68  #include "compat/tuple"
69 #endif
70 
71 #include<list>
72 #include<queue>
73 
84 namespace tbb {
85 namespace flow {
86 
// Named concurrency levels: unlimited (0) and serial (1).
88 enum concurrency { unlimited = 0, serial = 1 };
89 
90 namespace interface10 {
91 
93 struct null_type {};
94 
96 class continue_msg {};
97 
99 template< typename T > class sender;
100 template< typename T > class receiver;
102 template< typename T > class limiter_node; // needed for resetting decrementer
103 template< typename R, typename B > class run_and_put_task;
105 namespace internal {
107 template<typename T, typename M> class successor_cache;
108 template<typename T, typename M> class broadcast_cache;
109 template<typename T, typename M> class round_robin_cache;
110 template<typename T, typename M> class predecessor_cache;
111 template<typename T, typename M> class reservable_predecessor_cache;
113 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
114 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
115 // C == receiver< ... > or sender< ... >, depending.
// Keeps the edges of a node as a list of raw pointers to C. Only the pointers are
// stored and removed; nothing in this class deletes the objects they point to.
116 template<typename C>
117 class edge_container {
119 public:
     // List type used to store the edges and to hand out copies of them.
120  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
     // Appends the address of s to the edge list. Duplicates are not filtered out.
122  void add_edge(C &s) {
123  built_edges.push_back(&s);
124  }
     // Removes the first stored entry whose address equals &s; does nothing if there is none.
126  void delete_edge(C &s) {
127  for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
128  if (*i == &s) {
129  (void)built_edges.erase(i);
130  return; // only remove one predecessor per request
131  }
132  }
133  }
     // Overwrites v with a copy of the current edge list.
135  void copy_edges(edge_list_type &v) {
136  v = built_edges;
137  }
     // Returns the number of stored edges.
139  size_t edge_count() {
140  return (size_t)(built_edges.size());
141  }
     // Drops every stored edge.
143  void clear() {
144  built_edges.clear();
145  }
147  // methods remove the statement from all predecessors/successors lists in the edge
148  // container.
     // Only declared here; the definitions are not part of this class body.
149  template< typename S > void sender_extract(S &s);
150  template< typename R > void receiver_extract(R &r);
152 private:
     // The recorded edges, in insertion order.
153  edge_list_type built_edges;
154 }; // class edge_container
155 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
156 
157 } // namespace internal
159 } // namespace interface10
160 } // namespace flow
161 } // namespace tbb
162 
165 
166 namespace tbb {
167 namespace flow {
168 namespace interface10 {
169 
170 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
// Each argument is NULL, SUCCESSFULLY_ENQUEUED, or a pointer to a task that has not been
// scheduled yet. At most one unscheduled task can be handed back to the caller, so when
// both arguments are unscheduled tasks, left is spawned in the arena of graph g and right
// is returned. In every other case the arguments are only compared and one is returned.
171 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
172  // if no RHS task, don't change left.
173  if (right == NULL) return left;
174  // right != NULL
175  if (left == NULL) return right;
176  if (left == SUCCESSFULLY_ENQUEUED) return right;
177  // left contains a task
178  if (right != SUCCESSFULLY_ENQUEUED) {
179  // both are valid tasks
     // left cannot be returned as well, so it must be scheduled here or it would never run.
180  internal::spawn_in_graph_arena(g, *left);
181  return right;
182  }
183  return left;
184 }
186 #if __TBB_PREVIEW_ASYNC_MSG
188 template < typename T > class async_msg;
190 namespace internal {
192 template < typename T > class async_storage;
194 template< typename T, typename = void >
197  typedef T filtered_type;
199  static const bool is_async_type = false;
201  static const void* to_void_ptr(const T& t) {
202  return static_cast<const void*>(&t);
203  }
205  static void* to_void_ptr(T& t) {
206  return static_cast<void*>(&t);
207  }
209  static const T& from_void_ptr(const void* p) {
210  return *static_cast<const T*>(p);
211  }
213  static T& from_void_ptr(void* p) {
214  return *static_cast<T*>(p);
215  }
217  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
218  if (is_async) {
219  // This (T) is NOT async and incoming 'A<X> t' IS async
220  // Get data from async_msg
222  task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
223  // finalize() must be called after subscribe() because set() can be called in finalize()
224  // and 'this_recv' client must be subscribed by this moment
225  msg.finalize();
226  return new_task;
227  }
228  else {
229  // Incoming 't' is NOT async
230  return this_recv->try_put_task(from_void_ptr(p));
231  }
232  }
233 };
235 template< typename T >
236 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
237  typedef T async_type;
238  typedef typename T::async_msg_data_type filtered_type;
240  static const bool is_async_type = true;
242  // Receiver-classes use const interfaces
243  static const void* to_void_ptr(const T& t) {
244  return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
245  }
247  static void* to_void_ptr(T& t) {
248  return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
249  }
251  // Sender-classes use non-const interfaces
252  static const T& from_void_ptr(const void* p) {
253  return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
254  }
256  static T& from_void_ptr(void* p) {
257  return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
258  }
260  // Used in receiver<T> class
261  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
262  if (is_async) {
263  // Both are async
264  return this_recv->try_put_task(from_void_ptr(p));
265  }
266  else {
267  // This (T) is async and incoming 'X t' is NOT async
268  // Create async_msg for X
270  const T msg(t);
271  return this_recv->try_put_task(msg);
272  }
273  }
274 };
279  template< typename, typename > friend class internal::predecessor_cache;
280  template< typename, typename > friend class internal::reservable_predecessor_cache;
281 public:
285  virtual ~untyped_sender() {}
287  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
289  // TODO: Prevent untyped successor registration
292  virtual bool register_successor( successor_type &r ) = 0;
295  virtual bool remove_successor( successor_type &r ) = 0;
298  virtual bool try_release( ) { return false; }
301  virtual bool try_consume( ) { return false; }
303 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
304  typedef internal::edge_container<successor_type> built_successors_type;
306  typedef built_successors_type::edge_list_type successor_list_type;
307  virtual built_successors_type &built_successors() = 0;
308  virtual void internal_add_built_successor( successor_type & ) = 0;
309  virtual void internal_delete_built_successor( successor_type & ) = 0;
310  virtual void copy_successors( successor_list_type &) = 0;
311  virtual size_t successor_count() = 0;
312 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
313 protected:
315  template< typename X >
316  bool try_get( X &t ) {
318  }
321  template< typename X >
322  bool try_reserve( X &t ) {
324  }
326  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
327  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
328 };
330 class untyped_receiver {
331  template< typename, typename > friend class run_and_put_task;
332  template< typename > friend class limiter_node;
334  template< typename, typename > friend class internal::broadcast_cache;
335  template< typename, typename > friend class internal::round_robin_cache;
336  template< typename, typename > friend class internal::successor_cache;
338 #if __TBB_PREVIEW_OPENCL_NODE
339  template< typename, typename > friend class proxy_dependency_receiver;
340 #endif /* __TBB_PREVIEW_OPENCL_NODE */
341 public:
346  virtual ~untyped_receiver() {}
// Puts t into this receiver. Returns false when try_put_task() yields NULL, true otherwise.
// A returned task other than the SUCCESSFULLY_ENQUEUED marker has not been scheduled yet,
// so it is spawned in this receiver's graph before success is reported.
349  template<typename X>
350  bool try_put(const X& t) {
351  task *res = try_put_task(t);
352  if (!res) return false;
353  if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
354  return true;
355  }
357  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
359  // TODO: Prevent untyped predecessor registration
360 
362  virtual bool register_predecessor( predecessor_type & ) { return false; }
365  virtual bool remove_predecessor( predecessor_type & ) { return false; }
367 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
368  typedef internal::edge_container<predecessor_type> built_predecessors_type;
369  typedef built_predecessors_type::edge_list_type predecessor_list_type;
370  virtual built_predecessors_type &built_predecessors() = 0;
371  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
372  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
373  virtual void copy_predecessors( predecessor_list_type & ) = 0;
374  virtual size_t predecessor_count() = 0;
375 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
376 protected:
377  template<typename X>
378  task *try_put_task(const X& t) {
380  }
382  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
384  virtual graph& graph_reference() = 0;
386  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
391  virtual bool is_continue_receiver() { return false; }
392 };
393 
394 } // namespace internal
397 template< typename T >
399 public:
401  typedef T output_type;
406  virtual bool try_get( T & ) { return false; }
409  virtual bool try_reserve( T & ) { return false; }
411 protected:
412  virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
413  // Both async OR both are NOT async
416  }
417  // Else: this (T) is async OR incoming 't' is async
418  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
419  return false;
420  }
422  virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
423  // Both async OR both are NOT async
426  }
427  // Else: this (T) is async OR incoming 't' is async
428  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
429  return false;
430  }
431 }; // class sender<T>
434 template< typename T >
436  template< typename > friend class internal::async_storage;
437  template< typename, typename > friend struct internal::async_helpers;
438 public:
440  typedef T input_type;
441 
447  }
451  }
453 protected:
454  virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
456  }
459  virtual task *try_put_task(const T& t) = 0;
461 }; // class receiver<T>
463 #else // __TBB_PREVIEW_ASYNC_MSG
466 template< typename T >
467 class sender {
468 public:
470  typedef T output_type;
475  virtual ~sender() {}
477  // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
480  virtual bool register_successor( successor_type &r ) = 0;
483  virtual bool remove_successor( successor_type &r ) = 0;
486  virtual bool try_get( T & ) { return false; }
487 
489  virtual bool try_reserve( T & ) { return false; }
490 
492  virtual bool try_release( ) { return false; }
495  virtual bool try_consume( ) { return false; }
496 
497 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
498  typedef typename internal::edge_container<successor_type> built_successors_type;
500  typedef typename built_successors_type::edge_list_type successor_list_type;
501  virtual built_successors_type &built_successors() = 0;
502  virtual void internal_add_built_successor( successor_type & ) = 0;
503  virtual void internal_delete_built_successor( successor_type & ) = 0;
504  virtual void copy_successors( successor_list_type &) = 0;
505  virtual size_t successor_count() = 0;
506 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
507 }; // class sender<T>
508 
510 template< typename T >
511 class receiver {
512 public:
514  typedef T input_type;
515 
517  typedef sender<T> predecessor_type;
520  virtual ~receiver() {}
// Puts t into this receiver. Returns false when try_put_task() yields NULL, true otherwise.
// A returned task other than the SUCCESSFULLY_ENQUEUED marker has not been scheduled yet,
// so it is spawned in this receiver's graph before success is reported.
523  bool try_put( const T& t ) {
524  task *res = try_put_task(t);
525  if (!res) return false;
526  if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
527  return true;
528  }
531 protected:
532  template< typename R, typename B > friend class run_and_put_task;
533  template< typename X, typename Y > friend class internal::broadcast_cache;
534  template< typename X, typename Y > friend class internal::round_robin_cache;
535  virtual task *try_put_task(const T& t) = 0;
536  virtual graph& graph_reference() = 0;
537 public:
538  // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
541  virtual bool register_predecessor( predecessor_type & ) { return false; }
544  virtual bool remove_predecessor( predecessor_type & ) { return false; }
546 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
547  typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
548  typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
549  virtual built_predecessors_type &built_predecessors() = 0;
550  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
551  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
552  virtual void copy_predecessors( predecessor_list_type & ) = 0;
553  virtual size_t predecessor_count() = 0;
554 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
556 protected:
558  template<typename U> friend class limiter_node;
560 
561  template<typename TT, typename M> friend class internal::successor_cache;
562  virtual bool is_continue_receiver() { return false; }
564 #if __TBB_PREVIEW_OPENCL_NODE
565  template< typename, typename > friend class proxy_dependency_receiver;
566 #endif /* __TBB_PREVIEW_OPENCL_NODE */
567 }; // class receiver<T>
569 #endif // __TBB_PREVIEW_ASYNC_MSG
573 class continue_receiver : public receiver< continue_msg > {
574 public:
578 
584  __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
587  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
588  }
593  my_current_count = 0;
594  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
595  }
601  return true;
602  }
605 
611  return true;
612  }
613 
614 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
615  typedef internal::edge_container<predecessor_type> built_predecessors_type;
616  typedef built_predecessors_type::edge_list_type predecessor_list_type;
617  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
618 
619  void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
621  my_built_predecessors.add_edge( s );
622  }
623 
624  void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
626  my_built_predecessors.delete_edge(s);
627  }
629  void copy_predecessors( predecessor_list_type &v) __TBB_override {
631  my_built_predecessors.copy_edges(v);
632  }
634  size_t predecessor_count() __TBB_override {
636  return my_built_predecessors.edge_count();
637  }
639 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
641 protected:
642  template< typename R, typename B > friend class run_and_put_task;
643  template<typename X, typename Y> friend class internal::broadcast_cache;
644  template<typename X, typename Y> friend class internal::round_robin_cache;
645  // execute body is supposed to be too small to create a task for.
647  {
651  else
653  }
654  task * res = execute();
655  return res? res : SUCCESSFULLY_ENQUEUED;
656  }
658 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
659  // continue_receiver must contain its own built_predecessors because it does
660  // not have a node_cache.
661  built_predecessors_type my_built_predecessors;
662 #endif
668  // the friend declaration in the base class did not eliminate the "protected class"
669  // error in gcc 4.1.2
670  template<typename U> friend class limiter_node;
674  if (f & rf_clear_edges) {
675 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
676  my_built_predecessors.clear();
677 #endif
679  }
680  }
683 
685  virtual task * execute() = 0;
686  template<typename TT, typename M> friend class internal::successor_cache;
687  bool is_continue_receiver() __TBB_override { return true; }
689 }; // class continue_receiver
691 } // interfaceX
693 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Returns the key of message t by calling t.key(); the result is converted to K by the
// return statement. T must therefore provide a key() member callable on a const object.
694  template <typename K, typename T>
695  K key_from_message( const T &t ) {
696  return t.key();
697  }
698 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
699 
703 } // flow
704 } // tbb
709 namespace tbb {
710 namespace flow {
711 namespace interface10 {
712 
716 #if __TBB_PREVIEW_ASYNC_MSG
718 #endif
720 
// Builds an iterator over the nodes of graph g.
// With begin == true the iterator starts at g->my_nodes; otherwise current_node stays
// NULL, which is the representation of the end iterator.
721 template <typename C, typename N>
722 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
723 {
724  if (begin) current_node = my_graph->my_nodes;
725  //else it is an end iterator by default
726 }
727 
728 template <typename C, typename N>
730  __TBB_ASSERT(current_node, "graph_iterator at end");
731  return *operator->();
732 }
734 template <typename C, typename N>
736  return current_node;
737 }
739 template <typename C, typename N>
741  if (current_node) current_node = current_node->next;
742 }
// Constructs a graph that creates its own task_group_context.
// own_context is set so that ~graph() deletes that context. The arena preparation and the
// root task set-up mirror the constructor taking an external context, because ~graph()
// destroys *my_root_task unconditionally.
745 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
746  prepare_task_arena();
747  own_context = true;
748  cancelled = false;
749  caught_exception = false;
750  my_context = new task_group_context(tbb::internal::FLOW_TASKS);
     // The root task is allocated in my_context, so it can only be created after the context.
751  my_root_task = (new (task::allocate_root(*my_context)) empty_task);
752  my_root_task->set_ref_count(1);
754  my_is_active = true;
755 }
// Constructs a graph that runs in a caller-supplied task_group_context.
// own_context is cleared so that ~graph() leaves use_this_context alone; the caller keeps
// ownership of it. The remaining state is initialised the same way as in the default
// constructor, including the cancelled / caught_exception flags.
757 inline graph::graph(task_group_context& use_this_context) :
758  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
759  prepare_task_arena();
760  own_context = false;
     // Start from a clean status, as graph::graph() and graph::reset() do.
     cancelled = false;
     caught_exception = false;
761  my_root_task = (new (task::allocate_root(*my_context)) empty_task);
762  my_root_task->set_ref_count(1);
764  my_is_active = true;
765 }
766 
// Destroys the graph: calls wait_for_all() first, then destroys the root task, deletes the
// task_group_context only when this graph created it (own_context), and deletes the arena.
// NOTE(review): the listing skips a line number between wait_for_all() and destroy();
// confirm against the original header that no statement is elided there.
767 inline graph::~graph() {
768  wait_for_all();
770  tbb::task::destroy(*my_root_task);
771  if (own_context) delete my_context;
772  delete my_task_arena;
773 }
774 
775 inline void graph::reserve_wait() {
779  }
780 }
781 
782 inline void graph::release_wait() {
786  }
787 }
790  n->next = NULL;
791  {
794  if (my_nodes_last) my_nodes_last->next = n;
796  if (!my_nodes) my_nodes = n;
797  }
798 }
799 
801  {
803  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
804  if (n->prev) n->prev->next = n->next;
805  if (n->next) n->next->prev = n->prev;
806  if (my_nodes_last == n) my_nodes_last = n->prev;
807  if (my_nodes == n) my_nodes = n->next;
808  }
809  n->prev = n->next = NULL;
810 }
811 
// Resets the graph: resets the context (if any), clears the cancelled and caught_exception
// flags, calls reset_node(f) on every registered node, re-prepares the task arena, and
// finally spawns and clears the tasks collected in my_reset_task_list.
// NOTE(review): the listing skips line numbers after "reset context" and before the final
// spawn loop; confirm against the original header whether statements are elided there.
812 inline void graph::reset( reset_flags f ) {
813  // reset context
815 
816  if(my_context) my_context->reset();
817  cancelled = false;
818  caught_exception = false;
819  // reset all the nodes comprising the graph
820  for(iterator ii = begin(); ii != end(); ++ii) {
821  graph_node *my_p = &(*ii);
822  my_p->reset_node(f);
823  }
824  // Reattach the arena. Might be useful to run the graph in a particular task_arena
825  // while not limiting graph lifetime to a single task_arena::execute() call.
826  prepare_task_arena( /*reinit=*/true );
828  // now spawn the tasks necessary to start the graph
829  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
830  internal::spawn_in_graph_arena(*this, *(*rti));
831  }
     // The list is emptied after its tasks have been spawned.
832  my_reset_task_list.clear();
833 }
// Node iteration entry points. Each one builds a graph_iterator over this graph; the
// boolean argument selects a begin iterator (true) or an end iterator (false).
835 inline graph::iterator graph::begin() { return iterator(this, true); }
836 
837 inline graph::iterator graph::end() { return iterator(this, false); }
838 
// Const overloads return const_iterator.
839 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
840 
841 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
842 
// cbegin()/cend() return the same iterators as the const begin()/end() overloads.
843 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
844 
845 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
847 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
848 inline void graph::set_name(const char *name) {
850 }
851 #endif
853 inline graph_node::graph_node(graph& g) : my_graph(g) {
855 }
856 
859 }
860 
862 
864 template < typename Output >
865 class source_node : public graph_node, public sender< Output > {
866 public:
868  typedef Output output_type;
869 
873  //Source node has no input type
875 
876 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
877  typedef typename sender<output_type>::built_successors_type built_successors_type;
878  typedef typename sender<output_type>::successor_list_type successor_list_type;
879 #endif
880 
// Constructs a source_node in graph g from a user body.
// The body is copied into two separately allocated source_body_leaf objects, my_body and
// my_init_body. is_active (default true) is stored in both my_active and init_my_active.
// The node starts with no reservation and no cached item.
882  template< typename Body >
883  source_node( graph &g, Body body, bool is_active = true )
884  : graph_node(g), my_active(is_active), init_my_active(is_active),
885  my_body( new internal::source_body_leaf< output_type, Body>(body) ),
886  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
887  my_reserved(false), my_has_cached_item(false)
888  {
      // Make this node the owner of its successor cache, then report the node to the tracing hooks.
889  my_successors.set_owner(this);
890  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
891  static_cast<sender<output_type> *>(this), this->my_body );
892  }
895  source_node( const source_node& src ) :
896  graph_node(src.my_graph), sender<Output>(),
899  my_reserved(false), my_has_cached_item(false)
900  {
901  my_successors.set_owner(this);
902  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
903  static_cast<sender<output_type> *>(this), this->my_body );
904  }
905 
// Releases the two body objects held by this node (my_body and my_init_body).
907  ~source_node() { delete my_body; delete my_init_body; }
908 
909 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
910  void set_name( const char *name ) __TBB_override {
912  }
913 #endif
919  if ( my_active )
920  spawn_put();
921  return true;
922  }
923 
928  return true;
929  }
931 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
932 
933  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
935  void internal_add_built_successor( successor_type &r) __TBB_override {
937  my_successors.internal_add_built_successor(r);
938  }
940  void internal_delete_built_successor( successor_type &r) __TBB_override {
942  my_successors.internal_delete_built_successor(r);
943  }
944 
945  size_t successor_count() __TBB_override {
947  return my_successors.successor_count();
948  }
949 
950  void copy_successors(successor_list_type &v) __TBB_override {
952  my_successors.copy_successors(v);
953  }
954 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
959  if ( my_reserved )
960  return false;
961 
962  if ( my_has_cached_item ) {
963  v = my_cached_item;
965  return true;
966  }
967  // we've been asked to provide an item, but we have none. enqueue a task to
968  // provide one.
969  spawn_put();
970  return false;
971  }
976  if ( my_reserved ) {
977  return false;
978  }
979 
980  if ( my_has_cached_item ) {
981  v = my_cached_item;
982  my_reserved = true;
983  return true;
984  } else {
985  return false;
986  }
987  }
988 
993  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
994  my_reserved = false;
995  if(!my_successors.empty())
997  return true;
998  }
999 
1003  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1004  my_reserved = false;
1005  my_has_cached_item = false;
1006  if ( !my_successors.empty() ) {
1007  spawn_put();
1008  }
1009  return true;
1010  }
1011 
1013  void activate() {
1015  my_active = true;
1016  if (!my_successors.empty())
1017  spawn_put();
1018  }
1019 
1020  template<typename Body>
1023  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1024  }
1025 
1026 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1027  void extract( ) __TBB_override {
1028  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1030  my_reserved = false;
1032  }
1033 #endif
1034 
1035 protected:
1036 
1040  my_reserved =false;
1041  if(my_has_cached_item) {
1042  my_has_cached_item = false;
1043  }
1047  delete my_body;
1048  my_body = tmp;
1049  }
1052  }
1053 
1054 private:
1064 
1065  // used by apply_body_bypass, can invoke body of node.
1068  if ( my_reserved ) {
1069  return false;
1070  }
1073  bool r = (*my_body)(my_cached_item);
1075  if (r) {
1077  }
1078  }
1080  v = my_cached_item;
1081  my_reserved = true;
1082  return true;
1083  } else {
1084  return false;
1085  }
1086  }
1087 
1088  // when resetting, and if the source_node was created with my_active == true, then
1089  // when we reset the node we must store a task to run the node, and spawn it only
1090  // after the reset is complete and is_active() is again true. This is why we don't
1091  // test for is_active() here.
1093  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1095  }
1098  void spawn_put( ) {
1099  if(internal::is_graph_active(this->my_graph)) {
1101  }
1102  }
1103 
1107  output_type v;
1108  if ( !try_reserve_apply_body(v) )
1109  return NULL;
1110 
1111  task *last_task = my_successors.try_put_task(v);
1112  if ( last_task )
1113  try_consume();
1114  else
1115  try_release();
1116  return last_task;
1117  }
1118 }; // class source_node
1119 
1121 template < typename Input, typename Output = continue_msg, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1122 class function_node : public graph_node, public internal::function_input<Input,Output,Policy,Allocator>, public internal::function_output<Output> {
1123 public:
1124  typedef Input input_type;
1125  typedef Output output_type;
1131 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1132  typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1133  typedef typename fOutput_type::successor_list_type successor_list_type;
1134 #endif
1136 
1138  // input_queue_type is allocated here, but destroyed in the function_input_base.
1139  // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1140  // be done in one place. This would be an interface-breaking change.
1141  template< typename Body >
1143  graph &g, size_t concurrency,
1146  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1147  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1148  }
1149 
1152  graph_node(src.my_graph),
1153  input_impl_type(src),
1154  fOutput_type() {
1155  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1156  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1157  }
1158 
1159 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1160  void set_name( const char *name ) __TBB_override {
1162  }
1163 #endif
1164 
1165 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1166  void extract( ) __TBB_override {
1167  my_predecessors.built_predecessors().receiver_extract(*this);
1168  successors().built_successors().sender_extract(*this);
1169  }
1170 #endif
1171 
1172 protected:
1173  template< typename R, typename B > friend class run_and_put_task;
1174  template<typename X, typename Y> friend class internal::broadcast_cache;
1175  template<typename X, typename Y> friend class internal::round_robin_cache;
1177 
1179 
1182  // TODO: use clear() instead.
1183  if(f & rf_clear_edges) {
1184  successors().clear();
1186  }
1187  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1188  __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1189  }
1190 
1191 }; // class function_node
1192 
1194 // Output is a tuple of output types.
1195 template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1197  public graph_node,
1199  <
1200  Input,
1201  typename internal::wrap_tuple_elements<
1202  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1203  internal::multifunction_output, // wrap this around each element
1204  Output // the tuple providing the types
1205  >::type,
1206  Policy,
1207  Allocator
1208  > {
1209 protected:
1211 public:
1212  typedef Input input_type;
1217 private:
1220 public:
1221  template<typename Body>
1223  graph &g, size_t concurrency,
1225  ) : graph_node(g), base_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1226  tbb::internal::fgt_multioutput_node_with_body<N>(
1227  tbb::internal::FLOW_MULTIFUNCTION_NODE,
1228  &this->my_graph, static_cast<receiver<input_type> *>(this),
1229  this->output_ports(), this->my_body
1230  );
1231  }
1232 
1234  graph_node(other.my_graph), base_type(other) {
1235  tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1236  &this->my_graph, static_cast<receiver<input_type> *>(this),
1237  this->output_ports(), this->my_body );
1238  }
1239 
1240 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1241  void set_name( const char *name ) __TBB_override {
1243  }
1244 #endif
1245 
1246 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1247  void extract( ) __TBB_override {
1248  my_predecessors.built_predecessors().receiver_extract(*this);
1249  base_type::extract();
1250  }
1251 #endif
1252  // all the guts are in multifunction_input...
1253 protected:
1255 }; // multifunction_node
1256 
1258 // split_node forwards each element of the incoming tuple to its successors. The node has unlimited concurrency, so it does not reject inputs.
1259 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1260 class split_node : public graph_node, public receiver<TupleType> {
1263 public:
1264  typedef TupleType input_type;
1265  typedef Allocator allocator_type;
1266 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1268  typedef typename base_type::predecessor_list_type predecessor_list_type;
1270  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1271 #endif
1273  typedef typename internal::wrap_tuple_elements<
1274  N, // #elements in tuple
1275  internal::multifunction_output, // wrap this around each element
1276  TupleType // the tuple providing the types
1278 
1279  explicit split_node(graph &g) : graph_node(g)
1280  {
1281  tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1282  static_cast<receiver<input_type> *>(this), this->output_ports());
1283  }
1284  split_node( const split_node & other) : graph_node(other.my_graph), base_type(other)
1285  {
1286  tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1287  static_cast<receiver<input_type> *>(this), this->output_ports());
1288  }
1290 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1291  void set_name( const char *name ) __TBB_override {
1293  }
1294 #endif
1295 
1298 protected:
1299  task *try_put_task(const TupleType& t) __TBB_override {
1300  // Sending split messages in parallel is not justified, as overheads would prevail.
1301  // Also, we do not have successors here. So we just tell the task returned here is successful.
1303  }
1305  if (f & rf_clear_edges)
1307 
1309  }
1312  return my_graph;
1313  }
1314 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1315 private:
1316  void extract() __TBB_override {} // No-op override: the body is empty, so calling extract() changes nothing here.
1317 
1319  void internal_add_built_predecessor(predecessor_type&) __TBB_override {} // No-op: the predecessor argument is ignored and nothing is recorded.
1320 
1322  void internal_delete_built_predecessor(predecessor_type&) __TBB_override {} // No-op: the predecessor argument is ignored and nothing is removed.
1323 
1324  size_t predecessor_count() __TBB_override { return 0; } // Always reports zero predecessors.
1325 
1326  void copy_predecessors(predecessor_list_type&) __TBB_override {} // No-op: the caller's list is left untouched.
1327 
1328  built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; } // Returns a reference to the member my_predessors (identifier spelled as declared).
1329 
1331  built_predecessors_type my_predessors;
1332 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1333 
1334 private:
1336 };
1337 
1339 template <typename Output, typename Policy = internal::Policy<void> >
1340 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1341  public internal::function_output<Output> {
1342 public:
1344  typedef Output output_type;
1349 
1351  template <typename Body >
1353  graph &g,
1355  ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ) {
1356  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1357  static_cast<receiver<input_type> *>(this),
1358  static_cast<sender<output_type> *>(this), this->my_body );
1359  }
1360 
1362  template <typename Body >
1364  graph &g, int number_of_predecessors,
1366  ) : graph_node(g)
1367  , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1368  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1369  static_cast<receiver<input_type> *>(this),
1370  static_cast<sender<output_type> *>(this), this->my_body );
1371  }
1372 
1375  graph_node(src.my_graph), input_impl_type(src),
1376  internal::function_output<Output>() {
1377  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1378  static_cast<receiver<input_type> *>(this),
1379  static_cast<sender<output_type> *>(this), this->my_body );
1380  }
1381 
1382 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1383  void set_name( const char *name ) __TBB_override {
1385  }
1386 #endif
1387 
1388 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1389  void extract() __TBB_override {
      // Deprecated node-extraction hook covering both sides of the node:
      //  - receiver_extract(*this) on the built-predecessor edges kept by input_impl_type;
      //  - sender_extract(*this) on the built-successor edges held by successors().
1390  input_impl_type::my_built_predecessors.receiver_extract(*this);
1391  successors().built_successors().sender_extract(*this);
1392  }
1393 #endif
1394 
1395 protected:
1396  template< typename R, typename B > friend class run_and_put_task;
1397  template<typename X, typename Y> friend class internal::broadcast_cache;
1398  template<typename X, typename Y> friend class internal::round_robin_cache;
1401 
1404  if(f & rf_clear_edges)successors().clear();
1405  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1406  }
1407 }; // continue_node
1408 
1410 template <typename T>
1411 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1412 public:
1413  typedef T input_type;
1414  typedef T output_type;
1417 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1418  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1419  typedef typename sender<output_type>::successor_list_type successor_list_type;
1420 #endif
1421 private:
1423 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1424  internal::edge_container<predecessor_type> my_built_predecessors;
1425  spin_mutex pred_mutex; // serialize accesses on edge_container
1426 #endif
1427 public:
1428 
1429  explicit broadcast_node(graph& g) : graph_node(g) {
      // Attaches the node to graph g, makes this node the owner of its successor cache,
      // and calls fgt_node with the FLOW_BROADCAST_NODE tag, the owning graph, and this
      // node viewed as both receiver<input_type> and sender<output_type>.
      // NOTE(review): fgt_node looks like a tracing/profiling hook -- confirm in tbb_profiling.h.
1430  my_successors.set_owner( this );
1431  tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1432  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1433  }
1434 
1435  // Copy constructor
1437  graph_node(src.my_graph), receiver<T>(), sender<T>()
1438  {
1439  my_successors.set_owner( this );
1440  tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1441  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1442  }
1443 
1444 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1445  void set_name( const char *name ) __TBB_override {
1447  }
1448 #endif
1449 
1453  return true;
1454  }
1455 
1459  return true;
1460  }
1461 
1462 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1463  typedef typename sender<T>::built_successors_type built_successors_type;
1464 
1465  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); } // Exposes the built-successor edge set owned by my_successors.
1466 
1467  void internal_add_built_successor(successor_type &r) __TBB_override {
      // Thin forwarder: recording the successor edge r is delegated to my_successors.
1468  my_successors.internal_add_built_successor(r);
1469  }
1471  void internal_delete_built_successor(successor_type &r) __TBB_override {
      // Thin forwarder: removing the recorded successor edge r is delegated to my_successors.
1472  my_successors.internal_delete_built_successor(r);
1473  }
1474 
1475  size_t successor_count() __TBB_override {
      // Returns whatever count my_successors reports; no state is kept in this class for it.
1476  return my_successors.successor_count();
1477  }
1479  void copy_successors(successor_list_type &v) __TBB_override {
      // Fills the caller-supplied list v by delegating to my_successors.
1480  my_successors.copy_successors(v);
1481  }
1482 
1483  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1484 
1485  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; } // Returns the member edge container directly; pred_mutex is not taken here.
1486 
1487  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
      // Records predecessor edge p. The scoped_lock on pred_mutex serializes access to
      // my_built_predecessors and is held until `l` is destroyed at function exit.
1488  spin_mutex::scoped_lock l(pred_mutex);
1489  my_built_predecessors.add_edge(p);
1490  }
1491 
1492  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
      // Removes the recorded predecessor edge p while holding pred_mutex.
1493  spin_mutex::scoped_lock l(pred_mutex);
1494  my_built_predecessors.delete_edge(p);
1495  }
1497  size_t predecessor_count() __TBB_override {
      // Returns the number of recorded predecessor edges, read while holding pred_mutex.
1498  spin_mutex::scoped_lock l(pred_mutex);
1499  return my_built_predecessors.edge_count();
1500  }
1502  void copy_predecessors(predecessor_list_type &v) __TBB_override {
      // Copies the recorded predecessor edges into the caller-supplied list v while
      // holding pred_mutex.
1503  spin_mutex::scoped_lock l(pred_mutex);
1504  my_built_predecessors.copy_edges(v);
1505  }
1506 
1507  void extract() __TBB_override {
      // Deprecated node-extraction hook: receiver_extract(*this) on the recorded predecessor
      // edges, then sender_extract(*this) on the successors' recorded edges.
      // Note: pred_mutex is not acquired in this function.
1508  my_built_predecessors.receiver_extract(*this);
1509  my_successors.built_successors().sender_extract(*this);
1510  }
1511 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1513 protected:
1514  template< typename R, typename B > friend class run_and_put_task;
1515  template<typename X, typename Y> friend class internal::broadcast_cache;
1516  template<typename X, typename Y> friend class internal::round_robin_cache;
1519  task *new_task = my_successors.try_put_task(t);
1520  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1521  return new_task;
1522  }
1525  return my_graph;
1526  }
1527 
1529 
1532  my_successors.clear();
1533 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1534  my_built_predecessors.clear();
1535 #endif
1536  }
1537  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1538  }
1539 }; // broadcast_node
1540 
1542 template <typename T, typename A=cache_aligned_allocator<T> >
1543 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1544 public:
1545  typedef T input_type;
1546  typedef T output_type;
1550 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1551  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1552  typedef typename sender<output_type>::successor_list_type successor_list_type;
1553 #endif
1554 protected:
1555  typedef size_t size_type;
1557 
1558 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1559  internal::edge_container<predecessor_type> my_built_predecessors;
1560 #endif
1561 
1565 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1566  , add_blt_succ, del_blt_succ,
1567  add_blt_pred, del_blt_pred,
1568  blt_succ_cnt, blt_pred_cnt,
1569  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1570 #endif
1571  };
1572 
1573  // implements the aggregator_operation concept
1574  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1575  public:
1576  char type;
1577 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1579  union {
1583  size_t cnt_val;
1584  successor_list_type *svec;
1585  predecessor_list_type *pvec;
1586  };
1587 #else
1588  T *elem;
1591 #endif
1592  buffer_operation(const T& e, op_type t) : type(char(t))
1593 
1594 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1595  , ltask(NULL), elem(const_cast<T*>(&e))
1596 #else
1597  , elem(const_cast<T*>(&e)) , ltask(NULL)
1598 #endif
1599  {}
1600  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1601  };
1602 
1604  typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
1605  friend class internal::aggregating_functor<class_type, buffer_operation>;
1606  internal::aggregator< handler_type, buffer_operation> my_aggregator;
1608  virtual void handle_operations(buffer_operation *op_list) {
      // Virtual entry point for a batch of queued operations. It forwards to the templated
      // handle_operations_impl, passing `this` as the "derived" pointer so that the calls
      // made through that pointer (e.g. derived->order()) bind to buffer_node's own versions.
1609  handle_operations_impl(op_list, this);
1610  }
1612  template<typename derived_type>
1613  void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1614  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1616  buffer_operation *tmp = NULL;
1617  bool try_forwarding = false;
1618  while (op_list) {
1619  tmp = op_list;
1620  op_list = op_list->next;
1621  switch (tmp->type) {
1622  case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1623  case rem_succ: internal_rem_succ(tmp); break;
1624  case req_item: internal_pop(tmp); break;
1625  case res_item: internal_reserve(tmp); break;
1626  case rel_res: internal_release(tmp); try_forwarding = true; break;
1627  case con_res: internal_consume(tmp); try_forwarding = true; break;
1628  case put_item: try_forwarding = internal_push(tmp); break;
1630 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1631  // edge recording
1632  case add_blt_succ: internal_add_built_succ(tmp); break;
1633  case del_blt_succ: internal_del_built_succ(tmp); break;
1634  case add_blt_pred: internal_add_built_pred(tmp); break;
1635  case del_blt_pred: internal_del_built_pred(tmp); break;
1636  case blt_succ_cnt: internal_succ_cnt(tmp); break;
1637  case blt_pred_cnt: internal_pred_cnt(tmp); break;
1638  case blt_succ_cpy: internal_copy_succs(tmp); break;
1639  case blt_pred_cpy: internal_copy_preds(tmp); break;
1640 #endif
1641  }
1642  }
1643 
1644  derived->order();
1645 
1646  if (try_forwarding && !forwarder_busy) {
1647  if(internal::is_graph_active(this->my_graph)) {
1648  forwarder_busy = true;
1649  task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
1651  < buffer_node<input_type, A> >(*this);
1652  // tmp should point to the last item handled by the aggregator. This is the operation
1653  // the handling thread enqueued. So modifying that record will be okay.
1654  // workaround for icc bug
1655  tbb::task *z = tmp->ltask;
1656  graph &g = this->my_graph;
1657  tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
1658  }
1659  }
1660  } // handle_operations
1663  return op_data.ltask;
1664  }
1666  inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1667  task *ft = grab_forwarding_task(op_data);
1668  if(ft) {
1670  return true;
1671  }
1672  return false;
1673  }
1674 
1676  virtual task *forward_task() {
1677  buffer_operation op_data(try_fwd_task);
1678  task *last_task = NULL;
1679  do {
1680  op_data.status = internal::WAIT;
1681  op_data.ltask = NULL;
1682  my_aggregator.execute(&op_data);
1684  // workaround for icc bug
1685  tbb::task *xtask = op_data.ltask;
1686  graph& g = this->my_graph;
1687  last_task = combine_tasks(g, last_task, xtask);
1688  } while (op_data.status ==internal::SUCCEEDED);
1689  return last_task;
1690  }
1691 
1693  virtual void internal_reg_succ(buffer_operation *op) {
1694  my_successors.register_successor(*(op->r));
1696  }
1697 
1700  my_successors.remove_successor(*(op->r));
1702  }
1704 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1705  typedef typename sender<T>::built_successors_type built_successors_type;
1706 
1707  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1709  virtual void internal_add_built_succ(buffer_operation *op) {
1710  my_successors.internal_add_built_successor(*(op->r));
1712  }
1714  virtual void internal_del_built_succ(buffer_operation *op) {
1715  my_successors.internal_delete_built_successor(*(op->r));
1717  }
1718 
1719  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1721  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1722 
1723  virtual void internal_add_built_pred(buffer_operation *op) {
1724  my_built_predecessors.add_edge(*(op->p));
1726  }
1727 
1728  virtual void internal_del_built_pred(buffer_operation *op) {
1729  my_built_predecessors.delete_edge(*(op->p));
1731  }
1733  virtual void internal_succ_cnt(buffer_operation *op) {
1734  op->cnt_val = my_successors.successor_count();
1736  }
1737 
1738  virtual void internal_pred_cnt(buffer_operation *op) {
1739  op->cnt_val = my_built_predecessors.edge_count();
1741  }
1742 
1743  virtual void internal_copy_succs(buffer_operation *op) {
1744  my_successors.copy_successors(*(op->svec));
1746  }
1747 
1748  virtual void internal_copy_preds(buffer_operation *op) {
1749  my_built_predecessors.copy_edges(*(op->pvec));
1751  }
1753 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1755 private:
1756  void order() {} // No-op ordering hook; handle_operations_impl invokes it as derived->order() after processing a batch.
1758  bool is_item_valid() {
      // Reports whether the slot at index my_tail - 1 (the back of the buffer) holds a valid item.
1759  return this->my_item_valid(this->my_tail - 1);
1760  }
1761 
1762  void try_put_and_add_task(task*& last_task) {
      // Offers the back item to the successors. A non-NULL task returned by try_put_task is
      // treated as acceptance: it is merged into last_task via combine_tasks and the back
      // item is destroyed. On a NULL return the buffer and last_task are left unchanged.
1763  task *new_task = my_successors.try_put_task(this->back());
1764  if (new_task) {
1765  // workaround for icc bug
1766  graph& g = this->my_graph;
1767  last_task = combine_tasks(g, last_task, new_task);
1768  this->destroy_back();
1769  }
1770  }
1771 
1772 protected:
1775  internal_forward_task_impl(op, this);
1776  }
1777 
1778  template<typename derived_type>
1779  void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1780  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1781 
1782  if (this->my_reserved || !derived->is_item_valid()) {
1784  this->forwarder_busy = false;
1785  return;
1786  }
1787  // Try forwarding, giving each successor a chance
1788  task * last_task = NULL;
1789  size_type counter = my_successors.size();
1790  for (; counter > 0 && derived->is_item_valid(); --counter)
1791  derived->try_put_and_add_task(last_task);
1792 
1793  op->ltask = last_task; // return task
1794  if (last_task && !counter) {
1796  }
1797  else {
1800  }
1801  }
1802 
1803  virtual bool internal_push(buffer_operation *op) {
1804  this->push_back(*(op->elem));
1806  return true;
1807  }
1809  virtual void internal_pop(buffer_operation *op) {
1810  if(this->pop_back(*(op->elem))) {
1812  }
1813  else {
1815  }
1816  }
1819  if(this->reserve_front(*(op->elem))) {
1821  }
1822  else {
1824  }
1825  }
1828  this->consume_front();
1830  }
1833  this->release_front();
1835  }
1836 
1837 public:
1839  explicit buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
1840  forwarder_busy(false) {
      // Starts with an empty item buffer and no forwarding task pending (forwarder_busy == false).
      // The body makes this node the owner of its successor cache, installs a handler_type
      // bound to this node as the aggregator's handler, and calls fgt_node with the
      // FLOW_BUFFER_NODE tag.
      // NOTE(review): the base is named as reservable_item_buffer<T> while the class derives
      // from reservable_item_buffer<T, A>; these match only if A is that template's default -- confirm.
1841  my_successors.set_owner(this);
1842  my_aggregator.initialize_handler(handler_type(this));
1843  tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1844  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1845  }
1849  internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
1850  forwarder_busy = false;
1851  my_successors.set_owner(this);
1852  my_aggregator.initialize_handler(handler_type(this));
1853  tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1854  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1855  }
1856 
1857 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1858  void set_name( const char *name ) __TBB_override {
1860  }
1861 #endif
1862 
1863  //
1864  // message sender implementation
1865  //
1866 
1870  buffer_operation op_data(reg_succ);
1871  op_data.r = &r;
1872  my_aggregator.execute(&op_data);
1874  return true;
1875  }
1876 
1877 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1878  void internal_add_built_successor( successor_type &r) __TBB_override {
      // Packages the request as an add_blt_succ operation carrying &r and submits it through
      // my_aggregator; handle_operations_impl dispatches that code to internal_add_built_succ().
1879  buffer_operation op_data(add_blt_succ);
1880  op_data.r = &r;
1881  my_aggregator.execute(&op_data);
1882  }
1884  void internal_delete_built_successor( successor_type &r) __TBB_override {
      // Submits a del_blt_succ operation carrying &r through my_aggregator;
      // handle_operations_impl dispatches that code to internal_del_built_succ().
1885  buffer_operation op_data(del_blt_succ);
1886  op_data.r = &r;
1887  my_aggregator.execute(&op_data);
1888  }
1890  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
      // Submits an add_blt_pred operation carrying &p through my_aggregator;
      // handle_operations_impl dispatches that code to internal_add_built_pred().
1891  buffer_operation op_data(add_blt_pred);
1892  op_data.p = &p;
1893  my_aggregator.execute(&op_data);
1894  }
1895 
1896  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
      // Submits a del_blt_pred operation carrying &p through my_aggregator;
      // handle_operations_impl dispatches that code to internal_del_built_pred().
1897  buffer_operation op_data(del_blt_pred);
1898  op_data.p = &p;
1899  my_aggregator.execute(&op_data);
1900  }
1901 
1902  size_t predecessor_count() __TBB_override {
      // Submits a blt_pred_cnt operation; the handler (internal_pred_cnt) stores the edge
      // count in op_data.cnt_val. Reading it right after execute() relies on execute() not
      // returning before the operation has been handled.
1903  buffer_operation op_data(blt_pred_cnt);
1904  my_aggregator.execute(&op_data);
1905  return op_data.cnt_val;
1906  }
1907 
1908  size_t successor_count() __TBB_override {
      // Submits a blt_succ_cnt operation; the handler (internal_succ_cnt) stores the
      // successor count in op_data.cnt_val. Reading it right after execute() relies on
      // execute() not returning before the operation has been handled.
1909  buffer_operation op_data(blt_succ_cnt);
1910  my_aggregator.execute(&op_data);
1911  return op_data.cnt_val;
1912  }
1913 
1914  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
      // Submits a blt_pred_cpy operation pointing at the caller's list v; the handler
      // (internal_copy_preds) copies the recorded predecessor edges into it.
1915  buffer_operation op_data(blt_pred_cpy);
1916  op_data.pvec = &v;
1917  my_aggregator.execute(&op_data);
1918  }
1919 
1920  void copy_successors( successor_list_type &v ) __TBB_override {
      // Submits a blt_succ_cpy operation pointing at the caller's list v; the handler
      // (internal_copy_succs) copies the successors into it.
1921  buffer_operation op_data(blt_succ_cpy);
1922  op_data.svec = &v;
1923  my_aggregator.execute(&op_data);
1924  }
1925 
1926 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1927 
1932  r.remove_predecessor(*this);
1933  buffer_operation op_data(rem_succ);
1934  op_data.r = &r;
1935  my_aggregator.execute(&op_data);
1936  // even though this operation does not cause a forward, if we are the handler, and
1937  // a forward is scheduled, we may be the first to reach this point after the aggregator,
1938  // and so should check for the task.
1940  return true;
1941  }
1944 
1946  bool try_get( T &v ) __TBB_override {
1947  buffer_operation op_data(req_item);
1948  op_data.elem = &v;
1949  my_aggregator.execute(&op_data);
1951  return (op_data.status==internal::SUCCEEDED);
1952  }
1958  buffer_operation op_data(res_item);
1959  op_data.elem = &v;
1960  my_aggregator.execute(&op_data);
1961  (void)enqueue_forwarding_task(op_data);
1962  return (op_data.status==internal::SUCCEEDED);
1963  }
1964 
1966 
1968  buffer_operation op_data(rel_res);
1969  my_aggregator.execute(&op_data);
1970  (void)enqueue_forwarding_task(op_data);
1971  return true;
1972  }
1973 
1975 
1977  buffer_operation op_data(con_res);
1978  my_aggregator.execute(&op_data);
1979  (void)enqueue_forwarding_task(op_data);
1980  return true;
1981  }
1982 
1983 protected:
1984 
1985  template< typename R, typename B > friend class run_and_put_task;
1986  template<typename X, typename Y> friend class internal::broadcast_cache;
1987  template<typename X, typename Y> friend class internal::round_robin_cache;
1990  buffer_operation op_data(t, put_item);
1991  my_aggregator.execute(&op_data);
1992  task *ft = grab_forwarding_task(op_data);
1993  // sequencer_nodes can return failure (if an item has been previously inserted)
1994  // We have to spawn the returned task if our own operation fails.
1995 
1996  if(ft && op_data.status ==internal::FAILED) {
1997  // we haven't succeeded queueing the item, but for some reason the
1998  // call returned a task (if another request resulted in a successful
1999  // forward this could happen.) Queue the task and reset the pointer.
2001  }
2002  else if(!ft && op_data.status ==internal::SUCCEEDED) {
2003  ft = SUCCESSFULLY_ENQUEUED;
2004  }
2005  return ft;
2006  }
2007 
2009  return my_graph;
2010  }
2011 
2013 
2014 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2015 public:
2016  void extract() __TBB_override {
      // Deprecated node-extraction hook: receiver_extract(*this) on the recorded predecessor
      // edges, then sender_extract(*this) on the successors' recorded edges.
      // Note: unlike the other edge-bookkeeping calls of this class, this does not go
      // through my_aggregator.
2017  my_built_predecessors.receiver_extract(*this);
2018  my_successors.built_successors().sender_extract(*this);
2019  }
2020 #endif
2021 
2022 protected:
2025  // TODO: just clear structures
2026  if (f&rf_clear_edges) {
2027  my_successors.clear();
2028 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2029  my_built_predecessors.clear();
2030 #endif
2031  }
2032  forwarder_busy = false;
2033  }
2034 }; // buffer_node
2035 
2037 template <typename T, typename A=cache_aligned_allocator<T> >
2038 class queue_node : public buffer_node<T, A> {
2039 protected:
2044 
2045 private:
2046  template<typename, typename> friend class buffer_node;
2047 
2048  bool is_item_valid() {
      // Queue variant: validity is checked at my_head (the front), not at the back slot
      // that buffer_node::is_item_valid inspects.
2049  return this->my_item_valid(this->my_head);
2050  }
2051 
2052  void try_put_and_add_task(task*& last_task) {
      // Queue variant: offers the front item to the successors. A non-NULL task returned by
      // try_put_task is merged into last_task and the front item is destroyed; on a NULL
      // return nothing changes.
2053  task *new_task = this->my_successors.try_put_task(this->front());
2054  if (new_task) {
2055  // workaround for icc bug
2056  graph& graph_ref = this->graph_reference();
2057  last_task = combine_tasks(graph_ref, last_task, new_task);
2058  this->destroy_front();
2059  }
2060  }
2061 
2062 protected:
2063  void internal_forward_task(queue_operation *op) __TBB_override {
      // Re-enters the shared forwarding template with `this` typed as queue_node, so the
      // is_item_valid()/try_put_and_add_task() calls made through the derived pointer use
      // the queue (front-of-buffer) versions defined in this class.
      // NOTE(review): queue_operation's definition is elided here -- presumably the
      // base class's buffer_operation; confirm.
2064  this->internal_forward_task_impl(op, this);
2065  }
2066 
2068  if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2070  }
2071  else {
2072  this->pop_front(*(op->elem));
2074  }
2075  }
2077  if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2079  }
2080  else {
2081  this->reserve_front(*(op->elem));
2083  }
2084  }
2086  this->consume_front();
2088  }
2089 
2090 public:
2091  typedef T input_type;
2092  typedef T output_type;
2095 
2097  explicit queue_node( graph &g ) : base_type(g) {
      // Constructs the base with graph g, then calls fgt_node with the FLOW_QUEUE_NODE tag.
      // NOTE(review): base_type's definition is elided here -- presumably buffer_node<T, A>,
      // whose constructor also calls fgt_node (with FLOW_BUFFER_NODE); confirm.
2098  tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2099  static_cast<receiver<input_type> *>(this),
2100  static_cast<sender<output_type> *>(this) );
2101  }
2102 
2104  queue_node( const queue_node& src) : base_type(src) {
      // Copy constructor: all copying is delegated to base_type's copy constructor; the body
      // only issues the FLOW_QUEUE_NODE fgt_node call for the new node.
2105  tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2106  static_cast<receiver<input_type> *>(this),
2107  static_cast<sender<output_type> *>(this) );
2108  }
2109 
2110 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2111  void set_name( const char *name ) __TBB_override {
2113  }
2114 #endif
2115 
2116 protected:
2119  }
2120 }; // queue_node
2121 
2123 template< typename T, typename A=cache_aligned_allocator<T> >
2124 class sequencer_node : public queue_node<T, A> {
2126  // my_sequencer should be a benign function and must be callable
2127  // from a parallel context. Does this mean it needn't be reset?
2128 public:
2129  typedef T input_type;
2130  typedef T output_type;
2133 
2135  template< typename Sequencer >
2136  sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2137  my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
      // Builds the queue_node base on graph g and wraps the user's sequencer s in a
      // heap-allocated function_body_leaf<T, size_t, Sequencer> stored in my_sequencer.
      // The body then issues the FLOW_SEQUENCER_NODE fgt_node call.
      // NOTE(review): the matching delete is not visible in this excerpt -- presumably
      // done in the destructor; confirm.
2138  tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2139  static_cast<receiver<input_type> *>(this),
2140  static_cast<sender<output_type> *>(this) );
2141  }
2142 
2144  sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2145  my_sequencer( src.my_sequencer->clone() ) {
      // Copy constructor: copy-constructs the queue_node base from src and initializes
      // my_sequencer from src.my_sequencer->clone() rather than copying src's pointer.
      // The body then issues the FLOW_SEQUENCER_NODE fgt_node call for the new node.
2146  tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2147  static_cast<receiver<input_type> *>(this),
2148  static_cast<sender<output_type> *>(this) );
2149  }
2150 
2153 
2154 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2155  void set_name( const char *name ) __TBB_override {
2157  }
2158 #endif
2159 
2160 protected:
2163 
2164 private:
2166  size_type tag = (*my_sequencer)(*(op->elem));
2167 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2168  if (tag < this->my_head) {
2169  // have already emitted a message with this tag
2171  return false;
2172  }
2173 #endif
2174  // cannot modify this->my_tail now; the buffer would be inconsistent.
2175  size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2176 
2177  if (this->size(new_tail) > this->capacity()) {
2178  this->grow_my_array(this->size(new_tail));
2179  }
2180  this->my_tail = new_tail;
2181 
2182  const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2183  __TBB_store_with_release(op->status, res);
2184  return res ==internal::SUCCEEDED;
2185  }
2186 }; // sequencer_node
2187 
2189 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2190 class priority_queue_node : public buffer_node<T, A> {
2191 public:
2192  typedef T input_type;
2193  typedef T output_type;
2198 
2200  explicit priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
      // Constructs the buffer_node base on graph g and starts with mark == 0, i.e. no
      // elements are counted as heap-ordered yet. The body only issues the
      // FLOW_PRIORITY_QUEUE_NODE fgt_node call.
2201  tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2202  static_cast<receiver<input_type> *>(this),
2203  static_cast<sender<output_type> *>(this) );
2204  }
2205 
2207  priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
      // Copy constructor: copy-constructs the buffer_node base from src, but mark is reset
      // to 0 instead of being copied from src. The body only issues the
      // FLOW_PRIORITY_QUEUE_NODE fgt_node call for the new node.
2208  tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2209  static_cast<receiver<input_type> *>(this),
2210  static_cast<sender<output_type> *>(this) );
2211  }
2212 
2213 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2214  void set_name( const char *name ) __TBB_override {
2216  }
2217 #endif
2218 
2219 protected:
2220 
2222  mark = 0;
2224  }
2225 
2229 
2232  this->internal_forward_task_impl(op, this);
2233  }
2234 
2236  this->handle_operations_impl(op_list, this);
2237  }
2238 
2240  prio_push(*(op->elem));
2242  return true;
2243  }
2244 
2246  // if empty or already reserved, don't pop
2247  if ( this->my_reserved == true || this->my_tail == 0 ) {
2249  return;
2250  }
2251 
2252  *(op->elem) = prio();
2254  prio_pop();
2255 
2256  }
2257 
2258  // pops the highest-priority item, saves copy
2260  if (this->my_reserved == true || this->my_tail == 0) {
2262  return;
2263  }
2264  this->my_reserved = true;
2265  *(op->elem) = prio();
2266  reserved_item = *(op->elem);
2268  prio_pop();
2269  }
2270 
2273  this->my_reserved = false;
2275  }
2276 
2280  this->my_reserved = false;
2282  }
2283 
2284 private:
2285  template<typename, typename> friend class buffer_node;
2286 
2287  void order() {
      // Ordering hook called through the derived pointer after a batch of operations.
      // If elements were appended since the last heapify (mark < my_tail), fold them into
      // the heap; afterwards the whole buffer must be heap-ordered (mark == my_tail).
2288  if (mark < this->my_tail) heapify();
2289  __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2290  }
2291 
2292  bool is_item_valid() {
      // Priority-queue variant: an item is available whenever the buffer is non-empty.
2293  return this->my_tail > 0;
2294  }
2295 
2296  void try_put_and_add_task(task*& last_task) {
      // Priority-queue variant: offers the item selected by prio() to the successors.
      // A non-NULL task returned by try_put_task is merged into last_task and that item is
      // removed with prio_pop(); on a NULL return nothing changes.
2297  task * new_task = this->my_successors.try_put_task(this->prio());
2298  if (new_task) {
2299  // workaround for icc bug
2300  graph& graph_ref = this->graph_reference();
2301  last_task = combine_tasks(graph_ref, last_task, new_task);
2302  prio_pop();
2303  }
2304  }
2305 
2306 private:
2307  Compare compare;
2309 
2311 
2312  // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2313  bool prio_use_tail() {
      // Returns true only when there are not-yet-heapified elements (mark < my_tail) AND
      // compare(item[0], item[my_tail - 1]) holds, i.e. the last element outranks the
      // current heap root under Compare (std::less<T> by default).
2314  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2315  return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2316  }
2317 
2318  // prio_push: checks that the item will fit, expand array if necessary, put at end
2319  void prio_push(const T &src) {
      // Appends src at index my_tail, growing the array first when my_tail has reached
      // my_array_size. mark is not modified here, so the new element stays outside the
      // heap-ordered prefix until heapify() runs.
2320  if ( this->my_tail >= this->my_array_size )
2321  this->grow_my_array( this->my_tail + 1 );
2322  (void) this->place_item(this->my_tail, src);
2323  ++(this->my_tail);
2324  __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2325  }
2326 
2327  // prio_pop: deletes highest priority item from the array, and if it is item
2328  // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2329  // and mark. Assumes the array has already been tested for emptiness; no failure.
2330  void prio_pop() {
      // Removes the element that prio() currently designates. Two cases:
      //  1. prio_use_tail(): the last (unheaped) element is the winner -- destroy it and
      //     shrink my_tail; the heap prefix and mark are untouched.
      //  2. otherwise: destroy the root, move the last element into slot 0 (when more than
      //     one element is present), shrink my_tail, pull mark back if it now exceeds
      //     my_tail, and restore heap order with reheap().
2331  if (prio_use_tail()) {
2332  // there are newly pushed elements; last one higher than top
2333  // copy the data
2334  this->destroy_item(this->my_tail-1);
2335  --(this->my_tail);
2336  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2337  return;
2338  }
2339  this->destroy_item(0);
2340  if(this->my_tail > 1) {
2341  // push the last element down heap
2342  __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2343  this->move_item(0,this->my_tail - 1);
2344  }
2345  --(this->my_tail);
2346  if(mark > this->my_tail) --mark;
2347  if (this->my_tail > 1) // don't reheap for heap of size 1
2348  reheap();
2349  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2350  }
2351 
2352  const T& prio() {
      // Returns a reference to the current highest-priority element: the last slot when
      // prio_use_tail() is true, otherwise the heap root at index 0.
      // No emptiness check is made here; the callers visible in this file test my_tail
      // (or is_item_valid()) before calling.
2353  return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2354  }
2355 
2356  // turn array into heap
2357  void heapify() {
      // Extends the heap-ordered prefix [0, mark) to cover every element up to my_tail.
      // An empty buffer just resets mark. A mark of 0 is bumped to 1 because a single
      // element is trivially a heap. Each remaining element is taken out of its slot
      // (fetch_item) and sifted up: while compare(parent, element) holds, the parent is
      // moved down one level; the element is then placed in the slot where the walk stopped.
      // Note: `input_type to_place;` default-constructs a T as the temporary.
2358  if(this->my_tail == 0) {
2359  mark = 0;
2360  return;
2361  }
2362  if (!mark) mark = 1;
2363  for (; mark<this->my_tail; ++mark) { // for each unheaped element
2364  size_type cur_pos = mark;
2365  input_type to_place;
2366  this->fetch_item(mark,to_place);
2367  do { // push to_place up the heap
2368  size_type parent = (cur_pos-1)>>1;
2369  if (!compare(this->get_my_item(parent), to_place))
2370  break;
2371  this->move_item(cur_pos, parent);
2372  cur_pos = parent;
2373  } while( cur_pos );
2374  (void) this->place_item(cur_pos, to_place);
2375  }
2376  }
2377 
2378  // otherwise heapified array with new root element; rearrange to heap
2379  void reheap() {
      // Sift-down from the root, restricted to the heap-ordered prefix [0, mark):
      // pick the child that wins under compare, stop as soon as that child does not
      // outrank the current node (compare(child, current) is true), otherwise swap and
      // continue from the child's position. Children are at 2*pos+1 and 2*pos+2.
2380  size_type cur_pos=0, child=1;
2381  while (child < mark) {
2382  size_type target = child;
2383  if (child+1<mark &&
2384  compare(this->get_my_item(child),
2385  this->get_my_item(child+1)))
2386  ++target;
2387  // target now has the higher priority child
2388  if (compare(this->get_my_item(target),
2389  this->get_my_item(cur_pos)))
2390  break;
2391  // swap
2392  this->swap_items(cur_pos, target);
2393  cur_pos = target;
2394  child = (cur_pos<<1)+1;
2395  }
2396  }
2397 }; // priority_queue_node
2398 
2400 
2403 template< typename T >
2404 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2405 public:
2406  typedef T input_type;
2407  typedef T output_type;
2410 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2411  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2412  typedef typename sender<output_type>::built_successors_type built_successors_type;
2413  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2414  typedef typename sender<output_type>::successor_list_type successor_list_type;
2415 #endif
2416  //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2417 
2418 private:
2420  size_t my_count; //number of successful puts
2421  size_t my_tries; //number of active put attempts
2426 
2428 
2429  // Let decrementer call decrement_counter()
2431 
2432  bool check_conditions() { // always called under lock
      // True when another forwarding attempt may start: completed puts plus in-flight
      // attempts are still below my_threshold, and both the predecessor set and the
      // successor set are non-empty.
2433  return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2434  }
2435 
2436  // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2438  input_type v;
2439  task *rval = NULL;
2440  bool reserved = false;
2441  {
2443  if ( check_conditions() )
2444  ++my_tries;
2445  else
2446  return NULL;
2447  }
2448 
2449  //SUCCESS
2450  // if we can reserve and can put, we consume the reservation
2451  // we increment the count and decrement the tries
2452  if ( (my_predecessors.try_reserve(v)) == true ){
2453  reserved=true;
2454  if ( (rval = my_successors.try_put_task(v)) != NULL ){
2455  {
2457  ++my_count;
2458  --my_tries;
2459  my_predecessors.try_consume();
2460  if ( check_conditions() ) {
2461  if ( internal::is_graph_active(this->my_graph) ) {
2462  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2465  }
2466  }
2467  }
2468  return rval;
2469  }
2470  }
2471  //FAILURE
2472  //if we can't reserve, we decrement the tries
2473  //if we can reserve but can't put, we decrement the tries and release the reservation
2474  {
2476  --my_tries;
2477  if (reserved) my_predecessors.try_release();
2478  if ( check_conditions() ) {
2479  if ( internal::is_graph_active(this->my_graph) ) {
2480  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2482  __TBB_ASSERT(!rval, "Have two tasks to handle");
2483  return rtask;
2484  }
2485  }
2486  return rval;
2487  }
2488  }
2489 
2490  void forward() {
2491  __TBB_ASSERT(false, "Should never be called");
2492  return;
2493  }
2494 
2496  {
2498  if(my_count) --my_count;
2499  }
2500  return forward_task();
2501  }
2502 
2503 public:
2506 
2508  limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
2509  graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
2510  init_decrement_predecessors(num_decrement_predecessors),
2511  decrement(num_decrement_predecessors)
2512  {
2513  my_predecessors.set_owner(this);
2514  my_successors.set_owner(this);
2515  decrement.set_owner(this);
2516  tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2517  static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2518  static_cast<sender<output_type> *>(this) );
2519  }
2520 
2522  limiter_node( const limiter_node& src ) :
2523  graph_node(src.my_graph), receiver<T>(), sender<T>(),
2527  {
2528  my_predecessors.set_owner(this);
2529  my_successors.set_owner(this);
2530  decrement.set_owner(this);
2531  tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2532  static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2533  static_cast<sender<output_type> *>(this) );
2534  }
2535 
2536 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2537  void set_name( const char *name ) __TBB_override {
2539  }
2540 #endif
2541 
2545  bool was_empty = my_successors.empty();
2547  //spawn a forward task if this is the only successor
2548  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2549  if ( internal::is_graph_active(this->my_graph) ) {
2550  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2553  }
2554  }
2555  return true;
2556  }
2557 
2559 
2561  r.remove_predecessor(*this);
2563  return true;
2564  }
2565 
2566 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2567  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2568  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
2569 
2570  void internal_add_built_successor(successor_type &src) __TBB_override {
2571  my_successors.internal_add_built_successor(src);
2572  }
2573 
2574  void internal_delete_built_successor(successor_type &src) __TBB_override {
2575  my_successors.internal_delete_built_successor(src);
2576  }
2577 
2578  size_t successor_count() __TBB_override { return my_successors.successor_count(); }
2579 
2580  void copy_successors(successor_list_type &v) __TBB_override {
2581  my_successors.copy_successors(v);
2582  }
2583 
2584  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
2585  my_predecessors.internal_add_built_predecessor(src);
2586  }
2587 
2588  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
2589  my_predecessors.internal_delete_built_predecessor(src);
2590  }
2591 
2592  size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
2593 
2594  void copy_predecessors(predecessor_list_type &v) __TBB_override {
2595  my_predecessors.copy_predecessors(v);
2596  }
2597 
2598  void extract() __TBB_override {
2599  my_count = 0;
2600  my_successors.built_successors().sender_extract(*this);
2601  my_predecessors.built_predecessors().receiver_extract(*this);
2602  decrement.built_predecessors().receiver_extract(decrement);
2603  }
2604 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2605 
2609  my_predecessors.add( src );
2611  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2614  }
2615  return true;
2616  }
2617 
2620  my_predecessors.remove( src );
2621  return true;
2622  }
2623 
2624 protected:
2625 
2626  template< typename R, typename B > friend class run_and_put_task;
2627  template<typename X, typename Y> friend class internal::broadcast_cache;
2628  template<typename X, typename Y> friend class internal::round_robin_cache;
2631  {
2633  if ( my_count + my_tries >= my_threshold )
2634  return NULL;
2635  else
2636  ++my_tries;
2637  }
2638 
2639  task * rtask = my_successors.try_put_task(t);
2640 
2641  if ( !rtask ) { // try_put_task failed.
2643  --my_tries;
2645  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2647  }
2648  }
2649  else {
2651  ++my_count;
2652  --my_tries;
2653  }
2654  return rtask;
2655  }
2656 
2658  return my_graph;
2659  }
2660 
2662  __TBB_ASSERT(false,NULL); // should never be called
2663  }
2664 
2666  my_count = 0;
2667  if(f & rf_clear_edges) {
2668  my_predecessors.clear();
2669  my_successors.clear();
2670  }
2671  else
2672  {
2673  my_predecessors.reset( );
2674  }
2675  decrement.reset_receiver(f);
2676  }
2677 }; // limiter_node
2678 
2680 
// Bring input_port and tag_value from the internal namespace into this scope.
2684 using internal::input_port;
2685 using internal::tag_value;
2686 
// Primary join_node template, declared only. JP selects the join policy and
// defaults to queueing.
2687 template<typename OutputTuple, typename JP=queueing> class join_node;
2688 
// join_node specialisation for the reserving policy. It derives from
// internal::unfolded_join_node with one reserving_port per element of OutputTuple.
// Both constructors report the node to the tracing layer with
// FLOW_JOIN_NODE_RESERVING.
// NOTE(review): listing lines 2692-2693 and 2696 (presumably the definitions of N
// and unfolded_type and a further typedef) and 2708 (set_name body) are missing.
2689 template<typename OutputTuple>
2690 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2691 private:
2694 public:
2695  typedef OutputTuple output_type;
      // Creates the node in graph g.
2697  explicit join_node(graph &g) : unfolded_type(g) {
2698  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2699  this->input_ports(), static_cast< sender< output_type > *>(this) );
2700  }
      // Copy constructor: copies the base from other and registers the new node
      // with the tracing layer.
2701  join_node(const join_node &other) : unfolded_type(other) {
2702  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2703  this->input_ports(), static_cast< sender< output_type > *>(this) );
2704  }
2705 
2706 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2707  void set_name( const char *name ) __TBB_override {
2709  }
2710 #endif
2711 
2712 };
2713 
// join_node specialisation for the queueing policy. It derives from
// internal::unfolded_join_node with one queueing_port per element of OutputTuple.
// Both constructors report the node to the tracing layer with
// FLOW_JOIN_NODE_QUEUEING.
// NOTE(review): listing lines 2717-2718 and 2721 (presumably the definitions of N
// and unfolded_type and a further typedef) and 2733 (set_name body) are missing.
2714 template<typename OutputTuple>
2715 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2716 private:
2719 public:
2720  typedef OutputTuple output_type;
      // Creates the node in graph g.
2722  explicit join_node(graph &g) : unfolded_type(g) {
2723  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2724  this->input_ports(), static_cast< sender< output_type > *>(this) );
2725  }
      // Copy constructor: copies the base from other and registers the new node
      // with the tracing layer.
2726  join_node(const join_node &other) : unfolded_type(other) {
2727  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2728  this->input_ports(), static_cast< sender< output_type > *>(this) );
2729  }
2730 
2731 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2732  void set_name( const char *name ) __TBB_override {
2734  }
2735 #endif
2736 
2737 };
2738 
2739 // template for key_matching join_node
2740 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
// join_node specialisation for the key_matching<K,KHash> policy. It derives from
// internal::unfolded_join_node with one key_matching_port per element of
// OutputTuple. Each constructor overload forwards the graph plus 2..10 extra
// arguments (b0, b1, ...) to the base and then reports the node to the tracing
// layer with FLOW_JOIN_NODE_TAG_MATCHING. Overloads taking six or more extra
// arguments are compiled only when __TBB_VARIADIC_MAX is large enough.
// NOTE(review): listing lines 2745-2746, 2749, 2752 and 2828 are missing, so the
// definitions of N and unfolded_type, the member guarded by
// __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING and the set_name body are not visible.
// The meaning of the b-arguments is decided by the base class and cannot be
// confirmed from here.
2741 template<typename OutputTuple, typename K, typename KHash>
2742 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2743  key_matching_port, OutputTuple, key_matching<K,KHash> > {
2744 private:
2747 public:
2748  typedef OutputTuple output_type;
2750 
2751 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2753 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2754 
      // Two extra arguments.
2755  template<typename __TBB_B0, typename __TBB_B1>
2756  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2757  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2758  this->input_ports(), static_cast< sender< output_type > *>(this) );
2759  }
      // Three extra arguments.
2760  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2761  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2762  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2763  this->input_ports(), static_cast< sender< output_type > *>(this) );
2764  }
      // Four extra arguments.
2765  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2766  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2767  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2768  this->input_ports(), static_cast< sender< output_type > *>(this) );
2769  }
      // Five extra arguments.
2770  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2771  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2772  unfolded_type(g, b0, b1, b2, b3, b4) {
2773  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2774  this->input_ports(), static_cast< sender< output_type > *>(this) );
2775  }
2776 #if __TBB_VARIADIC_MAX >= 6
      // Six extra arguments.
2777  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2778  typename __TBB_B5>
2779  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2780  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2781  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2782  this->input_ports(), static_cast< sender< output_type > *>(this) );
2783  }
2784 #endif
2785 #if __TBB_VARIADIC_MAX >= 7
      // Seven extra arguments.
2786  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2787  typename __TBB_B5, typename __TBB_B6>
2788  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2789  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2790  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2791  this->input_ports(), static_cast< sender< output_type > *>(this) );
2792  }
2793 #endif
2794 #if __TBB_VARIADIC_MAX >= 8
      // Eight extra arguments.
2795  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2796  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2797  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2798  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2799  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2800  this->input_ports(), static_cast< sender< output_type > *>(this) );
2801  }
2802 #endif
2803 #if __TBB_VARIADIC_MAX >= 9
      // Nine extra arguments.
2804  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2805  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2806  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2807  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2808  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2809  this->input_ports(), static_cast< sender< output_type > *>(this) );
2810  }
2811 #endif
2812 #if __TBB_VARIADIC_MAX >= 10
      // Ten extra arguments.
2813  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2814  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2815  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2816  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2817  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2818  this->input_ports(), static_cast< sender< output_type > *>(this) );
2819  }
2820 #endif
      // Copy constructor: copies the base from other and registers the new node
      // with the tracing layer.
2821  join_node(const join_node &other) : unfolded_type(other) {
2822  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2823  this->input_ports(), static_cast< sender< output_type > *>(this) );
2824  }
2825 
2826 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2827  void set_name( const char *name ) __TBB_override {
2829  }
2830 #endif
2831 
2832 };
2833 
2834 // indexer node
2836 
2837 // TODO: Implement interface with variadic template or tuple
// Primary indexer_node template taking up to ten input types. T1..T9 default to
// null_type so that shorter argument lists select the partial specialisations
// that follow.
2838 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2839  typename T4=null_type, typename T5=null_type, typename T6=null_type,
2840  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2841 
2842 //indexer node specializations
// indexer_node with a single input type. It derives from
// internal::unfolded_indexer_node over tuple<T0>; its constructors report the
// node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 2849-2851 (presumably the remaining typedefs and the
// graph-constructor signature) and 2863 (set_name body) are missing.
2843 template<typename T0>
2844 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
2845 private:
2846  static const int N = 1;
2847 public:
2848  typedef tuple<T0> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
2852  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2853  this->input_ports(), static_cast< sender< output_type > *>(this) );
2854  }
2855  // Copy constructor
2856  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2857  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2858  this->input_ports(), static_cast< sender< output_type > *>(this) );
2859  }
2860 
2861 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2862  void set_name( const char *name ) __TBB_override {
2864  }
2865 #endif
2866 };
2867 
// indexer_node with two input types. It derives from
// internal::unfolded_indexer_node over tuple<T0, T1>; its constructors report the
// node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 2874-2876 (presumably the remaining typedefs and the
// graph-constructor signature) and 2888 (set_name body) are missing.
2868 template<typename T0, typename T1>
2869 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
2870 private:
2871  static const int N = 2;
2872 public:
2873  typedef tuple<T0, T1> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
2877  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2878  this->input_ports(), static_cast< sender< output_type > *>(this) );
2879  }
2880  // Copy constructor
2881  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2882  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2883  this->input_ports(), static_cast< sender< output_type > *>(this) );
2884  }
2885 
2886 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2887  void set_name( const char *name ) __TBB_override {
2889  }
2890 #endif
2891 };
2892 
// indexer_node with three input types. It derives from
// internal::unfolded_indexer_node over tuple<T0, T1, T2>; its constructors report
// the node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 2899-2901 (presumably the remaining typedefs and the
// graph-constructor signature) and 2913 (set_name body) are missing.
2893 template<typename T0, typename T1, typename T2>
2894 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
2895 private:
2896  static const int N = 3;
2897 public:
2898  typedef tuple<T0, T1, T2> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
2902  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2903  this->input_ports(), static_cast< sender< output_type > *>(this) );
2904  }
2905  // Copy constructor
2906  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2907  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2908  this->input_ports(), static_cast< sender< output_type > *>(this) );
2909  }
2910 
2911 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2912  void set_name( const char *name ) __TBB_override {
2914  }
2915 #endif
2916 };
2917 
// indexer_node with four input types. It derives from
// internal::unfolded_indexer_node over tuple<T0..T3>; its constructors report the
// node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 2924-2926 (presumably the remaining typedefs and the
// graph-constructor signature) and 2938 (set_name body) are missing.
2918 template<typename T0, typename T1, typename T2, typename T3>
2919 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
2920 private:
2921  static const int N = 4;
2922 public:
2923  typedef tuple<T0, T1, T2, T3> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
2927  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2928  this->input_ports(), static_cast< sender< output_type > *>(this) );
2929  }
2930  // Copy constructor
2931  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2932  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2933  this->input_ports(), static_cast< sender< output_type > *>(this) );
2934  }
2935 
2936 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2937  void set_name( const char *name ) __TBB_override {
2939  }
2940 #endif
2941 };
2942 
// indexer_node with five input types. It derives from
// internal::unfolded_indexer_node over tuple<T0..T4>; its constructors report the
// node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 2949-2951 (presumably the remaining typedefs and the
// graph-constructor signature) and 2963 (set_name body) are missing.
2943 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2944 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
2945 private:
2946  static const int N = 5;
2947 public:
2948  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
2952  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2953  this->input_ports(), static_cast< sender< output_type > *>(this) );
2954  }
2955  // Copy constructor
2956  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2957  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2958  this->input_ports(), static_cast< sender< output_type > *>(this) );
2959  }
2960 
2961 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2962  void set_name( const char *name ) __TBB_override {
2964  }
2965 #endif
2966 };
2967 
2968 #if __TBB_VARIADIC_MAX >= 6
// indexer_node with six input types. It derives from
// internal::unfolded_indexer_node over tuple<T0..T5>; its constructors report the
// node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 2975-2977 (presumably the remaining typedefs and the
// graph-constructor signature) and 2989 (set_name body) are missing.
2969 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
2970 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
2971 private:
2972  static const int N = 6;
2973 public:
2974  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
2978  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2979  this->input_ports(), static_cast< sender< output_type > *>(this) );
2980  }
2981  // Copy constructor
2982  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2983  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2984  this->input_ports(), static_cast< sender< output_type > *>(this) );
2985  }
2986 
2987 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2988  void set_name( const char *name ) __TBB_override {
2990  }
2991 #endif
2992 };
2993 #endif //variadic max 6
2994 
2995 #if __TBB_VARIADIC_MAX >= 7
// indexer_node with seven input types. It derives from
// internal::unfolded_indexer_node over tuple<T0..T6>; its constructors report the
// node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 3003-3005 (presumably the remaining typedefs and the
// graph-constructor signature) and 3017 (set_name body) are missing.
2996 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2997  typename T6>
2998 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
2999 private:
3000  static const int N = 7;
3001 public:
3002  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
3006  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3007  this->input_ports(), static_cast< sender< output_type > *>(this) );
3008  }
3009  // Copy constructor
3010  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3011  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3012  this->input_ports(), static_cast< sender< output_type > *>(this) );
3013  }
3014 
3015 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3016  void set_name( const char *name ) __TBB_override {
3018  }
3019 #endif
3020 };
3021 #endif //variadic max 7
3022 
3023 #if __TBB_VARIADIC_MAX >= 8
// indexer_node with eight input types. It derives from
// internal::unfolded_indexer_node over tuple<T0..T7>; its constructors report the
// node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 3031-3033 (presumably the remaining typedefs and the
// graph-constructor signature) and 3045 (set_name body) are missing.
3024 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3025  typename T6, typename T7>
3026 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3027 private:
3028  static const int N = 8;
3029 public:
3030  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
3034  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3035  this->input_ports(), static_cast< sender< output_type > *>(this) );
3036  }
3037  // Copy constructor
3038  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3039  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3040  this->input_ports(), static_cast< sender< output_type > *>(this) );
3041  }
3042 
3043 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3044  void set_name( const char *name ) __TBB_override {
3046  }
3047 #endif
3048 };
3049 #endif //variadic max 8
3050 
3051 #if __TBB_VARIADIC_MAX >= 9
// indexer_node with nine input types. It derives from
// internal::unfolded_indexer_node over tuple<T0..T8>; its constructors report the
// node to the tracing layer with FLOW_INDEXER_NODE and N input ports.
// NOTE(review): listing lines 3059-3061 (presumably the remaining typedefs and the
// graph-constructor signature) and 3073 (set_name body) are missing.
3052 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3053  typename T6, typename T7, typename T8>
3054 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3055 private:
3056  static const int N = 9;
3057 public:
3058  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
3062  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3063  this->input_ports(), static_cast< sender< output_type > *>(this) );
3064  }
3065  // Copy constructor
3066  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3067  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3068  this->input_ports(), static_cast< sender< output_type > *>(this) );
3069  }
3070 
3071 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3072  void set_name( const char *name ) __TBB_override {
3074  }
3075 #endif
3076 };
3077 #endif //variadic max 9
3078 
3079 #if __TBB_VARIADIC_MAX >= 10
// Definition of the primary indexer_node template (all ten input types given).
// It derives from internal::unfolded_indexer_node over tuple<T0..T9>; its
// constructors report the node to the tracing layer with FLOW_INDEXER_NODE and
// N input ports.
// NOTE(review): listing lines 3087-3089 (presumably the remaining typedefs and the
// graph-constructor signature) and 3101 (set_name body) are missing.
3080 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3081  typename T6, typename T7, typename T8, typename T9>
3082 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3083 private:
3084  static const int N = 10;
3085 public:
3086  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
      // Body of the constructor whose signature is not visible in this listing.
3090  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3091  this->input_ports(), static_cast< sender< output_type > *>(this) );
3092  }
3093  // Copy constructor
3094  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3095  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3096  this->input_ports(), static_cast< sender< output_type > *>(this) );
3097  }
3098 
3099 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3100  void set_name( const char *name ) __TBB_override {
3102  }
3103 #endif
3104 };
3105 #endif //variadic max 10
3106 
// internal_make_edge: shared implementation behind the make_edge overloads.
// The signature depends on __TBB_PREVIEW_ASYNC_MSG; without it the function takes
// a sender<T> and a receiver<T> of the same T.
// NOTE(review): listing line 3108 (the signature used when async_msg is enabled)
// and line 3118 (a statement following register_successor) are missing.
3107 #if __TBB_PREVIEW_ASYNC_MSG
3109 #else
3110 template< typename T >
3111 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3112 #endif
3113 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
      // Record the edge in the built-edge lists of both endpoints.
3114  s.internal_add_built_predecessor(p);
3115  p.internal_add_built_successor(s);
3116 #endif
      // Register s as a successor of p.
3117  p.register_successor( s );
3119 }
3120 
// Adds an edge from sender p to receiver s of the same message type T by
// delegating to internal_make_edge.
3122 template< typename T >
3123 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3124  internal_make_edge( p, s );
3125 }
3126 
// Additional make_edge overloads compiled only with __TBB_PREVIEW_ASYNC_MSG.
// Each one forwards its two arguments to internal_make_edge.
// NOTE(review): listing lines 3129-3130 (the rest of the first template parameter
// list) and 3136 / 3141 (the signatures of the second and third overloads) are
// missing, so their parameter types cannot be stated from this listing.
3127 #if __TBB_PREVIEW_ASYNC_MSG
3128 template< typename TS, typename TR,
3131 inline void make_edge( TS &p, TR &s ) {
3132  internal_make_edge( p, s );
3133 }
3134 
3135 template< typename T >
3137  internal_make_edge( p, s );
3138 }
3139 
3140 template< typename T >
3142  internal_make_edge( p, s );
3143 }
3144 
3145 #endif // __TBB_PREVIEW_ASYNC_MSG
3146 
3147 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3148 // Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
// The two unnamed template parameters make the overload available only when T
// has a nested output_ports_type and V has a nested input_ports_type.
3149 template< typename T, typename V,
3150  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3151 inline void make_edge( T& output, V& input) {
3152  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3153 }
3154 
3155 // Makes an edge from port 0 of a multi-output predecessor to a receiver.
// The unnamed template parameter makes the overload available only when T has a
// nested output_ports_type.
3156 template< typename T, typename R,
3157  typename = typename T::output_ports_type >
3158 inline void make_edge( T& output, receiver<R>& input) {
3159  make_edge(get<0>(output.output_ports()), input);
3160 }
3161 
3162 // Makes an edge from a sender to port 0 of a multi-input successor.
// The unnamed template parameter makes the overload available only when V has a
// nested input_ports_type.
3163 template< typename S, typename V,
3164  typename = typename V::input_ports_type >
3165 inline void make_edge( sender<S>& output, V& input) {
3166  make_edge(output, get<0>(input.input_ports()));
3167 }
3168 #endif
3169 
// internal_remove_edge: shared implementation behind the remove_edge overloads.
// The signature depends on __TBB_PREVIEW_ASYNC_MSG; without it the function takes
// a sender<T> and a receiver<T> of the same T.
// NOTE(review): listing line 3171 (the signature used when async_msg is enabled)
// and line 3182 (a trailing statement) are missing.
3170 #if __TBB_PREVIEW_ASYNC_MSG
3172 #else
3173 template< typename T >
3174 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3175 #endif
      // Unregister s from p first, then drop the built-edge records on both sides.
3176  p.remove_successor( s );
3177 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3178  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3179  p.internal_delete_built_successor(s);
3180  s.internal_delete_built_predecessor(p);
3181 #endif
3183 }
3184 
// Removes the edge from sender p to receiver s of the same message type T by
// delegating to internal_remove_edge.
3186 template< typename T >
3187 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3188  internal_remove_edge( p, s );
3189 }
3190 
// Additional remove_edge overloads compiled only with __TBB_PREVIEW_ASYNC_MSG.
// Each one forwards its two arguments to internal_remove_edge.
// NOTE(review): listing lines 3193-3194 (the rest of the first template parameter
// list) and 3200 / 3205 (the signatures of the second and third overloads) are
// missing, so their parameter types cannot be stated from this listing.
3191 #if __TBB_PREVIEW_ASYNC_MSG
3192 template< typename TS, typename TR,
3195 inline void remove_edge( TS &p, TR &s ) {
3196  internal_remove_edge( p, s );
3197 }
3198 
3199 template< typename T >
3201  internal_remove_edge( p, s );
3202 }
3203 
3204 template< typename T >
3206  internal_remove_edge( p, s );
3207 }
3208 #endif // __TBB_PREVIEW_ASYNC_MSG
3209 
3210 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3211 // Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
// The two unnamed template parameters make the overload available only when T
// has a nested output_ports_type and V has a nested input_ports_type.
3212 template< typename T, typename V,
3213  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3214 inline void remove_edge( T& output, V& input) {
3215  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3216 }
3217 
3218 // Removes an edge between port 0 of a multi-output predecessor and a receiver.
// The unnamed template parameter makes the overload available only when T has a
// nested output_ports_type.
3219 template< typename T, typename R,
3220  typename = typename T::output_ports_type >
3221 inline void remove_edge( T& output, receiver<R>& input) {
3222  remove_edge(get<0>(output.output_ports()), input);
3223 }
3224 // Removes an edge between a sender and port 0 of a multi-input successor.
// The unnamed template parameter makes the overload available only when V has a
// nested input_ports_type.
3225 template< typename S, typename V,
3226  typename = typename V::input_ports_type >
3227 inline void remove_edge( sender<S>& output, V& input) {
3228  remove_edge(output, get<0>(input.input_ports()));
3229 }
3230 #endif
3231 
3232 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Removes every edge recorded in built_edges, treating s as the sending end.
// The loop walks a local copy of built_edges, so the loop itself never iterates
// the member list directly (presumably because remove_edge modifies it - confirm).
3233 template<typename C >
3234 template< typename S >
3235 void internal::edge_container<C>::sender_extract( S &s ) {
3236  edge_list_type e = built_edges;
3237  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3238  remove_edge(s, **i);
3239  }
3240 }
3241 
// Removes every edge recorded in built_edges, treating r as the receiving end.
// The loop walks a local copy of built_edges, so the loop itself never iterates
// the member list directly (presumably because remove_edge modifies it - confirm).
3242 template<typename C >
3243 template< typename R >
3244 void internal::edge_container<C>::receiver_extract( R &r ) {
3245  edge_list_type e = built_edges;
3246  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3247  remove_edge(**i, r);
3248  }
3249 }
3250 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3251 
// Returns, by value, the result of n.copy_function_object<Body>().
// Body must be named explicitly by the caller; Node is deduced from n.
3253 template< typename Body, typename Node >
3254 Body copy_body( Node &n ) {
3255  return n.template copy_function_object<Body>();
3256 }
3257 
3258 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3259 
3260 // composite_node: primary template, declared only. The partial specialisations
// below cover tuples of input types, output types, or both.
3261 template< typename InputTuple, typename OutputTuple > class composite_node;
3262 
3263 template< typename... InputTypes, typename... OutputTypes>
3264 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3265 
3266 public:
3267  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3268  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3269 
3270 private:
3271  std::unique_ptr<input_ports_type> my_input_ports;
3272  std::unique_ptr<output_ports_type> my_output_ports;
3273 
3274  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3275  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3276 
3277 protected:
3279 
3280 public:
3281 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3282  composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3283  tbb::internal::fgt_multiinput_multioutput_node( tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3285  }
3286 #else
3288  tbb::internal::fgt_multiinput_multioutput_node( tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3289  }
3290 #endif
3291 
3292  template<typename T1, typename T2>
3293  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
3294  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3295  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3296  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
3297  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
3298 
3301  }
3302 
3303  template< typename... NodeTypes >
3304  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3305 
3306  template< typename... NodeTypes >
3307  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3308 
3309 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3310  void set_name( const char *name ) __TBB_override {
3312  }
3313 #endif
3314 
3316  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3317  return *my_input_ports;
3318  }
3319 
3321  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3322  return *my_output_ports;
3323  }
3324 
3325 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3326  void extract() __TBB_override {
3327  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3328  }
3329 #endif
3330 }; // class composite_node
3331 
3332 //composite_node with only input ports
3333 template< typename... InputTypes>
3334 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
3335 public:
3336  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3337 
3338 private:
3339  std::unique_ptr<input_ports_type> my_input_ports;
3340  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3341 
3342 protected:
3344 
3345 public:
3346 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3347  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3348  tbb::internal::fgt_composite( this, &g );
3350  }
3351 #else
3353  tbb::internal::fgt_composite( this, &g );
3354  }
3355 #endif
3356 
3357  template<typename T>
3358  void set_external_ports(T&& input_ports_tuple) {
3359  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3360 
3361  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3362 
3363  tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
3364  }
3365 
3366  template< typename... NodeTypes >
3367  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3368 
3369  template< typename... NodeTypes >
3370  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3371 
3372 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3373  void set_name( const char *name ) __TBB_override {
3375  }
3376 #endif
3377 
3379  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3380  return *my_input_ports;
3381  }
3382 
3383 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3384  void extract() __TBB_override {
      // Extraction is not implemented for the input-only composite_node: the body is
      // just an always-false __TBB_ASSERT with the explanatory message.
3385  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3386  }
3387 #endif
3388 
3389 }; // class composite_node
3390 
3391 //composite_node with only output ports
3392 template<typename... OutputTypes>
3393 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
3394 public:
3395  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3396 
3397 private:
3398  std::unique_ptr<output_ports_type> my_output_ports;
3399  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3400 
3401 protected:
3403 
3404 public:
3405 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3406  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3407  tbb::internal::fgt_composite( this, &g );
3409  }
3410 #else
3412  tbb::internal::fgt_composite( this, &g );
3413  }
3414 #endif
3415 
3416  template<typename T>
3417  void set_external_ports(T&& output_ports_tuple) {
      // Stores the caller-supplied tuple of sender references as this node's external
      // output ports (heap-allocated, owned by my_output_ports) and then hands the
      // same tuple to fgt_internal_output_alias_helper::alias_port.
      //
      // NOTE(review): NUM_OUTPUTS is sizeof...(OutputTypes) and output_ports_type is
      // built from that same pack, so the assertion below compares two values that are
      // equal by construction; it does not inspect T. Presumably the size of the
      // argument tuple was meant to be checked - confirm.
3418  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3419 
3420  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3421 
      // NOTE(review): output_ports_tuple is forwarded a second time here, after the
      // forward above; presumably harmless for a tuple of references - confirm.
3422  tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
3423  }
3424 
3425  template<typename... NodeTypes >
3426  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3427 
3428  template<typename... NodeTypes >
3429  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3430 
3431 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3432  void set_name( const char *name ) __TBB_override {
3434  }
3435 #endif
3436 
3438  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3439  return *my_output_ports;
3440  }
3441 
3442 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3443  void extract() __TBB_override {
      // Extraction is not implemented for the output-only composite_node: the body is
      // just an always-false __TBB_ASSERT with the explanatory message.
3444  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3445  }
3446 #endif
3447 
3448 }; // class composite_node
3449 
3450 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
3451 
3452 namespace internal {
3453 
3454 template<typename Gateway>
3456 public:
3457  typedef Gateway gateway_type;
3458 
3459  async_body_base(gateway_type *gateway): my_gateway(gateway) { }
3460  void set_gateway(gateway_type *gateway) {
3461  my_gateway = gateway;
3462  }
3463 
3464 protected:
3466 };
3467 
3468 template<typename Input, typename Ports, typename Gateway, typename Body>
3469 class async_body: public async_body_base<Gateway> {
3470 public:
3472  typedef Gateway gateway_type;
3473 
3474  async_body(const Body &body, gateway_type *gateway)
3475  : base_type(gateway), my_body(body) { }
3476 
3477  void operator()( const Input &v, Ports & ) {
      // Calls the wrapped user body with the input value and the gateway object that
      // my_gateway points to. The Ports parameter is unnamed and not used here.
3478  my_body(v, *this->my_gateway);
3479  }
3480 
3481  Body get_body() { return my_body; }
3482 
3483 private:
3484  Body my_body;
3485 };
3486 
3487 }
3488 
3490 template < typename Input, typename Output,
3491  typename Policy = queueing_lightweight,
3492  typename Allocator=cache_aligned_allocator<Input> >
3493 class async_node : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output > {
3496 
3497 public:
3498  typedef Input input_type;
3499  typedef Output output_type;
3506 
3507 private:
3511  // TODO: pass value by copy since we do not want to block asynchronous thread.
3512  const Output *value;
3513  bool result;
3514  try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
3515  void operator()() {
3516  result = port->try_put(*value);
3517  }
3518  };
3519 
3520  class receiver_gateway_impl: public receiver_gateway<Output> {
3521  public:
3524  tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3526  }
3527 
3530  tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3531  }
3532 
3534  bool try_put(const Output &i) __TBB_override {
3535  return my_node->try_put_impl(i);
3536  }
3537 
3538  private:
3540  } my_gateway;
3541 
3542  //The substitute of 'this' for member construction, to prevent compiler warnings
3543  async_node* self() { return this; }
3544 
3546  bool try_put_impl(const Output &i) {
3547  internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
3549  try_put_functor tpf(port_0, i);
3551  tbb::internal::fgt_async_try_put_end(this, &port_0);
3552  return tpf.result;
3553  }
3554 
3555 public:
3556  template<typename Body>
3558  graph &g, size_t concurrency,
3560  ) : base_type(
3561  g, concurrency,
3562  internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
3563  (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
3564  tbb::internal::fgt_multioutput_node_with_body<1>(
3565  tbb::internal::FLOW_ASYNC_NODE,
3566  &this->my_graph, static_cast<receiver<input_type> *>(this),
3567  this->output_ports(), this->my_body
3568  );
3569  }
3570 
3571  async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
      // Copy constructor. The gateway is not copied from `other`: my_gateway is
      // constructed afresh from self(), i.e. bound to this node.
      //
      // Both body objects held by the base (my_body and my_init_body) are then pointed
      // at this node's gateway. Presumably base_type(other) copied bodies that still
      // referred to other's gateway - confirm against multifunction_node.
3572  static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3573  static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3574 
      // Report the new node, its output ports and its body through the fgt_* hook,
      // tagged as FLOW_ASYNC_NODE (same call as in the primary constructor).
3575  tbb::internal::fgt_multioutput_node_with_body<1>( tbb::internal::FLOW_ASYNC_NODE,
3576  &this->my_graph, static_cast<receiver<input_type> *>(this),
3577  this->output_ports(), this->my_body );
3578  }
3579 
3581  return my_gateway;
3582  }
3583 
3584 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3585  void set_name( const char *name ) __TBB_override {
3587  }
3588 #endif
3589 
3590  // Define sender< Output >
3591 
3594  return internal::output_port<0>(*this).register_successor(r);
3595  }
3596 
3599  return internal::output_port<0>(*this).remove_successor(r);
3600  }
3601 
3602  template<typename Body>
3606  mfn_body_type &body_ref = *this->my_body;
3607  async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3608  return ab.get_body();
3609  }
3610 
3611 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3612  typedef typename internal::edge_container<successor_type> built_successors_type;
3614  typedef typename built_successors_type::edge_list_type successor_list_type;
3615  built_successors_type &built_successors() __TBB_override {
      // The sender<Output> edge-bookkeeping interface of async_node is implemented by
      // delegating to output port 0 of the underlying multifunction_node; this and
      // the four methods below are pure forwarders.
3616  return internal::output_port<0>(*this).built_successors();
3617  }
3618 
3619  void internal_add_built_successor( successor_type &r ) __TBB_override {
      // Forwards r to output port 0.
3620  internal::output_port<0>(*this).internal_add_built_successor(r);
3621  }
3622 
3623  void internal_delete_built_successor( successor_type &r ) __TBB_override {
      // Forwards r to output port 0.
3624  internal::output_port<0>(*this).internal_delete_built_successor(r);
3625  }
3626 
3627  void copy_successors( successor_list_type &l ) __TBB_override {
      // Lets output port 0 fill the caller's list l.
3628  internal::output_port<0>(*this).copy_successors(l);
3629  }
3630 
3631  size_t successor_count() __TBB_override {
      // Returns whatever output port 0 reports.
3632  return internal::output_port<0>(*this).successor_count();
3633  }
3634 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3635 
3636 protected:
3637 
3640  }
3641 };
3642 
3643 #if __TBB_PREVIEW_STREAMING_NODE
3645 #endif // __TBB_PREVIEW_STREAMING_NODE
3646 
3647 } // interfaceX
3648 
3649 
3650 namespace interface10a {
3651 
3652 using namespace interface10;
3653 namespace internal = interface10::internal;
3654 
3655 template< typename T >
3656 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
3657 public:
3658  typedef T input_type;
3659  typedef T output_type;
3662 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3663  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
3664  typedef typename sender<output_type>::built_successors_type built_successors_type;
3665  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
3666  typedef typename sender<output_type>::successor_list_type successor_list_type;
3667 #endif
3668 
3669  explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
      // Constructs the node as a member of graph g with no valid buffered value
      // (my_buffer_is_valid starts false). The successor cache is told that this node
      // owns it, and the node is reported through fgt_node as FLOW_OVERWRITE_NODE
      // together with its receiver and sender base-class pointers.
3670  my_successors.set_owner( this );
3671  tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
3672  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3673  }
3674 
3677  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
3678  {
3679  my_successors.set_owner( this );
3680  tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
3681  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3682  }
3683 
3685 
3686 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3687  void set_name( const char *name ) __TBB_override {
3689  }
3690 #endif
3691 
3693  spin_mutex::scoped_lock l( my_mutex );
3694  if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
3695  // We have a valid value that must be forwarded immediately.
3696  bool ret = s.try_put( my_buffer );
3697  if ( ret ) {
3698  // We add the successor that accepted our put
3699  my_successors.register_successor( s );
3700  } else {
3701  // In case of reservation a race between the moment of reservation and register_successor can appear,
3702  // because failed reserve does not mean that register_successor is not ready to put a message immediately.
3703  // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
3704  // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
3705  task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
3706  register_predecessor_task( *this, s );
3707  internal::spawn_in_graph_arena( my_graph, *rtask );
3708  }
3709  } else {
3710  // No valid value yet, just add as successor
3711  my_successors.register_successor( s );
3712  }
3713  return true;
3714  }
3715 
3717  spin_mutex::scoped_lock l( my_mutex );
3718  my_successors.remove_successor(s);
3719  return true;
3720  }
3721 
3722 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
      // Edge bookkeeping used only when TBB_DEPRECATED_FLOW_NODE_EXTRACTION is enabled.
      // The two accessors directly below return references without taking my_mutex;
      // every other method in this group holds my_mutex (scoped_lock) for its duration.
3723  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
3724  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3725 
3726  void internal_add_built_successor( successor_type &s) __TBB_override {
      // Under the lock, forwards s to the successor cache.
3727  spin_mutex::scoped_lock l( my_mutex );
3728  my_successors.internal_add_built_successor(s);
3729  }
3730 
3731  void internal_delete_built_successor( successor_type &s) __TBB_override {
      // Under the lock, forwards s to the successor cache.
3732  spin_mutex::scoped_lock l( my_mutex );
3733  my_successors.internal_delete_built_successor(s);
3734  }
3735 
3736  size_t successor_count() __TBB_override {
      // Under the lock, returns the count reported by the successor cache.
3737  spin_mutex::scoped_lock l( my_mutex );
3738  return my_successors.successor_count();
3739  }
3740 
3741  void copy_successors(successor_list_type &v) __TBB_override {
      // Under the lock, lets the successor cache fill the caller's list v.
3742  spin_mutex::scoped_lock l( my_mutex );
3743  my_successors.copy_successors(v);
3744  }
3745 
3746  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
      // Under the lock, adds p to my_built_predecessors.
3747  spin_mutex::scoped_lock l( my_mutex );
3748  my_built_predecessors.add_edge(p);
3749  }
3750 
3751  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
      // Under the lock, deletes p from my_built_predecessors.
3752  spin_mutex::scoped_lock l( my_mutex );
3753  my_built_predecessors.delete_edge(p);
3754  }
3755 
3756  size_t predecessor_count() __TBB_override {
      // Under the lock, returns the edge count of my_built_predecessors.
3757  spin_mutex::scoped_lock l( my_mutex );
3758  return my_built_predecessors.edge_count();
3759  }
3760 
3761  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
      // Under the lock, lets my_built_predecessors fill the caller's list v.
3762  spin_mutex::scoped_lock l( my_mutex );
3763  my_built_predecessors.copy_edges(v);
3764  }
3765 
3766  void extract() __TBB_override {
      // Marks the buffered value as invalid, then calls sender_extract on the built
      // successors and receiver_extract on the built predecessors, passing this node.
      //
      // NOTE(review): my_buffer_is_valid is written here without taking my_mutex,
      // whereas clear() and is_valid() take it. Presumably extract() is only called
      // while the graph is idle - confirm.
3767  my_buffer_is_valid = false;
3768  built_successors().sender_extract(*this);
3769  built_predecessors().receiver_extract(*this);
3770  }
3771 
3772 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3773 
3775  spin_mutex::scoped_lock l( my_mutex );
3776  if ( my_buffer_is_valid ) {
3777  v = my_buffer;
3778  return true;
3779  }
3780  return false;
3781  }
3782 
3785  return try_get(v);
3786  }
3787 
3789  bool try_release() __TBB_override { return true; }
3790 
3792  bool try_consume() __TBB_override { return true; }
3793 
3794  bool is_valid() {
      // Returns whether the node currently holds a valid buffered value; the flag is
      // read while holding my_mutex.
3795  spin_mutex::scoped_lock l( my_mutex );
3796  return my_buffer_is_valid;
3797  }
3798 
3799  void clear() {
      // Invalidates the buffered value by resetting the validity flag under my_mutex.
      // Only the flag changes: my_buffer itself is not touched here.
3800  spin_mutex::scoped_lock l( my_mutex );
3801  my_buffer_is_valid = false;
3802  }
3803 
3804 protected:
3805 
3806  template< typename R, typename B > friend class run_and_put_task;
3807  template<typename X, typename Y> friend class internal::broadcast_cache;
3808  template<typename X, typename Y> friend class internal::round_robin_cache;
3811  return try_put_task_impl(v);
3812  }
3813 
3815  my_buffer = v;
3816  my_buffer_is_valid = true;
3817  task * rtask = my_successors.try_put_task(v);
3818  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
3819  return rtask;
3820  }
3821 
3823  return my_graph;
3824  }
3825 
3828 
3830  o(owner), s(succ) {};
3831 
3833  if (!s.register_predecessor(o)) {
3834  o.register_successor(s);
3835  }
3836  return NULL;
3837  }
3838 
3841  };
3842 
3845 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3846  internal::edge_container<predecessor_type> my_built_predecessors;
3847 #endif
3851 
3853  my_buffer_is_valid = false;
3854  if (f&rf_clear_edges) {
3855  my_successors.clear();
3856  }
3857  }
3858 }; // overwrite_node
3859 
3860 template< typename T >
3861 class write_once_node : public overwrite_node<T> {
3862 public:
3863  typedef T input_type;
3864  typedef T output_type;
3868 
3870  explicit write_once_node(graph& g) : base_type(g) {
      // Constructs the node in graph g via the base-class constructor, then reports it
      // through fgt_node as FLOW_WRITE_ONCE_NODE.
      //
      // NOTE(review): if base_type is overwrite_node<T> (the class this one derives
      // from), its constructor has already called fgt_node with FLOW_OVERWRITE_NODE for
      // the same object, so the node is reported twice - confirm that is intended.
3871  tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3872  static_cast<receiver<input_type> *>(this),
3873  static_cast<sender<output_type> *>(this) );
3874  }
3875 
3878  tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3879  static_cast<receiver<input_type> *>(this),
3880  static_cast<sender<output_type> *>(this) );
3881  }
3882 
3883 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3884  void set_name( const char *name ) __TBB_override {
3886  }
3887 #endif
3888 
3889 protected:
3890  template< typename R, typename B > friend class run_and_put_task;
3891  template<typename X, typename Y> friend class internal::broadcast_cache;
3892  template<typename X, typename Y> friend class internal::round_robin_cache;
3894  spin_mutex::scoped_lock l( this->my_mutex );
3895  return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
3896  }
3897 };
3898 } // interfaceX
3899 
3904 
3905  using interface10::graph;
3906  using interface10::graph_node;
3907  using interface10::continue_msg;
3908 
3909  using interface10::source_node;
3910  using interface10::function_node;
3911  using interface10::multifunction_node;
3912  using interface10::split_node;
3914  using interface10::indexer_node;
3915  using interface10::internal::tagged_msg;
3918  using interface10::continue_node;
3919  using interface10a::overwrite_node;
3920  using interface10a::write_once_node;
3921  using interface10::broadcast_node;
3922  using interface10::buffer_node;
3923  using interface10::queue_node;
3924  using interface10::sequencer_node;
3925  using interface10::priority_queue_node;
3926  using interface10::limiter_node;
3927  using namespace interface10::internal::graph_policy_namespace;
3928  using interface10::join_node;
3930  using interface10::copy_body;
3931  using interface10::make_edge;
3934 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3935  using interface10::composite_node;
3936 #endif
3937  using interface10::async_node;
3938 #if __TBB_PREVIEW_ASYNC_MSG
3939  using interface10::async_msg;
3940 #endif
3941 #if __TBB_PREVIEW_STREAMING_NODE
3942  using interface10::port_ref;
3943  using interface10::streaming_node;
3944 #endif // __TBB_PREVIEW_STREAMING_NODE
3945 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3947  using internal::no_priority;
3948 #endif
3949 
3950 
3951 } // flow
3952 } // tbb
3953 
3954 #undef __TBB_PFG_RESET_ARG
3955 #undef __TBB_COMMA
3956 
3957 #endif // __TBB_flow_graph_H
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1415
receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:580
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: flow_graph.h:207
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:3501
Forwards messages in priority order.
Definition: flow_graph.h:2190
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
async_node(const async_node &other)
Definition: flow_graph.h:3571
Implements methods for a function node that takes a type Input as input.
Definition: flow_graph.h:569
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1957
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:3852
A cache of successors that are put in a round-robin fashion.
Definition: flow_graph.h:109
bool place_item(size_t here, const item_type &me)
Definition: flow_graph.h:112
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1038
static void fgt_async_commit(void *, void *)
Implements methods for a function node that takes a type Input as input and sends a type Output to its successors.
Definition: flow_graph.h:425
static tbb::task *const SUCCESSFULLY_ENQUEUED
#define __TBB_override
Definition: tbb_stddef.h:244
Base class for tasks generated by graph nodes.
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:3893
static void fgt_async_reserve(void *, void *)
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:2766
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:3774
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1345
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2131
Input and scheduling for a function node that takes a type Input as input.
Definition: flow_graph.h:65
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:1931
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3059
function_node(graph &g, size_t concurrency,)
Constructor.
Definition: flow_graph.h:1142
buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:1839
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1530
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:2607
task * try_put_task(const T &t) __TBB_override
receive an item, return a task *if possible
Definition: flow_graph.h:1989
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1098
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:3850
const_iterator cend() const
end const iterator
Definition: flow_graph.h:845
static void fgt_make_edge(void *, void *)
graph & graph_reference() __TBB_override
Definition: flow_graph.h:3822
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2271
T input_type
The input type of this receiver.
Definition: flow_graph.h:440
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2665
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
virtual task * try_put_task_wrapper(const void *p, bool is_async)=0
A task that calls a node's forward_task function.
Definition: flow_graph.h:275
pointer operator->() const
Dereference.
Definition: flow_graph.h:735
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:1604
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2085
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3598
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:536
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
receiver< input_type > receiver_type
Definition: flow_graph.h:3500
bool enqueue_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:1666
indexer_node(const indexer_node &other)
Definition: flow_graph.h:2856
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
Definition: flow_graph.h:1613
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:445
continue_msg input_type
The input type.
Definition: flow_graph.h:577
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:365
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:1946
source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
Definition: flow_graph.h:883
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3031
function_body that takes an Input and a set of output ports
Definition: flow_graph.h:197
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2900
A cache of predecessors that supports requests and reservations.
Definition: flow_graph.h:111
void fetch_item(size_t i, item_type &o)
Definition: flow_graph.h:94
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:2619
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:1662
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
Definition: flow_graph.h:1699
multifunction_node(const multifunction_node &other)
Definition: flow_graph.h:1233
concurrency
An enumeration that provides the two most common concurrency levels: unlimited and serial.
Definition: flow_graph.h:88
task that does nothing. Useful for synchronization.
Definition: task.h:959
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:2630
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:3809
interface10::internal::Policy< queueing, lightweight > queueing_lightweight
Definition: flow_graph.h:92
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:201
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
~graph()
Destroys the graph.
Definition: flow_graph.h:767
Used to form groups of tasks.
Definition: task.h:335
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2226
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1130
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:3692
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1340
virtual bool try_reserve_wrapper(void *p, bool is_async)=0
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:868
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1013
receiver< TupleType > base_type
Definition: flow_graph.h:1262
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1347
void remove_node(graph_node *n)
Definition: flow_graph.h:800
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:1827
A task that calls a node's apply_body_bypass function with no input.
Definition: flow_graph.h:325
static void fgt_begin_body(void *)
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1122
static void fgt_async_try_put_end(void *, void *)
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2161
void move_item(size_t to, size_t from)
Definition: flow_graph.h:103
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3123
static void fgt_composite(void *, void *)
Implements methods for both executable and function nodes that puts Output to its successors.
Definition: flow_graph.h:783
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:2899
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2012
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:3505
graph_iterator< const graph, const graph_node > const_iterator
Detects whether two given types are the same.
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:1818
graph & graph_reference() __TBB_override
Definition: flow_graph.h:2657
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:343
K key_from_message(const T &t)
Definition: flow_graph.h:695
void register_node(graph_node *n)
Definition: flow_graph.h:789
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
Definition: flow_graph.h:1997
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:96
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:3495
reference operator*() const
Dereference.
Definition: flow_graph.h:729
tbb::task_group_context * my_context
virtual bool try_get_wrapper(void *p, bool is_async)=0
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:2746
internal::broadcast_cache< input_type > my_successors
Definition: flow_graph.h:1422
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2076
overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
Definition: flow_graph.h:3676
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:2422
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:1556
Forwards messages in sequence order.
Definition: flow_graph.h:2124
Base class for user-defined tasks.
Definition: task.h:592
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2661
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2239
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:991
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3171
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:1606
static void fgt_async_try_put_begin(void *, void *)
continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1352
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:3494
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:3514
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:843
Forwards messages in arbitrary order.
Definition: flow_graph.h:1543
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1967
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1059
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2117
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1548
Breaks an infinite loop between the node reservation and register_successor call.
Definition: flow_graph.h:3827
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:2771
tbb::task * root_task()
Returns the root task of the graph.
queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2104
void grow_my_array(size_t minimum_size)
Grows the internal array.
Definition: flow_graph.h:164
static void fgt_graph_desc(void *, const char *)
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3086
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2850
split_node: accepts a tuple as input, forwards each element of the tuple to its successors.
Definition: flow_graph.h:1260
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1348
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:350
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
Definition: flow_graph.h:648
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1127
void add_nodes_impl(CompositeType *, bool)
Definition: flow_graph.h:883
static const node_priority_t no_priority
unsigned int node_priority_t
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:3503
The base of all graph nodes.
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:2924
static task * emit_this(graph &g, const T &t, P &p)
Definition: flow_graph.h:662
void const char const char int ITT_FORMAT __itt_group_sync p
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1304
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
continue_node(const continue_node &src)
Copy constructor.
Definition: flow_graph.h:1374
static void fgt_release_wait(void *)
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
Definition: flow_graph.h:1451
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:1762
A lock that occupies a single byte.
Definition: spin_mutex.h:40
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:3792
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:1676
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:406
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:925
Base class for types that should not be assigned.
Definition: tbb_stddef.h:324
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:171
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2042
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:100
static task * try_put_task_wrapper_impl(receiver< T > *const this_recv, const void *p, bool is_async)
Definition: flow_graph.h:217
item_buffer with reservable front-end. NOTE: if reserving, do not complete the operation with pop_front(); use consume_front().
Definition: flow_graph.h:253
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2144
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:3523
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:2949
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:2779
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:3638
internal::port_ref_impl< N1, N2 > port_ref()
Definition: flow_graph.h:46
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:1803
A generic null type.
Definition: flow_graph.h:93
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1547
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:2874
graph_iterator< graph, graph_node > iterator
Forwards messages in FIFO order.
Definition: flow_graph.h:2038
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2093
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:2560
friend class scoped_lock
Definition: spin_mutex.h:180
broadcast_node(const broadcast_node &src)
Definition: flow_graph.h:1436
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:2718
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2409
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:974
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > base_type
Definition: flow_graph.h:1218
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1277
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3254
iterator end()
end iterator
Definition: flow_graph.h:837
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:2788
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:3814
priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2207
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:298
virtual void reset_receiver(reset_flags f=rf_reset_protocol)=0
put receiver back in initial state
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:409
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:2797
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:2761
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1216
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3187
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3108
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:2693
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:782
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1976
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2094
static void fgt_graph(void *)
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3502
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1402
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2245
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2197
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3867
The graph class.
size_type size(size_t new_tail=0)
Definition: flow_graph.h:157
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1299
static void fgt_multiinput_multioutput_node(string_index, void *, void *)
T output_type
The output type of this sender.
Definition: flow_graph.h:401
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2231
static void fgt_remove_edge(void *, void *)
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2277
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:871
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:2522
internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:442
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:301
void reset(reset_flags f=rf_reset_protocol)
Definition: flow_graph.h:812
Enables one or the other code branches.
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:3534
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3087
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED is okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1106
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:422
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2023
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2408
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1215
A cache of successors that are broadcast to.
Definition: flow_graph.h:108
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:412
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3004
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
int decrement_ref_count()
Atomically decrements the reference count and returns its new value.
Definition: task.h:761
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2875
limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0)
Constructor.
Definition: flow_graph.h:2508
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:102
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:672
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1310
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1001
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:3504
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:3477
internal::decrementer< limiter_node< T > > decrement
The internal receiver< continue_msg > that decrements the count.
Definition: flow_graph.h:2505
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:1693
internal::function_input< input_type, output_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1126
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type using tuple_element.
Definition: flow_graph.h:1509
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:3593
source_node(const source_node &src)
Copy constructor.
Definition: flow_graph.h:895
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:3660
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:3474
async_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:3557
multifunction_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:1222
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3088
static void fgt_node_with_body(string_index, void *, void *, void *)
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1214
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:3546
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1346
const item_type & get_my_item(size_t i) const
Definition: flow_graph.h:78
indexer_node(const indexer_node &other)
Definition: flow_graph.h:3094
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:3844
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:2424
virtual void finalize() const
Definition: flow_graph.h:151
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3032
buffer_node< T, A >::item_type item_type
Definition: flow_graph.h:2227
void increment_ref_count()
Atomically increment reference count.
Definition: task.h:744
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1528
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2221
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2235
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:3866
static const T & from_void_ptr(const void *p)
Definition: flow_graph.h:209
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:726
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2052
bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:598
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2125
iterator begin()
start iterator
Definition: flow_graph.h:835
static void fgt_end_body(void *)
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3060
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:3003
graph & graph_reference() __TBB_override
Definition: flow_graph.h:1311
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:316
void const char const char int ITT_FORMAT __itt_group_sync s
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2259
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2132
A cache of predecessors that only supports try_get.
Definition: flow_graph.h:110
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:2543
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:454
internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:403
buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:1848
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:3789
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2067
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:362
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2925
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:2815
void execute_in_graph_arena(graph &g, F &f)
Executes custom functor inside graph arena.
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1254
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2296
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1416
void set_ref_count(int count)
Set reference count.
Definition: task.h:734
task * try_put_task(const input_type &) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:646
graph & graph_reference() __TBB_override
Definition: flow_graph.h:2008
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:3829
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
Definition: flow_graph.h:1774
Base class for receivers of completion messages.
Definition: flow_graph.h:573
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2196
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1128
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
static void fgt_reserve_wait(void *)
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
Definition: flow_graph.h:1511
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:865
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:745
graph & graph_reference() __TBB_override
Definition: flow_graph.h:1524
task * try_put_task(const T &t) __TBB_override
Build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:1518
queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2097
tbb::flow::interface10::async_node::receiver_gateway_impl my_gateway
async_body_base< Gateway > base_type
Definition: flow_graph.h:3471
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:636
Implements methods for an executable node that takes continue_msg as input.
Definition: flow_graph.h:682
void prepare_task_arena(bool reinit=false)
void wait_for_all()
Waits until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1400
base_type::size_type size_type
Definition: flow_graph.h:2041
static void fgt_multioutput_node_desc(const NodeType *, const char *)
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:3509
static void fgt_node(string_index, void *, void *)
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:687
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
Definition: flow_graph.h:1779
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1129
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1066
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:2975
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:2806
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:283
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1180
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:3832
tbb::internal::uint64_t tag_value
Definition: flow_graph.h:33
write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:3877
virtual task * execute()=0
Does whatever should happen when the threshold is reached.
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1060
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3716
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:957
function_node(const function_node &src)
Copy constructor.
Definition: flow_graph.h:1151
output_ports_type & output_ports()
Definition: flow_graph.h:1296
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:322
const V & cast_to(T const &t)
Definition: flow_graph.h:705
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:775
sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2136
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:3784
Forwards messages of type T to all successors.
Definition: flow_graph.h:1411
An abstract cache of successors.
Definition: flow_graph.h:107
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:3528
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2165
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1178
buffer_node< T, A >::buffer_operation sequencer_operation
Definition: flow_graph.h:2162
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:1869
Implements async node.
Definition: flow_graph.h:3493
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:1608
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:1832
bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:608
implements a function node that supports Input -> (set of outputs)
Definition: flow_graph.h:1196
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:717
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2950
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2976
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3661
void internal_forward_task(queue_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2063
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:916
continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1363
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1058
continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:591
continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:583
bool remove_successor(successor_type &r) __TBB_override
Removes r as a successor.
Definition: flow_graph.h:1457
buffer_node< T, A >::buffer_operation prio_operation
Definition: flow_graph.h:2228
static void fgt_node_desc(const NodeType *, const char *)
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:2849
Forward declaration section.
Definition: flow_graph.h:99
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:1809

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.