Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
tbb::pipeline Class Reference

A processing pipeline that applies filters to items. More...

#include <pipeline.h>

Collaboration diagram for tbb::pipeline:

Public Member Functions

__TBB_EXPORTED_METHOD pipeline ()
 Construct empty pipeline. More...
 
virtual __TBB_EXPORTED_METHOD ~pipeline ()
 
void __TBB_EXPORTED_METHOD add_filter (filter &filter_)
 Add filter to end of pipeline. More...
 
void __TBB_EXPORTED_METHOD run (size_t max_number_of_live_tokens)
 Run the pipeline to completion. More...
 
void __TBB_EXPORTED_METHOD run (size_t max_number_of_live_tokens, tbb::task_group_context &context)
 Run the pipeline to completion with user-supplied context. More...
 
void __TBB_EXPORTED_METHOD clear ()
 Remove all filters from the pipeline. More...
 

Private Member Functions

void remove_filter (filter &filter_)
 Remove filter from pipeline. More...
 
void __TBB_EXPORTED_METHOD inject_token (task &self)
 Not used, but retained to satisfy old export files. More...
 
void clear_filters ()
 Does clean up if pipeline is cancelled or exception occurred. More...
 

Private Attributes

filterfilter_list
 Pointer to first filter in the pipeline. More...
 
filterfilter_end
 Pointer to location where address of next filter to be added should be stored. More...
 
taskend_counter
 task who's reference count is used to determine when all stages are done. More...
 
atomic< internal::Tokeninput_tokens
 Number of idle tokens waiting for input stage. More...
 
atomic< internal::Tokentoken_counter
 Global counter of tokens. More...
 
bool end_of_input
 False until fetch_input returns NULL. More...
 
bool has_thread_bound_filters
 True if the pipeline contains a thread-bound filter; false otherwise. More...
 

Friends

class internal::stage_task
 
class internal::pipeline_root_task
 
class filter
 
class thread_bound_filter
 
class internal::pipeline_cleaner
 
class tbb::interface6::internal::pipeline_proxy
 

Detailed Description

A processing pipeline that applies filters to items.

Definition at line 236 of file pipeline.h.

Constructor & Destructor Documentation

◆ pipeline()

tbb::pipeline::pipeline ( )

Construct empty pipeline.

Definition at line 535 of file pipeline.cpp.

535  :
536  filter_list(NULL),
537  filter_end(NULL),
538  end_counter(NULL),
539  end_of_input(false),
541 {
542  token_counter = 0;
543  input_tokens = 0;
544 }
task * end_counter
task who's reference count is used to determine when all stages are done.
Definition: pipeline.h:274
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:286
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
Definition: pipeline.h:277
atomic< internal::Token > token_counter
Global counter of tokens.
Definition: pipeline.h:280
bool end_of_input
False until fetch_input returns NULL.
Definition: pipeline.h:283

References input_tokens, and token_counter.

◆ ~pipeline()

tbb::pipeline::~pipeline ( )
virtual

Though the current implementation declares the destructor virtual, do not rely on this detail. The virtualness is deprecated and may disappear in future versions of TBB.

Definition at line 546 of file pipeline.cpp.

546  {
547  clear();
548 }
void __TBB_EXPORTED_METHOD clear()
Remove all filters from the pipeline.
Definition: pipeline.cpp:550

References clear().

Here is the call graph for this function:

Member Function Documentation

◆ add_filter()

void tbb::pipeline::add_filter ( filter filter_)

Add filter to end of pipeline.

Definition at line 569 of file pipeline.cpp.

569  {
570 #if TBB_USE_ASSERT
571  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
572  __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
573  __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
574  __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
575 #endif
576  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
577  filter_.my_pipeline = this;
578  filter_.prev_filter_in_pipeline = filter_end;
579  if ( filter_list == NULL)
580  filter_list = &filter_;
581  else
583  filter_.next_filter_in_pipeline = NULL;
584  filter_end = &filter_;
585  } else {
586  if( !filter_end )
587  filter_end = reinterpret_cast<filter*>(&filter_list);
588 
589  *reinterpret_cast<filter**>(filter_end) = &filter_;
590  filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
591  *reinterpret_cast<filter**>(filter_end) = NULL;
592  }
593  if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
594  if( filter_.is_serial() ) {
595  if( filter_.is_bound() )
597  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
598  } else {
599  if(filter_.prev_filter_in_pipeline) {
600  if(filter_.prev_filter_in_pipeline->is_bound()) {
601  // successors to bound filters must have an input_buffer
602  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
603  }
604  } else { // input filter
605  if(filter_.object_may_be_null() ) {
606  //TODO: buffer only needed to hold TLS; could improve
607  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
608  filter_.my_input_buffer->create_my_tls();
609  }
610  }
611  }
612  } else {
613  if( filter_.is_serial() ) {
614  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
615  }
616  }
617 
618 }
task * end_counter
task who's reference count is used to determine when all stages are done.
Definition: pipeline.h:274
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:68
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:160
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:286
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:42
static const unsigned char version_mask
Definition: pipeline.h:93

References __TBB_ASSERT, __TBB_PIPELINE_VERSION, end_counter, filter_end, filter_list, has_thread_bound_filters, tbb::filter::is_bound(), tbb::filter::is_ordered(), tbb::filter::is_serial(), tbb::filter::my_filter_mode, tbb::filter::my_input_buffer, tbb::filter::my_pipeline, tbb::filter::next_filter_in_pipeline, tbb::filter::not_in_pipeline(), tbb::filter::object_may_be_null(), tbb::filter::prev_filter_in_pipeline, and tbb::filter::version_mask.

Here is the call graph for this function:

◆ clear()

void tbb::pipeline::clear ( )

Remove all filters from the pipeline.

Definition at line 550 of file pipeline.cpp.

550  {
551  filter* next;
552  for( filter* f = filter_list; f; f=next ) {
553  if( internal::input_buffer* b = f->my_input_buffer ) {
554  delete b;
555  f->my_input_buffer = NULL;
556  }
557  next=f->next_filter_in_pipeline;
558  f->next_filter_in_pipeline = filter::not_in_pipeline();
559  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
560  f->prev_filter_in_pipeline = filter::not_in_pipeline();
561  f->my_pipeline = NULL;
562  }
563  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
564  f->next_segment = NULL;
565  }
566  filter_list = filter_end = NULL;
567 }
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:68
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
friend class filter
Definition: pipeline.h:262
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:42
static const unsigned char version_mask
Definition: pipeline.h:93

References __TBB_PIPELINE_VERSION, filter_end, filter_list, tbb::filter::my_input_buffer, tbb::filter::next_filter_in_pipeline, tbb::filter::not_in_pipeline(), and tbb::filter::version_mask.

Referenced by ~pipeline().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ clear_filters()

void tbb::pipeline::clear_filters ( )
private

Does clean up if pipeline is cancelled or exception occurred.

Referenced by tbb::internal::pipeline_cleaner::~pipeline_cleaner().

Here is the caller graph for this function:

◆ inject_token()

void tbb::pipeline::inject_token ( task self)
private

Not used, but retained to satisfy old export files.

Definition at line 521 of file pipeline.cpp.

521  {
522  __TBB_ASSERT(false,"illegal call to inject_token");
523 }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169

References __TBB_ASSERT.

◆ remove_filter()

void tbb::pipeline::remove_filter ( filter filter_)
private

Remove filter from pipeline.

Definition at line 620 of file pipeline.cpp.

620  {
621  __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
622  __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
623  __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
624  if (&filter_ == filter_list)
626  else {
627  __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
628  filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
629  }
630  if (&filter_ == filter_end)
632  else {
633  __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
634  filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
635  }
636  if( internal::input_buffer* b = filter_.my_input_buffer ) {
637  delete b;
638  filter_.my_input_buffer = NULL;
639  }
640  filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
641  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
642  filter_.next_segment = NULL;
643  filter_.my_pipeline = NULL;
644 }
task * end_counter
task who's reference count is used to determine when all stages are done.
Definition: pipeline.h:274
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:68
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:160
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
Definition: pipeline.h:185
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:42
static const unsigned char version_mask
Definition: pipeline.h:93

References __TBB_ASSERT, __TBB_PIPELINE_VERSION, end_counter, filter_end, filter_list, tbb::filter::my_filter_mode, tbb::filter::my_input_buffer, tbb::filter::my_pipeline, tbb::filter::next_filter_in_pipeline, tbb::filter::next_segment, tbb::filter::not_in_pipeline(), tbb::filter::prev_filter_in_pipeline, and tbb::filter::version_mask.

Referenced by tbb::filter::~filter().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ run() [1/2]

void tbb::pipeline::run ( size_t  max_number_of_live_tokens)

Run the pipeline to completion.

Definition at line 646 of file pipeline.cpp.

650  {
651  __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
652  __TBB_ASSERT( !end_counter, "pipeline already running?" );
653  if( filter_list ) {
654  internal::pipeline_cleaner my_pipeline_cleaner(*this);
655  end_of_input = false;
656  input_tokens = internal::Token(max_number_of_live_tokens);
658  // release input filter if thread-bound
659  if(filter_list->is_bound()) {
660  filter_list->my_input_buffer->sema_V();
661  }
662  }
663 #if __TBB_TASK_GROUP_CONTEXT
664  end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
665 #else
666  end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
667 #endif
668  // Start execution of tasks
670 
673  if(f->is_bound()) {
674  f->my_input_buffer->sema_V(); // wake to end
675  }
676  }
677  }
678  }
679 }
unsigned long Token
Definition: pipeline.h:44
task * end_counter
task who's reference count is used to determine when all stages are done.
Definition: pipeline.h:274
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:160
static void spawn_root_and_wait(task &root)
Spawn task allocated by allocate_root, wait for it to complete, and deallocate it.
Definition: task.h:781
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:286
friend class filter
Definition: pipeline.h:262
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
Definition: pipeline.h:277
friend class internal::pipeline_cleaner
Definition: pipeline.h:264
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:636
bool is_bound() const
True if filter is thread-bound.
Definition: pipeline.h:139
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
Definition: pipeline.h:174
bool end_of_input
False until fetch_input returns NULL.
Definition: pipeline.h:283

References __TBB_ASSERT, tbb::task::allocate_root(), end_counter, end_of_input, filter_list, has_thread_bound_filters, input_tokens, internal::pipeline_cleaner, internal::pipeline_root_task, tbb::filter::is_bound(), tbb::filter::my_input_buffer, tbb::filter::next_filter_in_pipeline, and tbb::task::spawn_root_and_wait().

Here is the call graph for this function:

◆ run() [2/2]

void __TBB_EXPORTED_METHOD tbb::pipeline::run ( size_t  max_number_of_live_tokens,
tbb::task_group_context context 
)

Run the pipeline to completion with user-supplied context.

Friends And Related Function Documentation

◆ filter

friend class filter
friend

Definition at line 262 of file pipeline.h.

◆ internal::pipeline_cleaner

friend class internal::pipeline_cleaner
friend

Definition at line 264 of file pipeline.h.

Referenced by run().

◆ internal::pipeline_root_task

friend class internal::pipeline_root_task
friend

Definition at line 261 of file pipeline.h.

Referenced by run().

◆ internal::stage_task

friend class internal::stage_task
friend

Definition at line 260 of file pipeline.h.

◆ tbb::interface6::internal::pipeline_proxy

Definition at line 265 of file pipeline.h.

◆ thread_bound_filter

friend class thread_bound_filter
friend

Definition at line 263 of file pipeline.h.

Member Data Documentation

◆ end_counter

task* tbb::pipeline::end_counter
private

task who's reference count is used to determine when all stages are done.

Definition at line 274 of file pipeline.h.

Referenced by add_filter(), remove_filter(), run(), and tbb::internal::pipeline_cleaner::~pipeline_cleaner().

◆ end_of_input

bool tbb::pipeline::end_of_input
private

◆ filter_end

filter* tbb::pipeline::filter_end
private

Pointer to location where address of next filter to be added should be stored.

Definition at line 271 of file pipeline.h.

Referenced by add_filter(), clear(), and remove_filter().

◆ filter_list

◆ has_thread_bound_filters

bool tbb::pipeline::has_thread_bound_filters
private

True if the pipeline contains a thread-bound filter; false otherwise.

Definition at line 286 of file pipeline.h.

Referenced by add_filter(), tbb::internal::stage_task::execute(), and run().

◆ input_tokens

atomic<internal::Token> tbb::pipeline::input_tokens
private

Number of idle tokens waiting for input stage.

Definition at line 277 of file pipeline.h.

Referenced by tbb::internal::stage_task::execute(), tbb::internal::pipeline_root_task::execute(), tbb::thread_bound_filter::internal_process_item(), pipeline(), and run().

◆ token_counter


The documentation for this class was generated from the following files:

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.