17 #ifndef __TBB__flow_graph_node_impl_H 18 #define __TBB__flow_graph_node_impl_H 20 #ifndef __TBB_flow_graph_H 21 #error Do not #include this internal file directly; use public TBB headers instead. 33 template<
typename T,
typename A >
60 template<
typename Input,
typename Policy,
typename A,
typename ImplType >
63 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 64 , add_blt_pred, del_blt_pred,
65 blt_pred_cnt, blt_pred_cpy
79 "queueing and rejecting policies can't be specified simultaneously");
81 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 82 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
83 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
100 : receiver<Input>(),
tbb::internal::no_assign()
137 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 140 operation_type op_data(add_blt_pred);
147 operation_type op_data(del_blt_pred);
153 operation_type op_data(blt_pred_cnt);
155 return op_data.cnt_val;
159 operation_type op_data(blt_pred_cpy);
201 return op_data.bypass_t;
215 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 217 predecessor_list_type *predv;
227 typedef internal::aggregating_functor<class_type, operation_type>
handler_type;
228 friend class internal::aggregating_functor<
class_type, operation_type>;
232 task* new_task = NULL;
254 op_list = op_list->next;
269 tmp->bypass_t = NULL;
288 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 317 op->bypass_t = new_task;
345 return op_data.bypass_t;
374 return static_cast<ImplType *>(
this)->apply_body_impl_bypass(i);
380 new( task::allocate_additional_child_of(*(
my_graph_ref.root_task())) )
388 operation_type op_data(
try_fwd);
391 op_data.status =
WAIT;
394 task* ttask = op_data.bypass_t;
404 new( task::allocate_additional_child_of(*(
my_graph_ref.root_task())) )
420 template<
typename Input,
typename Output,
typename Policy,
typename A>
431 template<
typename Body>
452 template<
typename Body >
470 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER 473 task* postponed_task = NULL;
478 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER 482 if( postponed_task ) {
488 #if _MSC_VER && !__INTEL_COMPILER 489 #pragma warning (push) 490 #pragma warning (disable: 4127) 493 #if _MSC_VER && !__INTEL_COMPILER 494 #pragma warning (pop) 496 if(!successor_task) {
502 return successor_task;
527 (
void)tbb::flow::get<N-1>(
p).successors().clear();
531 if(tbb::flow::get<N-1>(
p).successors().empty())
539 (
void)tbb::flow::get<0>(
p).successors().clear();
542 return tbb::flow::get<0>(
p).successors().empty();
546 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 548 template<
int N>
struct extract_element {
549 template<
typename P>
static void extract_this(P &
p) {
550 (
void)tbb::flow::get<N-1>(
p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(
p));
551 extract_element<N-1>::extract_this(
p);
555 template<>
struct extract_element<1> {
556 template<
typename P>
static void extract_this(P &
p) {
557 (
void)tbb::flow::get<0>(
p).successors().built_successors().sender_extract(tbb::flow::get<0>(
p));
564 template<
typename Input,
typename OutputPortSet,
typename Policy,
typename A>
576 template<
typename Body>
596 template<
typename Body >
619 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 643 template<
size_t N,
typename MOP>
645 return tbb::flow::get<N>(op.output_ports());
657 template<
typename T,
typename P>
660 task* last_task = tbb::flow::get<N-1>(
p).try_put_task(tbb::flow::get<N-1>(t));
668 template<
typename T,
typename P>
670 task* last_task = tbb::flow::get<0>(
p).try_put_task(tbb::flow::get<0>(t));
677 template<
typename Output,
typename Policy>
689 template<
typename Body >
697 template<
typename Body >
716 template<
typename Body >
723 continue_receiver::reset_receiver(f);
755 #if _MSC_VER && !__INTEL_COMPILER 756 #pragma warning (push) 757 #pragma warning (disable: 4127) 760 #if _MSC_VER && !__INTEL_COMPILER 761 #pragma warning (pop) 766 return new ( task::allocate_additional_child_of( *(
my_graph_ref.root_task()) ) )
778 template<
typename Output >
786 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 787 typedef typename sender<output_type>::built_successors_type built_successors_type;
788 typedef typename sender<output_type>::successor_list_type successor_list_type;
808 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 813 successors().internal_add_built_successor( r );
817 successors().internal_delete_built_successor( r );
847 template<
typename Output >
859 if(!res)
return false;
877 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 878 template<
typename CompositeType>
881 template<
typename CompositeType,
typename NodeType1,
typename... NodeTypes >
882 void add_nodes_impl(CompositeType *c_node,
bool visible,
const NodeType1& n1,
const NodeTypes&... n) {
883 void *
addr = const_cast<NodeType1 *>(&n1);
892 #endif // __TBB__flow_graph_node_impl_H function_body that takes an Input and a set of output ports
static void clear_this(P &p)
output_type apply_body_impl(const input_type &i)
function_input_base< Input, Policy, A, ImplType > class_type
const size_t my_max_concurrency
function_input_queue< input_type, A > input_queue_type
task * try_get_postponed_task(const input_type &i)
void reset(reset_flags f)
function_input(graph &g, size_t max_concurrency,)
predecessor_cache< input_type, null_mutex > predecessor_cache_type
task * try_put_task(const output_type &i)
continue_msg input_type
The input type of this receiver.
unsigned int node_priority_t
virtual ~function_input_base()
Destructor.
function_output< output_type > base_type
function_input_base< Input, Policy, A, my_class > base_type
task * try_put_task_impl(const input_type &t, tbb::internal::true_type)
multifunction_body< input_type, output_ports_type > multifunction_body_type
static void fgt_end_body(void *)
task * apply_body_impl_bypass(const input_type &i)
bool try_put(const output_type &i)
task * create_forward_task()
static bool this_empty(P &p)
task * forward_task()
This is executed by an enqueued task, the "forwarder".
function_body< input_type, output_type > function_body_type
void add_nodes_impl(CompositeType *, bool)
multifunction_input< Input, OutputPortSet, Policy, A > my_class
A task that calls a node's apply_body_bypass function, passing in an input of type Input.
const item_type & front() const
Implements methods for a function node that takes a type Input as input.
task * apply_body_impl_bypass(const input_type &i)
Body copy_function_object()
Implements methods for a function node that takes a type Input as input and sends.
Input input_type
The input type of this receiver.
task * apply_body_bypass(input_type)
Applies the body to the provided input.
function_input_base< Input, Policy, A, my_class > base_type
void internal_try_put_task(operation_type *op)
Put to the node, but return the task instead of enqueueing it.
OutputPortSet output_ports_type
function_body_type * my_init_body
continue_input(graph &g, int number_of_predecessors,)
bool get_item(output_type &v)
aggregated_operation base class
predecessor_cache< input_type, null_mutex > my_predecessors
function_input_queue< input_type, A > input_queue_type
function_input_queue< input_type, A > input_queue_type
continue_input(const continue_input &src)
output_ports_type & output_ports()
function_body_type * my_body
broadcast_cache< output_type > broadcast_cache_type
function_body< input_type, output_type > function_body_type
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
int max_concurrency()
Returns the maximal number of threads that can work inside the arena.
sender< output_type >::successor_type successor_type
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void reset_function_input(reset_flags f)
multifunction_input(const multifunction_input &src)
Copy constructor.
void internal_forward(operation_type *op)
Creates tasks for postponed messages if available and if concurrency allows.
static tbb::task *const SUCCESSFULLY_ENQUEUED
function_body_type * my_body
multifunction_input(graph &g, size_t max_concurrency,)
Input and scheduling for a function node that takes a type Input as input.
A::template rebind< input_queue_type >::other queue_allocator_type
void remove_successor(successor_type &r)
bool is_graph_active(graph &g)
internal::aggregating_functor< class_type, operation_type > handler_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void handle_operations(operation_type *op_list)
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
A task that calls a node's forward_task function.
multifunction_body_type * my_body
void reset_receiver(reset_flags f) __TBB_override
aggregator< handler_type, operation_type > my_aggregator
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
Body copy_function_object()
bool register_successor(successor_type &r) __TBB_override
Adds a new successor to this node.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Output output_type
The output type of this receiver.
operation_type(op_type t)
function_input_base(const function_input_base &src)
Copy constructor.
Implements methods for both executable and function nodes that puts Output to its successors.
multifunction_output(const multifunction_output &)
__TBB_STATIC_ASSERT(!((internal::has_policy< queueing, Policy >::value) &&(internal::has_policy< rejecting, Policy >::value)), "queueing and rejecting policies can't be specified simultaneously")
broadcast_cache_type & successors()
Base class for user-defined tasks.
input_queue_type * my_queue
void __TBB_store_with_release(volatile T &location, V value)
bool push_back(item_type &v)
static void fgt_begin_body(void *)
static bool this_empty(P &p)
function_output(const function_output &)
void check_task_and_spawn(graph &g, task *t)
void spawn_forward_task()
Spawns a task that calls forward()
bool pop_front(item_type &v)
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
function_input< Input, Output, Policy, A > my_class
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
task * try_put_task(const T &t) __TBB_override
continue_input(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body &body, node_priority_t priority))
task * internal_try_put_bypass(const input_type &t)
Base class for types that should not be assigned.
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
task * execute() __TBB_override
Implements methods for an executable node that takes continue_msg as input.
task * perform_queued_requests()
the leaf for function_body
void set_owner(successor_type *owner)
static void fgt_alias_port(void *, void *, bool)
task * try_put_task(const output_type &i)
void reset_function_input_base(reset_flags f)
virtual broadcast_cache< output_type > & successors()=0
task * apply_body_bypass(const input_type &i)
Applies the body to the provided input.
function_input(const function_input &src)
Copy constructor.
task * create_body_task(const input_type &input)
allocates a task to apply a body
void const char const char int ITT_FORMAT __itt_group_sync p
graph & graph_reference() __TBB_override
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
function_body_type * my_init_body
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Body copy_function_object()
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
virtual broadcast_cache< output_type > & successors()=0
output_ports_type my_output_ports
A functor that takes an Input and generates an Output.
void register_successor(successor_type &r)
virtual multifunction_body * clone()=0
virtual function_body * clone()=0
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
static void clear_this(P &p)
void set_owner(owner_type *owner)
graph & graph_reference() __TBB_override
static task * emit_this(graph &g, const T &t, P &p)
operation_type(const input_type &e, op_type t)
continue_input< output_type, Policy > class_type
multifunction_body_type * my_init_body
broadcast_cache_type my_successors
function_input_base(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(size_t max_concurrency, node_priority_t priority))
Constructor for function_input_base.
static task * emit_this(graph &g, const T &t, P &p)
bool buffer_empty() const
task * try_put_task_impl(const input_type &t, tbb::internal::false_type)
task * try_put_task(const input_type &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Input input_type
The input type of this receiver.
void reset_receiver(reset_flags f) __TBB_override
receiver< input_type >::predecessor_type predecessor_type