Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_impl_H
18 #define __TBB_flow_graph_impl_H
19 
20 #include "../tbb_stddef.h"
21 #include "../task.h"
22 #include "../task_arena.h"
23 #include "../flow_graph_abstractions.h"
24 
25 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
26 #include "../concurrent_priority_queue.h"
27 #endif
28 
29 #include <list>
30 
31 #if TBB_DEPRECATED_FLOW_ENQUEUE
32 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
33 #else
34 #define FLOW_SPAWN(a) tbb::task::spawn((a))
35 #endif
36 
37 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
38 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
39 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
40 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
41 #else
42 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
43 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
44 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
45 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
46 
47 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
48 #define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
49 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
50 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
51 #else
52 #define __TBB_DEPRECATED_LIMITER_EXPR( expr )
53 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
54 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
55 #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
56 
57 namespace tbb {
58 namespace flow {
59 
60 namespace internal {
61 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
62 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
63 typedef unsigned int node_priority_t;
65 #endif
66 }
67 
68 namespace interface10 {
69 
71 
72 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
76 struct graph_task : public task {
77  graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
79 };
80 #else
81 typedef task graph_task;
82 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
83 
84 
85 class graph;
86 class graph_node;
87 
88 template <typename GraphContainerType, typename GraphNodeType>
90  friend class graph;
91  friend class graph_node;
92 public:
93  typedef size_t size_type;
94  typedef GraphNodeType value_type;
95  typedef GraphNodeType* pointer;
96  typedef GraphNodeType& reference;
97  typedef const GraphNodeType& const_reference;
98  typedef std::forward_iterator_tag iterator_category;
99 
102 
106  {}
107 
110  if (this != &other) {
111  my_graph = other.my_graph;
112  current_node = other.current_node;
113  }
114  return *this;
115  }
116 
118  reference operator*() const;
119 
121  pointer operator->() const;
122 
124  bool operator==(const graph_iterator& other) const {
125  return ((my_graph == other.my_graph) && (current_node == other.current_node));
126  }
127 
129  bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
130 
134  return *this;
135  }
136 
139  graph_iterator result = *this;
140  operator++();
141  return result;
142  }
143 
144 private:
145  // the graph over which we are iterating
146  GraphContainerType *my_graph;
147  // pointer into my_graph's my_nodes list
149 
151  graph_iterator(GraphContainerType *g, bool begin);
152  void internal_forward();
153 }; // class graph_iterator
154 
155 // flags to modify the behavior of the graph reset(). Can be combined.
158  rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
159  rf_clear_edges = 1 << 1 // delete edges
160 };
161 
162 namespace internal {
163 
164 void activate_graph(graph& g);
165 void deactivate_graph(graph& g);
166 bool is_graph_active(graph& g);
167 tbb::task& prioritize_task(graph& g, tbb::task& arena_task);
168 void spawn_in_graph_arena(graph& g, tbb::task& arena_task);
169 void enqueue_in_graph_arena(graph &g, tbb::task& arena_task);
171 
172 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
174  bool operator()(const graph_task* left, const graph_task* right) {
175  return left->priority < right->priority;
176  }
177 };
178 
180 
181 class priority_task_selector : public task {
182 public:
184  : my_priority_queue(priority_queue) {}
186  graph_task* t = NULL;
187  bool result = my_priority_queue.try_pop(t);
188  __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
189  " in graph's priority queue mismatched" );
191  "Incorrect task submitted to graph priority queue" );
193  "Tasks from graph's priority queue must have priority" );
194  task* t_next = t->execute();
195  task::destroy(*t);
196  return t_next;
197  }
198 private:
200 };
201 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
202 
203 }
204 
206 
208  friend class graph_node;
209 
210  template< typename Body >
211  class run_task : public graph_task {
212  public:
213  run_task(Body& body
215  , node_priority_t node_priority = no_priority
216  ) : graph_task(node_priority),
217 #else
218  ) :
219 #endif
220  my_body(body) { }
222  my_body();
223  return NULL;
224  }
225  private:
226  Body my_body;
227  };
228 
229  template< typename Receiver, typename Body >
230  class run_and_put_task : public graph_task {
231  public:
232  run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
234  tbb::task *res = my_receiver.try_put_task(my_body());
235  if (res == SUCCESSFULLY_ENQUEUED) res = NULL;
236  return res;
237  }
238  private:
239  Receiver &my_receiver;
240  Body my_body;
241  };
242  typedef std::list<tbb::task *> task_list_type;
243 
244  class wait_functor {
246  public:
249  };
250 
254  public:
256  void operator()() const {
258  }
259  };
260 
261  void prepare_task_arena(bool reinit = false) {
262  if (reinit) {
263  __TBB_ASSERT(my_task_arena, "task arena is NULL");
266  }
267  else {
268  __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
270  }
271  if (!my_task_arena->is_active()) // failed to attach
272  my_task_arena->initialize(); // create a new, default-initialized arena
273  __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
274  }
275 
276 public:
278  graph();
279 
281  explicit graph(tbb::task_group_context& use_this_context);
282 
284 
285  ~graph();
286 
287 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
288  void set_name(const char *name);
289 #endif
290 
292  reserve_wait();
293  }
294 
296  release_wait();
297  }
298 
300 
303 
305 
308 
310 
312  template< typename Receiver, typename Body >
313  void run(Receiver &r, Body body) {
314  if (internal::is_graph_active(*this)) {
315  task* rtask = new (task::allocate_additional_child_of(*root_task()))
318  }
319  }
320 
322 
324  template< typename Body >
325  void run(Body body) {
326  if (internal::is_graph_active(*this)) {
327  task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
329  }
330  }
331 
333 
334  void wait_for_all() {
335  cancelled = false;
336  caught_exception = false;
337  if (my_root_task) {
338 #if TBB_USE_EXCEPTIONS
339  try {
340 #endif
343 #if TBB_USE_EXCEPTIONS
344  }
345  catch (...) {
347  my_context->reset();
348  caught_exception = true;
349  cancelled = true;
350  throw;
351  }
352 #endif
353  // TODO: the "if" condition below is just a work-around to support the concurrent wait
354  // mode. The cancellation and exception mechanisms are still broken in this mode.
355  // Consider using task group not to re-implement the same functionality.
357  my_context->reset(); // consistent with behavior in catch()
359  }
360  }
361  }
362 
365  return my_root_task;
366  }
367 
368  // ITERATORS
369  template<typename C, typename N>
370  friend class graph_iterator;
371 
372  // Graph iterator typedefs
375 
376  // Graph iterator constructors
378  iterator begin();
380  iterator end();
382  const_iterator begin() const;
384  const_iterator end() const;
386  const_iterator cbegin() const;
388  const_iterator cend() const;
389 
391  bool is_cancelled() { return cancelled; }
393 
394  // thread-unsafe state reset.
396 
397 private:
401  bool cancelled;
405 
407 
409  void register_node(graph_node *n);
410  void remove_node(graph_node *n);
411 
413 
414 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
416 #endif
417 
418  friend void internal::activate_graph(graph& g);
419  friend void internal::deactivate_graph(graph& g);
420  friend bool internal::is_graph_active(graph& g);
421  friend tbb::task& internal::prioritize_task(graph& g, tbb::task& arena_task);
422  friend void internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
423  friend void internal::enqueue_in_graph_arena(graph &g, tbb::task& arena_task);
425 
427 
428 }; // class graph
429 
432  friend class graph;
433  template<typename C, typename N>
434  friend class graph_iterator;
435 protected:
438 public:
439  explicit graph_node(graph& g);
440 
441  virtual ~graph_node();
442 
443 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
444  virtual void set_name(const char *name) = 0;
445 #endif
446 
447 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
448  virtual void extract() = 0;
449 #endif
450 
451 protected:
452  // performs the reset on an individual node.
453  virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
454 }; // class graph_node
455 
456 namespace internal {
457 
458 inline void activate_graph(graph& g) {
459  g.my_is_active = true;
460 }
461 
462 inline void deactivate_graph(graph& g) {
463  g.my_is_active = false;
464 }
465 
466 inline bool is_graph_active(graph& g) {
467  return g.my_is_active;
468 }
469 
470 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
472  task* critical_task = &t;
473  // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
474  graph_task* gt = static_cast<graph_task*>(&t);
475  if( gt->priority != no_priority ) {
480  critical_task = new( gt->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
481  tbb::internal::make_critical( *critical_task );
482  g.my_priority_queue.push(gt);
483  }
484  return *critical_task;
485 }
486 #else
488  return t;
489 }
490 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
491 
493 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
494  if (is_graph_active(g)) {
495  graph::spawn_functor s_fn(prioritize_task(g, arena_task));
497  g.my_task_arena->execute(s_fn);
498  }
499 }
500 
502 inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
503  if (is_graph_active(g)) {
504  __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
505  task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
506  }
507 }
508 
510  g.my_reset_task_list.push_back(tp);
511 }
512 
513 } // namespace internal
514 
515 } // namespace interface10
516 } // namespace flow
517 } // namespace tbb
518 
519 #endif // __TBB_flow_graph_impl_H
static void enqueue(task &t)
Enqueue task for starvation-resistant execution.
Definition: task.h:806
~graph()
Destroys the graph.
Definition: flow_graph.h:765
bool try_pop(reference elem)
Gets a reference to and removes highest priority element.
graph_iterator & operator=(const graph_iterator &other)
Assignment.
void prepare_task_arena(bool reinit=false)
#define __TBB_override
Definition: tbb_stddef.h:240
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:841
bool operator()(const graph_task *left, const graph_task *right)
unsigned int node_priority_t
void make_critical(task &t)
Definition: task.h:957
graph_task(node_priority_t node_priority=no_priority)
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
graph_iterator< const graph, const graph_node > const_iterator
The base of all graph nodes.
graph_iterator< graph, graph_node > iterator
bool __TBB_EXPORTED_METHOD is_group_execution_cancelled() const
Returns true if the context received cancellation request.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
bool is_cancelled()
return status of graph execution
Pure virtual template classes that define interfaces for async communication.
tbb::task & prioritize_task(graph &g, tbb::task &arena_task)
The graph class.
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:780
void run(Body body)
Spawns a task that runs a function object.
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
#define __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
Definition: tbb_config.h:820
Used to form groups of tasks.
Definition: task.h:332
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
task * execute() __TBB_override
Should be overridden by derived classes.
iterator begin()
start iterator
Definition: flow_graph.h:833
static tbb::task *const SUCCESSFULLY_ENQUEUED
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:773
void wait_for_all()
Wait for reference count to become one, and set reference count to zero.
Definition: task.h:789
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:331
tbb::concurrent_priority_queue< graph_task *, graph_task_comparator > graph_task_priority_queue_t
void run(Receiver &r, Body body)
Spawns a task that runs a body and puts its output to a specific receiver.
uintptr_t traits() const
Returns the context's trait.
Definition: task.h:552
void enqueue_in_graph_arena(graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
std::forward_iterator_tag iterator_category
pointer operator->() const
Dereference.
Definition: flow_graph.h:733
tbb::task_group_context * my_context
A lock that occupies a single byte.
Definition: spin_mutex.h:36
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
bool operator==(const graph_iterator &other) const
Equality.
void remove_node(graph_node *n)
Definition: flow_graph.h:798
priority_task_selector(graph_task_priority_queue_t &priority_queue)
const_iterator cend() const
end const iterator
Definition: flow_graph.h:843
void set_ref_count(int count)
Set reference count.
Definition: task.h:731
reference operator*() const
Dereference.
Definition: flow_graph.h:727
iterator end()
end iterator
Definition: flow_graph.h:835
static const node_priority_t no_priority
Base class for user-defined tasks.
Definition: task.h:589
void reset(reset_flags f=rf_reset_protocol)
Definition: flow_graph.h:810
run_task(Body &body, node_priority_t node_priority=no_priority)
internal::allocate_continuation_proxy & allocate_continuation()
Returns proxy for overloaded new that allocates a continuation task of *this.
Definition: task.h:646
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
graph_iterator & operator++()
Pre-increment.
virtual task * execute()=0
Should be overridden by derived classes.
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
graph_iterator(const graph_iterator &other)
Copy constructor.
Base class for types that should not be assigned.
Definition: tbb_stddef.h:320
tbb::task * root_task()
Returns the root task of the graph.
#define FLOW_SPAWN(a)
std::list< tbb::task * > task_list_type
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
internal::return_type_or_void< F >::type execute(F &f)
Definition: task_arena.h:343
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:236
bool operator!=(const graph_iterator &other) const
Inequality.
void initialize()
Forces allocation of the resources for the task_arena as specified in constructor arguments.
Definition: task_arena.h:248
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
graph_iterator operator++(int)
Post-increment.
void register_node(graph_node *n)
Definition: flow_graph.h:787
Base class for tasks generated by graph nodes.
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:743
internal::graph_task_priority_queue_t my_priority_queue

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.