Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_streaming_node.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_flow_graph_streaming_H
22 #define __TBB_flow_graph_streaming_H
23 
24 #ifndef __TBB_flow_graph_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
28 #if __TBB_PREVIEW_STREAMING_NODE
29 
30 // Included in namespace tbb::flow::interfaceX (in flow_graph.h)
31 
32 namespace internal {
33 
34 template <int N1, int N2>
35 struct port_ref_impl {
36  // "+1" since the port_ref range is a closed interval (includes its endpoints).
37  static const int size = N2 - N1 + 1;
38 };
39 
40 } // internal
41 
42 // The purpose of the port_ref_impl is the pretty syntax: the deduction of a compile-time constant is processed from the return type.
43 // So it is possible to use this helper without parentheses, e.g. "port_ref<0>".
44 template <int N1, int N2 = N1>
47 };
48 
49 namespace internal {
50 
51 template <typename T>
52 struct num_arguments {
53  static const int value = 1;
54 };
55 
56 template <int N1, int N2>
57 struct num_arguments<port_ref_impl<N1,N2>(*)()> {
58  static const int value = port_ref_impl<N1,N2>::size;
59 };
60 
61 template <int N1, int N2>
62 struct num_arguments<port_ref_impl<N1,N2>> {
63  static const int value = port_ref_impl<N1,N2>::size;
64 };
65 
// Accepts any number of arguments and does nothing with them. Passing a pack
// expansion, e.g. ignore_return_values( f<S>()... ), evaluates the expression once
// per pack element where no other pack-expansion context is available.
template <typename... Args>
void ignore_return_values( Args&&... /*unused*/ ) {
}
68 
// Combines all arguments with bitwise OR and returns the result, e.g. to learn
// whether at least one of several try_put() calls (all already evaluated as
// arguments) succeeded.
//
// The return type is the decayed argument type. With a plain "T" return type, an
// lvalue argument deduces T as a reference (e.g. bool&). The recursive overload
// would then try to return the prvalue "t | ..." as a non-const lvalue reference,
// which does not compile. Decaying makes both lvalue and rvalue arguments work.
template <typename T>
typename std::decay<T>::type or_return_values( T&& t ) { return t; }

template <typename T, typename... Rest>
typename std::decay<T>::type or_return_values( T&& t, Rest&&... rest ) {
    return t | or_return_values( std::forward<Rest>(rest)... );
}
75 
76 template<typename JP>
78  typedef size_t type;
80 };
81 
82 template<typename Key>
83 struct key_from_policy< key_matching<Key> > {
84  typedef Key type;
86 };
87 
88 template<typename Key>
89 struct key_from_policy< key_matching<Key&> > {
90  typedef const Key &type;
92 };
93 
94 template<typename Device, typename Key>
96  Device my_device;
98 public:
99  // TODO: investigate why default constructor is required
101  streaming_device_with_key( const Device& d, Key k ) : my_device( d ), my_key( k ) {}
102  Key key() const { return my_key; }
103  const Device& device() const { return my_device; }
104 };
105 
106 // --------- Kernel argument helpers --------- //
107 template <typename T>
110 };
111 
112 template <int N1, int N2>
113 struct is_port_ref_impl< port_ref_impl<N1, N2> > {
115 };
116 
117 template <int N1, int N2>
118 struct is_port_ref_impl< port_ref_impl<N1, N2>( * )() > {
120 };
121 
122 template <typename T>
123 struct is_port_ref {
125 };
126 
127 template <typename ...Args1>
129 
// Recursive step of convert_and_call_impl. Arguments are consumed left to right:
// A1 is the argument currently inspected, Args1... are still to be inspected, and
// Args2... are the already converted arguments accumulated at the end of the list.
// Ordinary arguments are passed through unchanged. A port reference [N1, N2] is
// replaced by the tuple elements std::get<N1 + my_delta>(t) .. std::get<N2 + my_delta>(t).
// When nothing is left, the empty convert_and_call_impl<> specialization calls f(args2...).
130 template <typename A1, typename ...Args1>
131 struct convert_and_call_impl<A1, Args1...> {
132  static const size_t my_delta = 1; // Index 0 contains device
133 
// Entry point: dispatch on whether A1 is a port reference (true_type) or not (false_type).
134  template <typename F, typename Tuple, typename ...Args2>
135  static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
136  convert_and_call_impl<A1, Args1...>::doit_impl(typename is_port_ref<A1>::type(), f, t, a1, args1..., args2...);
137  }
// A1 is an ordinary argument: append it unchanged to the converted list and continue.
138  template <typename F, typename Tuple, typename ...Args2>
139  static void doit_impl(std::false_type, F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
140  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., a1);
141  }
// A1 covers ports [N1, N2]: emit the tuple element for port N1, then re-enter with the
// narrowed reference [N1 + 1, N2] standing in for A1.
142  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
143  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>, Args1&... args1, Args2&... args2) {
144  convert_and_call_impl<port_ref_impl<N1 + 1,N2>, Args1...>::doit_impl(x, f, t, port_ref<N1 + 1, N2>(), args1...,
145  args2..., std::get<N1 + my_delta>(t));
146  }
// Single port N (also the last step of a range; this overload is more specialized than
// the [N1, N2] one): emit its tuple element and continue with the remaining arguments.
147  template <typename F, typename Tuple, int N, typename ...Args2>
148  static void doit_impl(std::true_type, F& f, Tuple& t, port_ref_impl<N, N>, Args1&... args1, Args2&... args2) {
149  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., std::get<N + my_delta>(t));
150  }
151 
// "port_ref<...>" used without parentheses arrives as a function pointer: call it to get
// the port_ref_impl value and re-dispatch to the overloads above.
152  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
153  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>(* fn)(), Args1&... args1, Args2&... args2) {
154  doit_impl(x, f, t, fn(), args1..., args2...);
155  }
156  template <typename F, typename Tuple, int N, typename ...Args2>
157  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N, N>(* fn)(), Args1&... args1, Args2&... args2) {
158  doit_impl(x, f, t, fn(), args1..., args2...);
159  }
160 };
161 
162 template <>
164  template <typename F, typename Tuple, typename ...Args2>
165  static void doit(F& f, Tuple&, Args2&... args2) {
166  f(args2...);
167  }
168 };
169 // ------------------------------------------- //
170 
171 template<typename JP, typename StreamFactory, typename... Ports>
173  // Do not use 'using' instead of 'struct' because Microsoft Visual C++ 12.0 fails to compile.
174  template <typename T>
175  struct async_msg_type {
176  typedef typename StreamFactory::template async_msg_type<T> type;
177  };
178 
183 
184  // indexer_node parameters pack expansion workaround for VS2013 for streaming_node
186 };
187 
188 // Default empty implementation
189 template<typename StreamFactory, typename KernelInputTuple, typename = void>
191  typedef typename StreamFactory::device_type device_type;
192  typedef typename StreamFactory::kernel_type kernel_type;
193  typedef KernelInputTuple kernel_input_tuple;
194 protected:
195  template <typename ...Args>
196  void enqueue_kernel_impl( kernel_input_tuple&, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
197  factory.send_kernel( device, kernel, args... );
198  }
199 };
200 
201 // Implementation for StreamFactory supporting range
202 template<typename StreamFactory, typename KernelInputTuple>
203 class kernel_executor_helper<StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type > {
204  typedef typename StreamFactory::device_type device_type;
205  typedef typename StreamFactory::kernel_type kernel_type;
206  typedef KernelInputTuple kernel_input_tuple;
207 
208  typedef typename StreamFactory::range_type range_type;
209 
210  // Container for randge. It can contain either port references or real range.
211  struct range_wrapper {
212  virtual range_type get_range( const kernel_input_tuple &ip ) const = 0;
213  virtual range_wrapper *clone() const = 0;
214  virtual ~range_wrapper() {}
215  };
216 
217  struct range_value : public range_wrapper {
218  range_value( const range_type& value ) : my_value(value) {}
219 
220  range_value( range_type&& value ) : my_value(std::move(value)) {}
221 
223  return my_value;
224  }
225 
226  range_wrapper *clone() const __TBB_override {
227  return new range_value(my_value);
228  }
229  private:
231  };
232 
233  template <int N>
234  struct range_mapper : public range_wrapper {
236 
238  // "+1" since get<0>(ip) is StreamFactory::device.
239  return get<N + 1>(ip).data(false);
240  }
241 
242  range_wrapper *clone() const __TBB_override {
243  return new range_mapper<N>;
244  }
245  };
246 
247 protected:
248  template <typename ...Args>
249  void enqueue_kernel_impl( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
250  __TBB_ASSERT(my_range_wrapper, "Range is not set. Call set_range() before running streaming_node.");
251  factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
252  }
253 
254 public:
255  kernel_executor_helper() : my_range_wrapper(NULL) {}
256 
257  kernel_executor_helper(const kernel_executor_helper& executor) : my_range_wrapper(executor.my_range_wrapper ? executor.my_range_wrapper->clone() : NULL) {}
258 
259  kernel_executor_helper(kernel_executor_helper&& executor) : my_range_wrapper(executor.my_range_wrapper) {
260  // Set moving holder mappers to NULL to prevent double deallocation
261  executor.my_range_wrapper = NULL;
262  }
263 
265  if (my_range_wrapper) delete my_range_wrapper;
266  }
267 
268  void set_range(const range_type& work_size) {
269  my_range_wrapper = new range_value(work_size);
270  }
271 
272  void set_range(range_type&& work_size) {
273  my_range_wrapper = new range_value(std::move(work_size));
274  }
275 
276  template <int N>
278  my_range_wrapper = new range_mapper<N>;
279  }
280 
281  template <int N>
283  my_range_wrapper = new range_mapper<N>;
284  }
285 
286 private:
287  range_wrapper* my_range_wrapper;
288 };
289 
290 } // internal
291 
292 /*
293 /---------------------------------------- streaming_node ------------------------------------\
294 | |
295 | /--------------\ /----------------------\ /-----------\ /----------------------\ |
296 | | | | (device_with_key) O---O | | | |
297 | | | | | | | | | |
298 O---O indexer_node O---O device_selector_node O---O join_node O---O kernel_node O---O
299 | | | | (multifunction_node) | | | | (multifunction_node) | |
300 O---O | | O---O | | O---O
301 | \--------------/ \----------------------/ \-----------/ \----------------------/ |
302 | |
303 \--------------------------------------------------------------------------------------------/
304 */
305 template<typename... Args>
307 
308 template<typename... Ports, typename JP, typename StreamFactory>
309 class streaming_node< tuple<Ports...>, JP, StreamFactory >
310  : public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
311  typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
312  , public internal::kernel_executor_helper< StreamFactory, typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple >
313 {
314  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple input_tuple;
315  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple output_tuple;
317 protected:
318  typedef typename StreamFactory::device_type device_type;
319  typedef typename StreamFactory::kernel_type kernel_type;
320 private:
322  typedef composite_node<input_tuple, output_tuple> base_type;
323  static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
324  static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
325 
328 
329  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::indexer_node_type indexer_node_type;
330  typedef typename indexer_node_type::output_type indexer_node_output_type;
331  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple kernel_input_tuple;
332  typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
333  typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
334 
335  template <int... S>
336  typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
337  return std::tie( internal::input_port<S>( my_indexer_node )... );
338  }
339 
340  template <int... S>
341  typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
342  return std::tie( internal::output_port<S>( my_kernel_node )... );
343  }
344 
345  typename base_type::input_ports_type get_input_ports() {
346  return get_input_ports( input_sequence() );
347  }
348 
349  typename base_type::output_ports_type get_output_ports() {
350  return get_output_ports( output_sequence() );
351  }
352 
353  template <int N>
355  make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
356  return 0;
357  }
358 
359  template <int... S>
361  make_edge( my_indexer_node, my_device_selector_node );
362  make_edge( my_device_selector_node, my_join_node );
363  internal::ignore_return_values( make_Nth_edge<S + 1>()... );
364  make_edge( my_join_node, my_kernel_node );
365  }
366 
367  void make_edges() {
368  make_edges( input_sequence() );
369  }
370 
371  class device_selector_base {
372  public:
373  virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
374  virtual device_selector_base *clone( streaming_node &n ) const = 0;
376  };
377 
378  template <typename UserFunctor>
379  class device_selector : public device_selector_base, tbb::internal::no_assign {
380  public:
381  device_selector( UserFunctor uf, streaming_node &n, StreamFactory &f )
382  : my_dispatch_funcs( create_dispatch_funcs( input_sequence() ) )
383  , my_user_functor( uf ), my_node(n), my_factory( f )
384  {
385  my_port_epoches.fill( 0 );
386  }
387 
388  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) __TBB_override {
389  (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
391  || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
392  }
393 
394  device_selector_base *clone( streaming_node &n ) const __TBB_override {
395  return new device_selector( my_user_functor, n, my_factory );
396  }
397  private:
398  typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
399  typedef std::array < send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type;
400 
401  template <int... S>
403  dispatch_funcs_type dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
404  return dispatch;
405  }
406 
407  template <typename T>
408  key_type get_key( std::false_type, const T &, size_t &epoch ) {
410  return epoch++;
411  }
412 
413  template <typename T>
414  key_type get_key( std::true_type, const T &t, size_t &/*epoch*/ ) {
416  return key_from_message<key_type>( t );
417  }
418 
419  template <int N>
420  void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
421  typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
422  elem_type e = internal::cast_to<elem_type>( v );
423  device_type device = get_device( get_key( typename internal::key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
424  my_factory.send_data( device, e );
425  get<N + 1>( op ).try_put( e );
426  }
427 
428  template< typename DevicePort >
429  device_type get_device( key_type key, DevicePort& dp ) {
430  typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
431  if ( it == my_devices.end() ) {
432  device_type d = my_user_functor( my_factory );
433  std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
434  bool res = dp.try_put( device_with_key_type( d, key ) );
435  __TBB_ASSERT_EX( res, NULL );
436  my_node.notify_new_device( d );
437  }
438  epoch_desc &e = it->second;
439  device_type d = e.my_device;
440  if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
441  return d;
442  }
443 
444  struct epoch_desc {
445  epoch_desc(device_type d ) : my_device( d ), my_request_number( 0 ) {}
448  };
449 
451  std::array<size_t, NUM_INPUTS> my_port_epoches;
453  UserFunctor my_user_functor;
455  StreamFactory &my_factory;
456  };
457 
458  class device_selector_body {
459  public:
460  device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
461 
462  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
463  (*my_device_selector)(v, op);
464  }
465  private:
466  device_selector_base *my_device_selector;
467  };
468 
469  class args_storage_base : tbb::internal::no_copy {
470  public:
471  typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
472 
473  virtual void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) = 0;
474  virtual void send( device_type d ) = 0;
475  virtual args_storage_base *clone() const = 0;
476  virtual ~args_storage_base () {}
477 
478  protected:
479  args_storage_base( const kernel_type& kernel, StreamFactory &f )
480  : my_kernel( kernel ), my_factory( f )
481  {}
482 
483  args_storage_base( const args_storage_base &k )
484  : my_kernel( k.my_kernel ), my_factory( k.my_factory )
485  {}
486 
488  StreamFactory &my_factory;
489  };
490 
491  template <typename... Args>
492  class args_storage : public args_storage_base {
494 
495  // ---------- Update events helpers ---------- //
496  template <int N>
497  bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op ) const {
498  const auto& t = get<N + 1>( ip );
499  auto &port = get<N>( op );
500  return port.try_put( t );
501  }
502 
503  template <int... S>
505  return internal::or_return_values( do_try_put<S>( ip, op )... );
506  }
507 
508  // ------------------------------------------- //
509  class run_kernel_func : tbb::internal::no_assign {
510  public:
511  run_kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage )
512  : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
513 
514  // It is impossible to use Args... because a function pointer cannot be cast to a function reference implicitly.
515  // Allow the compiler to deduce types for function pointers automatically.
516  template <typename... FnArgs>
517  void operator()( FnArgs&... args ) {
518  internal::convert_and_call_impl<FnArgs...>::doit( my_kernel_func, my_kernel_func.my_ip, args... );
519  }
520  private:
521  struct kernel_func : tbb::internal::no_copy {
524  const args_storage& my_storage;
526 
527  kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage, device_type device )
528  : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
529  {}
530 
531  template <typename... FnArgs>
532  void operator()( FnArgs&... args ) {
533  my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
534  }
535  } my_kernel_func;
536  };
537 
538  template<typename FinalizeFn>
539  class run_finalize_func : tbb::internal::no_assign {
540  public:
541  run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn )
542  : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(), fn ) {}
543 
544  // It is impossible to use Args... because a function pointer cannot be cast to a function reference implicitly.
545  // Allow the compiler to deduce types for function pointers automatically.
546  template <typename... FnArgs>
547  void operator()( FnArgs&... args ) {
548  internal::convert_and_call_impl<FnArgs...>::doit( my_finalize_func, my_ip, args... );
549  }
550  private:
552 
553  struct finalize_func : tbb::internal::no_assign {
554  StreamFactory &my_factory;
556  FinalizeFn my_fn;
557 
558  finalize_func( StreamFactory &factory, device_type device, FinalizeFn fn )
559  : my_factory(factory), my_device(device), my_fn(fn) {}
560 
561  template <typename... FnArgs>
562  void operator()( FnArgs&... args ) {
563  my_factory.finalize( my_device, my_fn, args... );
564  }
565  } my_finalize_func;
566  };
567 
568  template<typename FinalizeFn>
569  static run_finalize_func<FinalizeFn> make_run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn ) {
570  return run_finalize_func<FinalizeFn>( ip, factory, fn );
571  }
572 
573  class send_func : tbb::internal::no_assign {
574  public:
575  send_func( StreamFactory &factory, device_type d )
576  : my_factory(factory), my_device( d ) {}
577 
578  template <typename... FnArgs>
579  void operator()( FnArgs&... args ) {
580  my_factory.send_data( my_device, args... );
581  }
582  private:
583  StreamFactory &my_factory;
585  };
586 
587  public:
588  args_storage( const kernel_type& kernel, StreamFactory &f, Args&&... args )
589  : args_storage_base( kernel, f )
590  , my_args_pack( std::forward<Args>(args)... )
591  {}
592 
593  args_storage( const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
594 
595  args_storage( const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
596 
598  // Make const qualified args_pack (from non-const)
599  const args_pack_type& const_args_pack = my_args_pack;
600  // factory.send_kernel() (reached via streaming_node::enqueue_kernel()) gets
601  // - 'ip' tuple elements by reference and updates it (and 'ip') with dependencies
602  // - arguments (from my_args_pack) by const-reference via const_args_pack
603  tbb::internal::call( run_kernel_func( ip, n, *this ), const_args_pack );
604 
605  if (! do_try_put( ip, op, input_sequence() ) ) {
606  graph& g = n.my_graph;
607  // No one message was passed to successors so set a callback to extend the graph lifetime until the kernel completion.
608  g.increment_wait_count();
609 
610  // factory.finalize() gets
611  // - 'ip' tuple elements by reference, so 'ip' might be changed
612  // - arguments (from my_args_pack) by const-reference via const_args_pack
613  tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
614  g.decrement_wait_count();
615  }), const_args_pack );
616  }
617  }
618 
620  // factory.send() gets arguments by reference and updates these arguments with dependencies
621  // (it gets but usually ignores port_ref-s)
622  tbb::internal::call( send_func( this->my_factory, d ), my_args_pack );
623  }
624 
625  args_storage_base *clone() const __TBB_override {
626  // Create new args_storage with copying constructor.
627  return new args_storage<Args...>( *this );
628  }
629 
630  private:
633  };
634 
635  // Body for kernel_multifunction_node.
636  class kernel_body : tbb::internal::no_assign {
637  public:
638  kernel_body( const streaming_node &node ) : my_node( node ) {}
639 
641  __TBB_ASSERT( (my_node.my_args_storage != NULL), "No arguments storage" );
642  // 'ip' is passed by value to create local copy for updating inside enqueue_kernel()
643  my_node.my_args_storage->enqueue( ip, op, my_node );
644  }
645  private:
647  };
648 
650  struct wrap_to_async {
651  typedef T type; // Keep port_ref as it is
652  };
653 
654  template <typename T>
655  struct wrap_to_async<T, std::false_type> {
656  typedef typename StreamFactory::template async_msg_type< typename tbb::internal::strip<T>::type > type;
657  };
658 
659  template <typename... Args>
660  args_storage_base *make_args_storage(const args_storage_base& storage, Args&&... args) const {
661  // In this variadic template convert all simple types 'T' into 'async_msg_type<T>'
662  return new args_storage<Args...>(storage, std::forward<Args>(args)...);
663  }
664 
666  my_args_storage->send( d );
667  }
668 
669  template <typename ...Args>
670  void enqueue_kernel( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
671  this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
672  }
673 
674 public:
675  template <typename DeviceSelector>
676  streaming_node( graph &g, const kernel_type& kernel, DeviceSelector d, StreamFactory &f )
677  : base_type( g )
678  , my_indexer_node( g )
679  , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
680  , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
681  , my_join_node( g )
682  , my_kernel_node( g, serial, kernel_body( *this ) )
683  // By default, streaming_node maps all its ports to the kernel arguments on a one-to-one basis.
684  , my_args_storage( make_args_storage( args_storage<>(kernel, f), port_ref<0, NUM_INPUTS - 1>() ) )
685  {
686  base_type::set_external_ports( get_input_ports(), get_output_ports() );
687  make_edges();
688  }
689 
691  : base_type( node.my_graph )
692  , my_indexer_node( node.my_indexer_node )
693  , my_device_selector( node.my_device_selector->clone( *this ) )
694  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
695  , my_join_node( node.my_join_node )
696  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
697  , my_args_storage( node.my_args_storage->clone() )
698  {
699  base_type::set_external_ports( get_input_ports(), get_output_ports() );
700  make_edges();
701  }
702 
704  : base_type( node.my_graph )
705  , my_indexer_node( std::move( node.my_indexer_node ) )
706  , my_device_selector( node.my_device_selector->clone(*this) )
707  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
708  , my_join_node( std::move( node.my_join_node ) )
709  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
710  , my_args_storage( node.my_args_storage )
711  {
712  base_type::set_external_ports( get_input_ports(), get_output_ports() );
713  make_edges();
714  // Set moving node mappers to NULL to prevent double deallocation.
715  node.my_args_storage = NULL;
716  }
717 
719  if ( my_args_storage ) delete my_args_storage;
720  if ( my_device_selector ) delete my_device_selector;
721  }
722 
723  template <typename... Args>
724  void set_args( Args&&... args ) {
725  // Copy the base class of args_storage and create new storage for "Args...".
726  args_storage_base * const new_args_storage = make_args_storage( *my_args_storage, typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
727  delete my_args_storage;
728  my_args_storage = new_args_storage;
729  }
730 
731 protected:
732  void reset_node( reset_flags = rf_reset_protocol ) __TBB_override { __TBB_ASSERT( false, "Not implemented yet" ); }
733 
734 private:
736  device_selector_base *my_device_selector;
738  join_node<kernel_input_tuple, JP> my_join_node;
740 
741  args_storage_base *my_args_storage;
742 };
743 
744 #endif // __TBB_PREVIEW_STREAMING_NODE
745 #endif // __TBB_flow_graph_streaming_H
internal::streaming_node_traits< JP, StreamFactory, Ports... >::kernel_input_tuple kernel_input_tuple
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op, internal::sequence< S... >) const
void operator()(kernel_input_tuple ip, typename args_storage_base::output_ports_type &op)
std::unordered_map< typename std::decay< key_type >::type, epoch_desc > my_devices
#define __TBB_override
Definition: tbb_stddef.h:244
is_port_ref_impl< typename tbb::internal::strip< T >::type >::type type
void enqueue_kernel_impl(kernel_input_tuple &, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
T or_return_values(T &&t)
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:536
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
multifunction_node< kernel_input_tuple, output_tuple > kernel_multifunction_node
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d __itt_event ITT_FORMAT __itt_group_mark d void const wchar_t const wchar_t int ITT_FORMAT __itt_group_sync __itt_group_fsync x void const wchar_t int const wchar_t int int ITT_FORMAT __itt_group_sync __itt_group_fsync x void ITT_FORMAT __itt_group_sync __itt_group_fsync p void ITT_FORMAT __itt_group_sync __itt_group_fsync p void size_t ITT_FORMAT lu no args __itt_obj_prop_t __itt_obj_state_t ITT_FORMAT d const char ITT_FORMAT s __itt_frame ITT_FORMAT p 
const char const char ITT_FORMAT s __itt_counter ITT_FORMAT p __itt_counter unsigned long long ITT_FORMAT lu const wchar_t ITT_FORMAT S __itt_mark_type const wchar_t ITT_FORMAT S __itt_mark_type const char ITT_FORMAT s __itt_mark_type ITT_FORMAT d __itt_caller ITT_FORMAT p __itt_caller ITT_FORMAT p no args const __itt_domain __itt_clock_domain unsigned long long __itt_id ITT_FORMAT lu const __itt_domain __itt_clock_domain unsigned long long __itt_id __itt_id void * fn
base_type::input_ports_type get_input_ports(internal::sequence< S... >)
indexer_node< typename async_msg_type< Ports >::type... > indexer_node_type
internal::port_ref_impl< N1, N2 > port_ref()
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
bool_constant< true > true_type
Definition: tbb_stddef.h:472
device_selector_base * clone(streaming_node &n) const __TBB_override
internal::make_sequence< NUM_OUTPUTS >::type output_sequence
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3123
kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage, device_type device)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
Detects whether two given types are the same.
K key_from_message(const T &t)
Definition: flow_graph.h:695
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op) __TBB_override
run_kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage)
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:171
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N, N >(*fn)(), Args1 &... args1, Args2 &... args2)
void reset_node(reset_flags=rf_reset_protocol) __TBB_override
internal::streaming_device_with_key< device_type, key_type > device_with_key_type
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >, Args1 &... args1, Args2 &... args2)
Base class for types that should not be assigned.
Definition: tbb_stddef.h:324
streaming_node(graph &g, const kernel_type &kernel, DeviceSelector d, StreamFactory &f)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
task * do_try_put(const T &v, void *p)
args_storage_base * make_args_storage(const args_storage_base &storage, Args &&... args) const
StreamFactory::template async_msg_type< T > type
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >(*fn)(), Args1 &... args1, Args2 &... args2)
tuple< typename async_msg_type< Ports >::type... > input_tuple
static void doit(F &f, Tuple &, Args2 &... args2)
static void doit_impl(std::false_type, F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
The graph class.
void enqueue_kernel_impl(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
static void doit_impl(std::true_type, F &f, Tuple &t, port_ref_impl< N, N >, Args1 &... args1, Args2 &... args2)
void send_and_put_impl(size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
base_type::output_ports_type get_output_ports(internal::sequence< S... >)
internal::make_sequence< NUM_INPUTS >::type input_sequence
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
static void doit(F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle * key
static dispatch_funcs_type create_dispatch_funcs(internal::sequence< S... >)
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309
void enqueue(kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n) __TBB_override
composite_node< input_tuple, output_tuple > base_type
args_storage(const kernel_type &kernel, StreamFactory &f, Args &&... args)
void ignore_return_values(Args &&...)
tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy< JP >::type >, typename async_msg_type< Ports >::type... > kernel_input_tuple
void enqueue_kernel(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
bool_constant< false > false_type
Definition: tbb_stddef.h:473
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op) const
static run_finalize_func< FinalizeFn > make_run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
multifunction_node< indexer_node_output_type, kernel_input_tuple > device_selector_node
device_selector(UserFunctor uf, streaming_node &n, StreamFactory &f)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::output_tuple output_tuple
StreamFactory::template async_msg_type< typename tbb::internal::strip< T >::type > type
internal::streaming_node_traits< JP, StreamFactory, Ports... >::indexer_node_type indexer_node_type
internal::streaming_node_traits< JP, StreamFactory, Ports... >::input_tuple input_tuple

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.