You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
concurrentqueue/benchmarks/tbb/pipeline.cpp

782 lines
30 KiB
C++

/*
Copyright 2005-2014 Intel Corporation. All Rights Reserved.
This file is part of Threading Building Blocks. Threading Building Blocks is free software;
you can redistribute it and/or modify it under the terms of the GNU General Public License
version 2 as published by the Free Software Foundation. Threading Building Blocks is
distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details. You should have received a copy of
the GNU General Public License along with Threading Building Blocks; if not, write to the
Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
As a special exception, you may use this file as part of a free software library without
restriction. Specifically, if other files instantiate templates or use macros or inline
functions from this file, or you compile this file and link it with other files to produce
an executable, this file does not by itself cause the resulting executable to be covered
by the GNU General Public License. This exception does not however invalidate any other
reasons why the executable file might be covered by the GNU General Public License.
*/
#include "tbb/pipeline.h"
#include "tbb/spin_mutex.h"
#include "tbb/cache_aligned_allocator.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "tls.h" // for parallel filters that do not use NULL as end_of_input
namespace tbb {
namespace internal {
//! This structure is used to store task information in a input buffer
struct task_info {
void* my_object;
//! Invalid unless a task went through an ordered stage.
Token my_token;
//! False until my_token is set.
bool my_token_ready;
//! True if my_object is valid.
bool is_valid;
//! Set to initial state (no object, no token)
void reset() {
my_object = NULL;
my_token = 0;
my_token_ready = false;
is_valid = false;
}
};
//! A buffer of input items for a filter.
/** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */
class input_buffer : no_copy {
friend class tbb::internal::pipeline_root_task;
friend class tbb::filter;
friend class tbb::thread_bound_filter;
friend class tbb::internal::stage_task;
friend class tbb::pipeline;
typedef Token size_type;
//! Array of deferred tasks that cannot yet start executing.
task_info* array;
//! for thread-bound filter, semaphore for waiting, NULL otherwise.
semaphore* my_sem;
//! Size of array
/** Always 0 or a power of 2 */
size_type array_size;
//! Lowest token that can start executing.
/** All prior Token have already been seen. */
Token low_token;
//! Serializes updates.
spin_mutex array_mutex;
//! Resize "array".
/** Caller is responsible to acquiring a lock on "array_mutex". */
void grow( size_type minimum_size );
//! Initial size for "array"
/** Must be a power of 2 */
static const size_type initial_buffer_size = 4;
//! Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assigned
Token high_token;
//! True for ordered filter, false otherwise.
bool is_ordered;
//! True for thread-bound filter, false otherwise.
bool is_bound;
//! for parallel filters that accepts NULLs, thread-local flag for reaching end_of_input
typedef basic_tls<intptr_t> end_of_input_tls_t;
end_of_input_tls_t end_of_input_tls;
bool end_of_input_tls_allocated; // no way to test pthread creation of TLS
void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }
public:
//! Construct empty buffer.
input_buffer( bool is_ordered_, bool is_bound_ ) :
array(NULL), my_sem(NULL), array_size(0),
low_token(0), high_token(0),
is_ordered(is_ordered_), is_bound(is_bound_),
end_of_input_tls_allocated(false) {
grow(initial_buffer_size);
__TBB_ASSERT( array, NULL );
if(is_bound) create_sema(0);
}
//! Destroy the buffer.
~input_buffer() {
__TBB_ASSERT( array, NULL );
cache_aligned_allocator<task_info>().deallocate(array,array_size);
poison_pointer( array );
if(my_sem) {
free_sema();
}
if(end_of_input_tls_allocated) {
destroy_my_tls();
}
}
//! Put a token into the buffer.
/** If task information was placed into buffer, returns true;
otherwise returns false, informing the caller to create and spawn a task.
If input buffer owned by thread-bound filter and the item at
low_token was not valid, issue a V()
If the input_buffer is owned by a successor to a thread-bound filter,
the force_put parameter should be true to ensure the token is inserted
in the buffer.
*/
bool put_token( task_info& info_, bool force_put = false ) {
{
info_.is_valid = true;
spin_mutex::scoped_lock lock( array_mutex );
Token token;
bool was_empty = !array[low_token&(array_size-1)].is_valid;
if( is_ordered ) {
if( !info_.my_token_ready ) {
info_.my_token = high_token++;
info_.my_token_ready = true;
}
token = info_.my_token;
} else
token = high_token++;
__TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
if( token!=low_token || is_bound || force_put ) {
// Trying to put token that is beyond low_token.
// Need to wait until low_token catches up before dispatching.
if( token-low_token>=array_size )
grow( token-low_token+1 );
ITT_NOTIFY( sync_releasing, this );
array[token&(array_size-1)] = info_;
if(was_empty && is_bound) {
sema_V();
}
return true;
}
}
return false;
}
//! Note that processing of a token is finished.
/** Fires up processing of the next token, if processing was deferred. */
// Using template to avoid explicit dependency on stage_task
// this is only called for serial filters, and is the reason for the
// advance parameter in return_item (we're incrementing low_token here.)
// Non-TBF serial stages don't advance the token at the start because the presence
// of the current token in the buffer keeps another stage from being spawned.
template<typename StageTask>
void note_done( Token token, StageTask& spawner ) {
task_info wakee;
wakee.reset();
{
spin_mutex::scoped_lock lock( array_mutex );
if( !is_ordered || token==low_token ) {
// Wake the next task
task_info& item = array[++low_token & (array_size-1)];
ITT_NOTIFY( sync_acquired, this );
wakee = item;
item.is_valid = false;
}
}
if( wakee.is_valid )
spawner.spawn_stage_task(wakee);
}
#if __TBB_TASK_GROUP_CONTEXT
//! The method destroys all data in filters to prevent memory leaks
void clear( filter* my_filter ) {
long t=low_token;
for( size_type i=0; i<array_size; ++i, ++t ){
task_info& temp = array[t&(array_size-1)];
if (temp.is_valid ) {
my_filter->finalize(temp.my_object);
temp.is_valid = false;
}
}
}
#endif
//! return an item, invalidate the queued item, but only advance if advance
// advance == true for parallel filters. If the filter is serial, leave the
// item in the buffer to keep another stage from being spawned.
bool return_item(task_info& info, bool advance) {
spin_mutex::scoped_lock lock( array_mutex );
task_info& item = array[low_token&(array_size-1)];
ITT_NOTIFY( sync_acquired, this );
if( item.is_valid ) {
info = item;
item.is_valid = false;
if (advance) low_token++;
return true;
}
return false;
}
//! true if the current low_token is valid.
bool has_item() { spin_mutex::scoped_lock lock(array_mutex); return array[low_token&(array_size -1)].is_valid; }
// end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
void set_my_tls_end_of_input() { end_of_input_tls.set(1); }
};
void input_buffer::grow( size_type minimum_size ) {
size_type old_size = array_size;
size_type new_size = old_size ? 2*old_size : initial_buffer_size;
while( new_size<minimum_size )
new_size*=2;
task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
task_info* old_array = array;
for( size_type i=0; i<new_size; ++i )
new_array[i].is_valid = false;
long t=low_token;
for( size_type i=0; i<old_size; ++i, ++t )
new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
array = new_array;
array_size = new_size;
if( old_array )
cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
}
class stage_task: public task, public task_info {
private:
friend class tbb::pipeline;
pipeline& my_pipeline;
filter* my_filter;
//! True if this task has not yet read the input.
bool my_at_start;
public:
//! Construct stage_task for first stage in a pipeline.
/** Such a stage has not read any input yet. */
stage_task( pipeline& pipeline ) :
my_pipeline(pipeline),
my_filter(pipeline.filter_list),
my_at_start(true)
{
task_info::reset();
}
//! Construct stage_task for a subsequent stage in a pipeline.
stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
task_info(info),
my_pipeline(pipeline),
my_filter(filter_),
my_at_start(false)
{}
//! Roughly equivalent to the constructor of input stage task
void reset() {
task_info::reset();
my_filter = my_pipeline.filter_list;
my_at_start = true;
}
//! The virtual task execution method
/*override*/ task* execute();
#if __TBB_TASK_GROUP_CONTEXT
~stage_task()
{
if (my_filter && my_object && (my_filter->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) {
__TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled");
my_filter->finalize(my_object);
my_object = NULL;
}
}
#endif // __TBB_TASK_GROUP_CONTEXT
//! Creates and spawns stage_task from task_info
void spawn_stage_task(const task_info& info)
{
stage_task* clone = new (allocate_additional_child_of(*parent()))
stage_task( my_pipeline, my_filter, info );
spawn(*clone);
}
};
task* stage_task::execute() {
__TBB_ASSERT( !my_at_start || !my_object, NULL );
__TBB_ASSERT( !my_filter->is_bound(), NULL );
if( my_at_start ) {
if( my_filter->is_serial() ) {
my_object = (*my_filter)(my_object);
if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
{
if( my_filter->is_ordered() ) {
my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
my_token_ready = true;
} else if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
if( my_pipeline.has_thread_bound_filters )
my_pipeline.token_counter++; // ideally, with relaxed semantics
}
if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
reset();
goto process_another_stage;
} else {
ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
if( --my_pipeline.input_tokens>0 )
spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
}
} else {
my_pipeline.end_of_input = true;
return NULL;
}
} else /*not is_serial*/ {
if( my_pipeline.end_of_input )
return NULL;
if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
if( my_pipeline.has_thread_bound_filters )
my_pipeline.token_counter++;
}
ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
if( --my_pipeline.input_tokens>0 )
spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
my_object = (*my_filter)(my_object);
if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) )
{
my_pipeline.end_of_input = true;
if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
if( my_pipeline.has_thread_bound_filters )
my_pipeline.token_counter--; // fix token_counter
}
return NULL;
}
}
my_at_start = false;
} else {
my_object = (*my_filter)(my_object);
if( my_filter->is_serial() )
my_filter->my_input_buffer->note_done(my_token, *this);
}
my_filter = my_filter->next_filter_in_pipeline;
if( my_filter ) {
// There is another filter to execute.
if( my_filter->is_serial() ) {
// The next filter must execute tokens in order
if( my_filter->my_input_buffer->put_token(*this) ){
// Can't proceed with the same item
if( my_filter->is_bound() ) {
// Find the next non-thread-bound filter
do {
my_filter = my_filter->next_filter_in_pipeline;
} while( my_filter && my_filter->is_bound() );
// Check if there is an item ready to process
if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()))
goto process_another_stage;
}
my_filter = NULL; // To prevent deleting my_object twice if exception occurs
return NULL;
}
}
} else {
// Reached end of the pipe.
size_t ntokens_avail = ++my_pipeline.input_tokens;
if(my_pipeline.filter_list->is_bound() ) {
if(ntokens_avail == 1) {
my_pipeline.filter_list->my_input_buffer->sema_V();
}
return NULL;
}
if( ntokens_avail>1 // Only recycle if there is one available token
|| my_pipeline.end_of_input ) {
return NULL; // No need to recycle for new input
}
ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
// Recycle as an input stage task.
reset();
}
process_another_stage:
/* A semi-hackish way to reexecute the same task object immediately without spawning.
recycle_as_continuation marks the task for future execution,
and then 'this' pointer is returned to bypass spawning. */
recycle_as_continuation();
return this;
}
class pipeline_root_task: public task {
pipeline& my_pipeline;
bool do_segment_scanning;
/*override*/ task* execute() {
if( !my_pipeline.end_of_input )
if( !my_pipeline.filter_list->is_bound() )
if( my_pipeline.input_tokens > 0 ) {
recycle_as_continuation();
set_ref_count(1);
return new( allocate_child() ) stage_task( my_pipeline );
}
if( do_segment_scanning ) {
filter* current_filter = my_pipeline.filter_list->next_segment;
/* first non-thread-bound filter that follows thread-bound one
and may have valid items to process */
filter* first_suitable_filter = current_filter;
while( current_filter ) {
__TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
__TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
if( !my_pipeline.end_of_input || current_filter->has_more_work())
{
task_info info;
info.reset();
if( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
set_ref_count(1);
recycle_as_continuation();
return new( allocate_child() ) stage_task( my_pipeline, current_filter, info);
}
current_filter = current_filter->next_segment;
if( !current_filter ) {
if( !my_pipeline.end_of_input ) {
recycle_as_continuation();
return this;
}
current_filter = first_suitable_filter;
__TBB_Yield();
}
} else {
/* The preceding pipeline segment is empty.
Fast-forward to the next post-TBF segment. */
first_suitable_filter = first_suitable_filter->next_segment;
current_filter = first_suitable_filter;
}
} /* while( current_filter ) */
return NULL;
} else {
if( !my_pipeline.end_of_input ) {
recycle_as_continuation();
return this;
}
return NULL;
}
}
public:
pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false)
{
__TBB_ASSERT( my_pipeline.filter_list, NULL );
filter* first = my_pipeline.filter_list;
if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
// Scanning the pipeline for segments
filter* head_of_previous_segment = first;
for( filter* subfilter=first->next_filter_in_pipeline;
subfilter!=NULL;
subfilter=subfilter->next_filter_in_pipeline )
{
if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
do_segment_scanning = true;
head_of_previous_segment->next_segment = subfilter;
head_of_previous_segment = subfilter;
}
}
}
}
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Workaround for overzealous compiler warnings
// Suppress compiler warning about constant conditional expression
#pragma warning (disable: 4127)
#endif
// The class destroys end_counter and clears all input buffers if pipeline was cancelled.
class pipeline_cleaner: internal::no_copy {
pipeline& my_pipeline;
public:
pipeline_cleaner(pipeline& _pipeline) :
my_pipeline(_pipeline)
{}
~pipeline_cleaner(){
#if __TBB_TASK_GROUP_CONTEXT
if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
my_pipeline.clear_filters();
#endif
my_pipeline.end_counter = NULL;
}
};
} // namespace internal
void pipeline::inject_token( task& ) {
__TBB_ASSERT(false,"illegal call to inject_token");
}
#if __TBB_TASK_GROUP_CONTEXT
void pipeline::clear_filters() {
for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
if ((f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4))
if( internal::input_buffer* b = f->my_input_buffer )
b->clear(f);
}
}
#endif
pipeline::pipeline() :
filter_list(NULL),
filter_end(NULL),
end_counter(NULL),
end_of_input(false),
has_thread_bound_filters(false)
{
token_counter = 0;
input_tokens = 0;
}
pipeline::~pipeline() {
clear();
}
void pipeline::clear() {
filter* next;
for( filter* f = filter_list; f; f=next ) {
if( internal::input_buffer* b = f->my_input_buffer ) {
delete b;
f->my_input_buffer = NULL;
}
next=f->next_filter_in_pipeline;
f->next_filter_in_pipeline = filter::not_in_pipeline();
if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
f->prev_filter_in_pipeline = filter::not_in_pipeline();
f->my_pipeline = NULL;
}
if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
f->next_segment = NULL;
}
filter_list = filter_end = NULL;
}
void pipeline::add_filter( filter& filter_ ) {
#if TBB_USE_ASSERT
if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
__TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
__TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
__TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
#endif
if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
filter_.my_pipeline = this;
filter_.prev_filter_in_pipeline = filter_end;
if ( filter_list == NULL)
filter_list = &filter_;
else
filter_end->next_filter_in_pipeline = &filter_;
filter_.next_filter_in_pipeline = NULL;
filter_end = &filter_;
}
else
{
if( !filter_end )
filter_end = reinterpret_cast<filter*>(&filter_list);
*reinterpret_cast<filter**>(filter_end) = &filter_;
filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
*reinterpret_cast<filter**>(filter_end) = NULL;
}
if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
if( filter_.is_serial() ) {
if( filter_.is_bound() )
has_thread_bound_filters = true;
filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
}
else {
if(filter_.prev_filter_in_pipeline) {
if(filter_.prev_filter_in_pipeline->is_bound()) {
// successors to bound filters must have an input_buffer
filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
}
}
else { // input filter
if(filter_.object_may_be_null() ) {
//TODO: buffer only needed to hold TLS; could improve
filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
filter_.my_input_buffer->create_my_tls();
}
}
}
} else {
if( filter_.is_serial() ) {
filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
}
}
}
void pipeline::remove_filter( filter& filter_ ) {
__TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
__TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
__TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
if (&filter_ == filter_list)
filter_list = filter_.next_filter_in_pipeline;
else {
__TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
}
if (&filter_ == filter_end)
filter_end = filter_.prev_filter_in_pipeline;
else {
__TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
}
if( internal::input_buffer* b = filter_.my_input_buffer ) {
delete b;
filter_.my_input_buffer = NULL;
}
filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
filter_.next_segment = NULL;
filter_.my_pipeline = NULL;
}
void pipeline::run( size_t max_number_of_live_tokens
#if __TBB_TASK_GROUP_CONTEXT
, tbb::task_group_context& context
#endif
) {
__TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
__TBB_ASSERT( !end_counter, "pipeline already running?" );
if( filter_list ) {
internal::pipeline_cleaner my_pipeline_cleaner(*this);
end_of_input = false;
input_tokens = internal::Token(max_number_of_live_tokens);
if(has_thread_bound_filters) {
// release input filter if thread-bound
if(filter_list->is_bound()) {
filter_list->my_input_buffer->sema_V();
}
}
#if __TBB_TASK_GROUP_CONTEXT
end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
#else
end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
#endif
// Start execution of tasks
task::spawn_root_and_wait( *end_counter );
if(has_thread_bound_filters) {
for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
if(f->is_bound()) {
f->my_input_buffer->sema_V(); // wake to end
}
}
}
}
}
#if __TBB_TASK_GROUP_CONTEXT
void pipeline::run( size_t max_number_of_live_tokens ) {
if( filter_list ) {
// Construct task group context with the exception propagation mode expected
// by the pipeline caller.
uintptr_t ctx_traits = filter_list->my_filter_mode & filter::exact_exception_propagation ?
task_group_context::default_traits :
task_group_context::default_traits & ~task_group_context::exact_exception;
task_group_context context(task_group_context::bound, ctx_traits);
run(max_number_of_live_tokens, context);
}
}
#endif // __TBB_TASK_GROUP_CONTEXT
bool filter::has_more_work() {
__TBB_ASSERT(my_pipeline, NULL);
__TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
}
filter::~filter() {
if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
if ( next_filter_in_pipeline != filter::not_in_pipeline() )
my_pipeline->remove_filter(*this);
else
__TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably filter list is broken" );
} else {
__TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
}
}
void
filter::set_end_of_input() {
__TBB_ASSERT(my_input_buffer, NULL);
__TBB_ASSERT(object_may_be_null(), NULL);
if(is_serial()) {
my_pipeline->end_of_input = true;
}
else {
__TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
my_input_buffer->set_my_tls_end_of_input();
}
}
thread_bound_filter::result_type thread_bound_filter::process_item() {
return internal_process_item(true);
}
thread_bound_filter::result_type thread_bound_filter::try_process_item() {
return internal_process_item(false);
}
thread_bound_filter::result_type thread_bound_filter::internal_process_item(bool is_blocking) {
__TBB_ASSERT(my_pipeline != NULL,"It's not supposed that process_item is called for a filter that is not in a pipeline.");
internal::task_info info;
info.reset();
if( my_pipeline->end_of_input && !has_more_work() )
return end_of_stream;
if( !prev_filter_in_pipeline ) {
if( my_pipeline->end_of_input )
return end_of_stream;
while( my_pipeline->input_tokens == 0 ) {
if( !is_blocking )
return item_not_available;
my_input_buffer->sema_P();
}
info.my_object = (*this)(info.my_object);
if( info.my_object ) {
__TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
my_pipeline->input_tokens--;
if( is_ordered() ) {
info.my_token = my_pipeline->token_counter;
info.my_token_ready = true;
}
my_pipeline->token_counter++; // ideally, with relaxed semantics
} else {
my_pipeline->end_of_input = true;
return end_of_stream;
}
} else { /* this is not an input filter */
while( !my_input_buffer->has_item() ) {
if( !is_blocking ) {
return item_not_available;
}
my_input_buffer->sema_P();
if( my_pipeline->end_of_input && !has_more_work() ) {
return end_of_stream;
}
}
if( !my_input_buffer->return_item(info, /*advance*/true) ) {
__TBB_ASSERT(false,"return_item failed");
}
info.my_object = (*this)(info.my_object);
}
if( next_filter_in_pipeline ) {
if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
__TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
}
} else {
size_t ntokens_avail = ++(my_pipeline->input_tokens);
if( my_pipeline->filter_list->is_bound() ) {
if( ntokens_avail == 1 ) {
my_pipeline->filter_list->my_input_buffer->sema_V();
}
}
}
return success;
}
} // tbb