// concurrentqueue/benchmarks/tbb/scheduler.cpp

/*
Copyright 2005-2014 Intel Corporation. All Rights Reserved.

This file is part of Threading Building Blocks. Threading Building Blocks is free software;
you can redistribute it and/or modify it under the terms of the GNU General Public License
version 2 as published by the Free Software Foundation. Threading Building Blocks is
distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details. You should have received a copy of
the GNU General Public License along with Threading Building Blocks; if not, write to the
Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

As a special exception, you may use this file as part of a free software library without
restriction. Specifically, if other files instantiate templates or use macros or inline
functions from this file, or you compile this file and link it with other files to produce
an executable, this file does not by itself cause the resulting executable to be covered
by the GNU General Public License. This exception does not however invalidate any other
reasons why the executable file might be covered by the GNU General Public License.
*/
#include "custom_scheduler.h"
#include "scheduler_utility.h"
#include "governor.h"
#include "market.h"
#include "arena.h"
#include "mailbox.h"
#include "observer_proxy.h"
#include "tbb/tbb_machine.h"
#include "tbb/atomic.h"
namespace tbb {
namespace internal {
//------------------------------------------------------------------------
// Library initialization
//------------------------------------------------------------------------
/** Defined in tbb_main.cpp **/
extern generic_scheduler* (*AllocateSchedulerPtr)( arena*, size_t index );
inline generic_scheduler* allocate_scheduler ( arena* a, size_t index ) {
return AllocateSchedulerPtr(a, index);
}
#if __TBB_TASK_GROUP_CONTEXT
context_state_propagation_mutex_type the_context_state_propagation_mutex;
uintptr_t the_context_state_propagation_epoch = 0;
//! Context to be associated with dummy tasks of worker threads schedulers.
/** It is never used for its direct purpose, and is introduced solely for the sake
of avoiding one extra conditional branch at the end of the wait_for_all method. **/
static task_group_context the_dummy_context(task_group_context::isolated);
#endif /* __TBB_TASK_GROUP_CONTEXT */
void Scheduler_OneTimeInitialization ( bool itt_present ) {
AllocateSchedulerPtr = itt_present ? &custom_scheduler<DefaultSchedulerTraits>::allocate_scheduler :
&custom_scheduler<IntelSchedulerTraits>::allocate_scheduler;
#if __TBB_TASK_GROUP_CONTEXT
// There must be no tasks belonging to this fake task group. Mark invalid for the assert
__TBB_ASSERT(!(task_group_context::low_unused_state_bit & (task_group_context::low_unused_state_bit-1)), NULL);
the_dummy_context.my_state = task_group_context::low_unused_state_bit;
#if __TBB_TASK_PRIORITY
// It should never prevent tasks from being passed to execution.
the_dummy_context.my_priority = num_priority_levels - 1;
#endif /* __TBB_TASK_PRIORITY */
#endif /* __TBB_TASK_GROUP_CONTEXT */
}
//------------------------------------------------------------------------
// scheduler interface
//------------------------------------------------------------------------
// A pure virtual destructor should still have a body
// so the one for tbb::internal::scheduler::~scheduler() is provided here
scheduler::~scheduler( ) {}
//------------------------------------------------------------------------
// generic_scheduler
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warning about using 'this' in base initializer list.
#pragma warning(push)
#pragma warning(disable:4355)
#endif
generic_scheduler::generic_scheduler( arena* a, size_t index )
: my_stealing_threshold(0)
, my_market(NULL)
, my_random( this )
, my_free_list(NULL)
#if __TBB_HOARD_NONLOCAL_TASKS
, my_nonlocal_free_list(NULL)
#endif
, my_dummy_task(NULL)
, my_ref_count(1)
, my_auto_initialized(false)
#if __TBB_COUNT_TASK_NODES
, my_task_node_count(0)
#endif /* __TBB_COUNT_TASK_NODES */
, my_small_task_count(1) // Extra 1 is a guard reference
, my_return_list(NULL)
#if __TBB_TASK_GROUP_CONTEXT
, my_local_ctx_list_update(make_atomic(uintptr_t(0)))
#endif /* __TBB_TASK_GROUP_CONTEXT */
#if __TBB_TASK_PRIORITY
, my_offloaded_tasks(NULL)
, my_offloaded_task_list_tail_link(NULL)
, my_local_reload_epoch(0)
, my_pool_reshuffling_pending(false)
#endif /* __TBB_TASK_PRIORITY */
#if __TBB_TASK_GROUP_CONTEXT
, my_nonlocal_ctx_list_update(make_atomic(uintptr_t(0)))
#endif /* __TBB_TASK_GROUP_CONTEXT */
#if __TBB_SURVIVE_THREAD_SWITCH && TBB_USE_ASSERT
, my_cilk_state(cs_none)
#endif /* __TBB_SURVIVE_THREAD_SWITCH && TBB_USE_ASSERT */
{
my_arena_index = index;
my_arena_slot = 0;
my_arena = a;
my_innermost_running_task = NULL;
my_dispatching_task = NULL;
my_affinity_id = 0;
#if __TBB_SCHEDULER_OBSERVER
my_last_global_observer = NULL;
my_last_local_observer = NULL;
#endif /* __TBB_SCHEDULER_OBSERVER */
#if __TBB_TASK_PRIORITY
my_ref_top_priority = NULL;
my_ref_reload_epoch = NULL;
#endif /* __TBB_TASK_PRIORITY */
my_dummy_task = &allocate_task( sizeof(task), __TBB_CONTEXT_ARG(NULL, NULL) );
#if __TBB_TASK_GROUP_CONTEXT
my_context_list_head.my_prev = &my_context_list_head;
my_context_list_head.my_next = &my_context_list_head;
ITT_SYNC_CREATE(&my_context_list_mutex, SyncType_Scheduler, SyncObj_ContextsList);
#endif /* __TBB_TASK_GROUP_CONTEXT */
my_dummy_task->prefix().ref_count = 2;
ITT_SYNC_CREATE(&my_dummy_task->prefix().ref_count, SyncType_Scheduler, SyncObj_WorkerLifeCycleMgmt);
ITT_SYNC_CREATE(&my_return_list, SyncType_Scheduler, SyncObj_TaskReturnList);
assert_task_pool_valid();
#if __TBB_SURVIVE_THREAD_SWITCH
my_cilk_unwatch_thunk.routine = NULL;
#endif /* __TBB_SURVIVE_THREAD_SWITCH */
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif // warning 4355 is back
#if TBB_USE_ASSERT > 1
void generic_scheduler::assert_task_pool_valid() const {
acquire_task_pool();
task** tp = my_arena_slot->task_pool_ptr;
__TBB_ASSERT( my_arena_slot->my_task_pool_size >= min_task_pool_size, NULL );
const size_t H = __TBB_load_relaxed(my_arena_slot->head); // mirror
const size_t T = __TBB_load_relaxed(my_arena_slot->tail); // mirror
__TBB_ASSERT( H <= T, NULL );
for ( size_t i = 0; i < H; ++i )
__TBB_ASSERT( tp[i] == poisoned_ptr, "Task pool corrupted" );
for ( size_t i = H; i < T; ++i ) {
__TBB_ASSERT( (uintptr_t)tp[i] + 1 > 1u, "nil or invalid task pointer in the deque" );
__TBB_ASSERT( tp[i]->prefix().state == task::ready ||
tp[i]->prefix().extra_state == es_task_proxy, "task in the deque has invalid state" );
}
for ( size_t i = T; i < my_arena_slot->my_task_pool_size; ++i )
__TBB_ASSERT( tp[i] == poisoned_ptr, "Task pool corrupted" );
release_task_pool();
}
#endif /* TBB_USE_ASSERT > 1 */
void generic_scheduler::init_stack_info () {
// Stacks grow top-down: the highest address is called the "stack base",
// and the lowest is the "stack limit".
__TBB_ASSERT( !my_stealing_threshold, "Stealing threshold has already been calculated" );
size_t stack_size = my_market->worker_stack_size();
#if USE_WINTHREAD
#if defined(_MSC_VER)&&_MSC_VER<1400 && !_WIN64
NT_TIB *pteb = (NT_TIB*)__TBB_machine_get_current_teb();
#else
NT_TIB *pteb = (NT_TIB*)NtCurrentTeb();
#endif
__TBB_ASSERT( &pteb < pteb->StackBase && &pteb > pteb->StackLimit, "invalid stack info in TEB" );
__TBB_ASSERT( stack_size >0, "stack_size not initialized?" );
// When a thread is created with the attribute STACK_SIZE_PARAM_IS_A_RESERVATION, stack limit
// in the TIB points to the committed part of the stack only. This renders the expression
// "(uintptr_t)pteb->StackBase / 2 + (uintptr_t)pteb->StackLimit / 2" virtually useless.
// Thus for worker threads we use the explicit stack size we used while creating them.
// And for master threads we rely on the following fact and assumption:
// - the default stack size of a master thread on Windows is 1M;
// - if it was explicitly set by the application it is at least as large as the size of a worker stack.
if ( is_worker() || stack_size < MByte )
my_stealing_threshold = (uintptr_t)pteb->StackBase - stack_size / 2;
else
my_stealing_threshold = (uintptr_t)pteb->StackBase - MByte / 2;
#else /* USE_PTHREAD */
// There is no portable way to get the stack base address in POSIX, so we use a
// non-portable method (available on all modern Linux systems) or a simplified approach
// based on common-sense assumptions. The most important assumption
// is that the main thread's stack size is not less than that of other threads.
// See also comment 3 at the end of this file
void *stack_base = &stack_size;
#if __linux__ && !__bg__
#if __TBB_ipf
void *rsb_base = __TBB_get_bsp();
#endif
size_t np_stack_size = 0;
void *stack_limit = NULL;
pthread_attr_t np_attr_stack;
if( 0 == pthread_getattr_np(pthread_self(), &np_attr_stack) ) {
if ( 0 == pthread_attr_getstack(&np_attr_stack, &stack_limit, &np_stack_size) ) {
#if __TBB_ipf
pthread_attr_t attr_stack;
if ( 0 == pthread_attr_init(&attr_stack) ) {
if ( 0 == pthread_attr_getstacksize(&attr_stack, &stack_size) ) {
if ( np_stack_size < stack_size ) {
// We are in a secondary thread. Use reliable data.
// IA-64 architecture stack is split into RSE backup and memory parts
rsb_base = stack_limit;
stack_size = np_stack_size/2;
// Limit of the memory part of the stack
stack_limit = (char*)stack_limit + stack_size;
}
// We are either in the main thread, or this thread's stack
// is bigger than that of the main one. As we cannot discern
// these cases, we fall back to the default (heuristic) values.
}
pthread_attr_destroy(&attr_stack);
}
// IA-64 architecture stack is split into RSE backup and memory parts
my_rsb_stealing_threshold = (uintptr_t)((char*)rsb_base + stack_size/2);
#endif /* __TBB_ipf */
// Size of the stack free part
stack_size = size_t((char*)stack_base - (char*)stack_limit);
}
pthread_attr_destroy(&np_attr_stack);
}
#endif /* __linux__ */
__TBB_ASSERT( stack_size>0, "stack size must be positive" );
my_stealing_threshold = (uintptr_t)((char*)stack_base - stack_size/2);
#endif /* USE_PTHREAD */
}
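// Note: my_stealing_threshold ends up marking (roughly) the middle of the free stack space.
// Elsewhere in the scheduler (outside this file) it is presumably compared against the
// current stack position, so that a thread stops stealing once about half of its remaining
// stack has been consumed by nested task execution.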
#if __TBB_TASK_GROUP_CONTEXT
/** The function uses a synchronization scheme similar to the one in the destructor
of task_group_context, augmented with an interlocked state change of each context
object. The purpose of this algorithm is to prevent threads doing nonlocal context
destruction from accessing the destroyed owner-scheduler instance still pointed to
by the context object. **/
void generic_scheduler::cleanup_local_context_list () {
// Detach contexts remaining in the local list
bool wait_for_concurrent_destroyers_to_leave = false;
uintptr_t local_count_snapshot = my_context_state_propagation_epoch;
my_local_ctx_list_update.store<relaxed>(1);
{
// This is just a definition. Actual lock is acquired only in case of conflict.
spin_mutex::scoped_lock lock;
// Full fence prevents reordering of store to my_local_ctx_list_update with
// load from my_nonlocal_ctx_list_update.
atomic_fence();
// Check for the conflict with concurrent destroyer or cancellation propagator
if ( my_nonlocal_ctx_list_update.load<relaxed>() || local_count_snapshot != the_context_state_propagation_epoch )
lock.acquire(my_context_list_mutex);
// No acquire fence is necessary for loading my_context_list_head.my_next,
// as the list can be updated by this thread only.
context_list_node_t *node = my_context_list_head.my_next;
while ( node != &my_context_list_head ) {
task_group_context &ctx = __TBB_get_object_ref(task_group_context, my_node, node);
__TBB_ASSERT( __TBB_load_relaxed(ctx.my_kind) != task_group_context::binding_required, "Only a context bound to a root task can be detached" );
node = node->my_next;
__TBB_ASSERT( is_alive(ctx.my_version_and_traits), "Walked into a destroyed context while detaching contexts from the local list" );
// Synchronizes with ~task_group_context(). TODO: evaluate and perhaps relax
if ( internal::as_atomic(ctx.my_kind).fetch_and_store(task_group_context::detached) == task_group_context::dying )
wait_for_concurrent_destroyers_to_leave = true;
}
}
my_local_ctx_list_update.store<release>(0);
// Wait until other threads referencing this scheduler object finish with it
if ( wait_for_concurrent_destroyers_to_leave )
spin_wait_until_eq( my_nonlocal_ctx_list_update, 0u );
}
#endif /* __TBB_TASK_GROUP_CONTEXT */
void generic_scheduler::free_scheduler() {
__TBB_ASSERT( !my_arena_slot, NULL );
#if __TBB_TASK_GROUP_CONTEXT
cleanup_local_context_list();
#endif /* __TBB_TASK_GROUP_CONTEXT */
free_task<small_local_task>( *my_dummy_task );
#if __TBB_HOARD_NONLOCAL_TASKS
while( task* t = my_nonlocal_free_list ) {
task_prefix& p = t->prefix();
my_nonlocal_free_list = p.next;
__TBB_ASSERT( p.origin && p.origin!=this, NULL );
free_nonlocal_small_task(*t);
}
#endif
// k accounts for a guard reference and each task that we deallocate.
intptr_t k = 1;
for(;;) {
while( task* t = my_free_list ) {
my_free_list = t->prefix().next;
deallocate_task(*t);
++k;
}
if( my_return_list==plugged_return_list() )
break;
my_free_list = (task*)__TBB_FetchAndStoreW( &my_return_list, (intptr_t)plugged_return_list() );
}
#if __TBB_COUNT_TASK_NODES
my_market->update_task_node_count( my_task_node_count );
#endif /* __TBB_COUNT_TASK_NODES */
// Update my_small_task_count last. Doing so sooner might cause another thread to free *this.
__TBB_ASSERT( my_small_task_count>=k, "my_small_task_count corrupted" );
governor::sign_off(this);
if( __TBB_FetchAndAddW( &my_small_task_count, -k )==k )
NFS_Free( this );
}
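// Allocates a task object. Small requests (<= quick_task_size) are served from per-scheduler
// caches: the hoarded nonlocal free list (when enabled), then the local free list, then the
// return list refilled by other threads that freed tasks originating here, and only then
// fresh memory from NFS_Allocate. Larger requests always go straight to NFS_Allocate and
// are marked with a NULL origin.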
task& generic_scheduler::allocate_task( size_t number_of_bytes,
__TBB_CONTEXT_ARG(task* parent, task_group_context* context) ) {
GATHER_STATISTIC(++my_counters.active_tasks);
task *t;
if( number_of_bytes<=quick_task_size ) {
#if __TBB_HOARD_NONLOCAL_TASKS
if( (t = my_nonlocal_free_list) ) {
GATHER_STATISTIC(--my_counters.free_list_length);
__TBB_ASSERT( t->state()==task::freed, "free list of tasks is corrupted" );
my_nonlocal_free_list = t->prefix().next;
} else
#endif
if( (t = my_free_list) ) {
GATHER_STATISTIC(--my_counters.free_list_length);
__TBB_ASSERT( t->state()==task::freed, "free list of tasks is corrupted" );
my_free_list = t->prefix().next;
} else if( my_return_list ) {
// No fence required for read of my_return_list above, because __TBB_FetchAndStoreW has a fence.
t = (task*)__TBB_FetchAndStoreW( &my_return_list, 0 ); // with acquire
__TBB_ASSERT( t, "another thread emptied the my_return_list" );
__TBB_ASSERT( t->prefix().origin==this, "task returned to wrong my_return_list" );
ITT_NOTIFY( sync_acquired, &my_return_list );
my_free_list = t->prefix().next;
} else {
t = (task*)((char*)NFS_Allocate( 1, task_prefix_reservation_size+quick_task_size, NULL ) + task_prefix_reservation_size );
#if __TBB_COUNT_TASK_NODES
++my_task_node_count;
#endif /* __TBB_COUNT_TASK_NODES */
t->prefix().origin = this;
t->prefix().next = 0;
++my_small_task_count;
}
#if __TBB_PREFETCHING
task *t_next = t->prefix().next;
if( !t_next ) { // the task was last in the list
#if __TBB_HOARD_NONLOCAL_TASKS
if( my_free_list )
t_next = my_free_list;
else
#endif
if( my_return_list ) // enable prefetching, gives speedup
t_next = my_free_list = (task*)__TBB_FetchAndStoreW( &my_return_list, 0 );
}
if( t_next ) { // gives speedup for both cache lines
__TBB_cl_prefetch(t_next);
__TBB_cl_prefetch(&t_next->prefix());
}
#endif /* __TBB_PREFETCHING */
} else {
GATHER_STATISTIC(++my_counters.big_tasks);
t = (task*)((char*)NFS_Allocate( 1, task_prefix_reservation_size+number_of_bytes, NULL ) + task_prefix_reservation_size );
#if __TBB_COUNT_TASK_NODES
++my_task_node_count;
#endif /* __TBB_COUNT_TASK_NODES */
t->prefix().origin = NULL;
}
task_prefix& p = t->prefix();
#if __TBB_TASK_GROUP_CONTEXT
p.context = context;
#endif /* __TBB_TASK_GROUP_CONTEXT */
// Obsolete. But still in use, so has to be assigned correct value here.
p.owner = this;
p.ref_count = 0;
// Obsolete. Assign some not outrageously out-of-place value for a while.
p.depth = 0;
p.parent = parent;
// In TBB 2.1 and later, the constructor for task sets extra_state to indicate the version of the tbb/task.h header.
// In TBB 2.0 and earlier, the constructor leaves extra_state as zero.
p.extra_state = 0;
p.affinity = 0;
p.state = task::allocated;
return *t;
}
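// Returns a small task to the scheduler that allocated it by atomically pushing it onto
// that scheduler's my_return_list. If the owner has already plugged its return list
// (i.e. it is being destroyed), the task is deallocated here instead, and decrementing
// my_small_task_count may make this thread responsible for freeing the owner scheduler itself.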
void generic_scheduler::free_nonlocal_small_task( task& t ) {
__TBB_ASSERT( t.state()==task::freed, NULL );
generic_scheduler& s = *static_cast<generic_scheduler*>(t.prefix().origin);
__TBB_ASSERT( &s!=this, NULL );
for(;;) {
task* old = s.my_return_list;
if( old==plugged_return_list() )
break;
// Atomically insert t at head of s.my_return_list
t.prefix().next = old;
ITT_NOTIFY( sync_releasing, &s.my_return_list );
if( as_atomic(s.my_return_list).compare_and_swap(&t, old )==old ) {
#if __TBB_PREFETCHING
__TBB_cl_evict(&t.prefix());
__TBB_cl_evict(&t);
#endif
return;
}
}
deallocate_task(t);
if( __TBB_FetchAndDecrementWrelease( &s.my_small_task_count )==1 ) {
// We freed the last task allocated by scheduler s, so it's our responsibility
// to free the scheduler.
NFS_Free( &s );
}
}
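// Ensures the local task pool has room for num_tasks more entries at the tail and returns
// the tail position where they should be stored. Depending on the state of the deque this
// allocates the initial pool, compacts the busy part to the beginning, or grows the pool
// (at least doubling it) under the task pool lock.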
size_t generic_scheduler::prepare_task_pool ( size_t num_tasks ) {
size_t T = __TBB_load_relaxed(my_arena_slot->tail); // mirror
if ( T + num_tasks <= my_arena_slot->my_task_pool_size )
return T;
acquire_task_pool();
size_t H = __TBB_load_relaxed(my_arena_slot->head); // mirror
T -= H;
size_t new_size = T + num_tasks;
__TBB_ASSERT(!my_arena_slot->my_task_pool_size || my_arena_slot->my_task_pool_size >= min_task_pool_size, NULL);
if( !my_arena_slot->my_task_pool_size ) {
__TBB_ASSERT( !in_arena() && !my_arena_slot->task_pool_ptr, NULL );
if( new_size < min_task_pool_size ) new_size = min_task_pool_size;
my_arena_slot->allocate_task_pool( new_size );
}
// If the free space at the beginning of the task pool is too short, we
// are likely facing a pathological single-producer-multiple-consumers
// scenario, and thus it's better to expand the task pool
else if ( new_size <= my_arena_slot->my_task_pool_size - min_task_pool_size/4 ) {
// Relocate the busy part to the beginning of the deque
memmove( my_arena_slot->task_pool_ptr, my_arena_slot->task_pool_ptr + H, T * sizeof(task*) );
my_arena_slot->fill_with_canary_pattern( T, my_arena_slot->tail );
commit_relocated_tasks(T);
}
else {
// Grow task pool. As this operation is rare, and its cost is asymptotically
// amortizable, we can tolerate new task pool allocation done under the lock.
if ( new_size < 2 * my_arena_slot->my_task_pool_size )
new_size = 2 * my_arena_slot->my_task_pool_size;
task** old_pool = my_arena_slot->task_pool_ptr;
my_arena_slot->allocate_task_pool( new_size ); // updates my_task_pool_size
__TBB_ASSERT( T <= my_arena_slot->my_task_pool_size, "new task pool is too short" );
memcpy( my_arena_slot->task_pool_ptr, old_pool + H, T * sizeof(task*) );
commit_relocated_tasks(T);
__TBB_ASSERT( old_pool, "attempt to free NULL TaskPool" );
NFS_Free( old_pool );
}
assert_task_pool_valid();
return T;
}
/** ATTENTION:
This method is mostly the same as generic_scheduler::lock_task_pool(), with
a little different logic of slot state checks (slot is either locked or points
to our task pool).
Thus if either of them is changed, consider changing the counterpart as well. **/
inline void generic_scheduler::acquire_task_pool() const {
if ( !in_arena() )
return; // we are not in arena - nothing to lock
bool sync_prepare_done = false;
for( atomic_backoff b;;b.pause() ) {
#if TBB_USE_ASSERT
__TBB_ASSERT( my_arena_slot == my_arena->my_slots + my_arena_index, "invalid arena slot index" );
// Local copy of the arena slot task pool pointer is necessary for the next
// assertion to work correctly to exclude asynchronous state transition effect.
task** tp = my_arena_slot->task_pool;
__TBB_ASSERT( tp == LockedTaskPool || tp == my_arena_slot->task_pool_ptr, "slot ownership corrupt?" );
#endif
if( my_arena_slot->task_pool != LockedTaskPool &&
as_atomic(my_arena_slot->task_pool).compare_and_swap(LockedTaskPool, my_arena_slot->task_pool_ptr ) == my_arena_slot->task_pool_ptr )
{
// We acquired our own slot
ITT_NOTIFY(sync_acquired, my_arena_slot);
break;
}
else if( !sync_prepare_done ) {
// Start waiting
ITT_NOTIFY(sync_prepare, my_arena_slot);
sync_prepare_done = true;
}
// Someone else acquired a lock, so pause and do exponential backoff.
}
__TBB_ASSERT( my_arena_slot->task_pool == LockedTaskPool, "not really acquired task pool" );
} // generic_scheduler::acquire_task_pool
inline void generic_scheduler::release_task_pool() const {
if ( !in_arena() )
return; // we are not in arena - nothing to unlock
__TBB_ASSERT( my_arena_slot, "we are not in arena" );
__TBB_ASSERT( my_arena_slot->task_pool == LockedTaskPool, "arena slot is not locked" );
ITT_NOTIFY(sync_releasing, my_arena_slot);
__TBB_store_with_release( my_arena_slot->task_pool, my_arena_slot->task_pool_ptr );
}
/** ATTENTION:
This method is mostly the same as generic_scheduler::acquire_task_pool(),
with a little different logic of slot state checks (slot can be empty, locked
or point to any task pool other than ours, and asynchronous transitions between
all these states are possible).
Thus if any of them is changed, consider changing the counterpart as well **/
inline task** generic_scheduler::lock_task_pool( arena_slot* victim_arena_slot ) const {
task** victim_task_pool;
bool sync_prepare_done = false;
for( atomic_backoff backoff;; /*backoff pause embedded in the loop*/) {
victim_task_pool = victim_arena_slot->task_pool;
// NOTE: Do not use comparison of head and tail indices to check for
// the presence of work in the victim's task pool, as they may give
// incorrect indication because of task pool relocations and resizes.
if ( victim_task_pool == EmptyTaskPool ) {
// The victim thread emptied its task pool - nothing to lock
if( sync_prepare_done )
ITT_NOTIFY(sync_cancel, victim_arena_slot);
break;
}
if( victim_task_pool != LockedTaskPool &&
as_atomic(victim_arena_slot->task_pool).compare_and_swap(LockedTaskPool, victim_task_pool ) == victim_task_pool )
{
// We've locked victim's task pool
ITT_NOTIFY(sync_acquired, victim_arena_slot);
break;
}
else if( !sync_prepare_done ) {
// Start waiting
ITT_NOTIFY(sync_prepare, victim_arena_slot);
sync_prepare_done = true;
}
GATHER_STATISTIC( ++my_counters.thieves_conflicts );
// Someone else acquired a lock, so pause and do exponential backoff.
#if __TBB_STEALING_ABORT_ON_CONTENTION
if(!backoff.bounded_pause()) {
// The threshold of 16 was chosen empirically; the theory behind it is that the
// number of threads becomes much bigger than the number of tasks that can be
// spawned by one thread, causing excessive contention.
// TODO: However, even small arenas can benefit from the abort on contention
// if preemption of a thief is a problem
if(my_arena->my_limit >= 16)
return EmptyTaskPool;
__TBB_Yield();
}
#else
backoff.pause();
#endif
}
__TBB_ASSERT( victim_task_pool == EmptyTaskPool ||
(victim_arena_slot->task_pool == LockedTaskPool && victim_task_pool != LockedTaskPool),
"not really locked victim's task pool?" );
return victim_task_pool;
} // generic_scheduler::lock_task_pool
inline void generic_scheduler::unlock_task_pool( arena_slot* victim_arena_slot,
task** victim_task_pool ) const {
__TBB_ASSERT( victim_arena_slot, "empty victim arena slot pointer" );
__TBB_ASSERT( victim_arena_slot->task_pool == LockedTaskPool, "victim arena slot is not locked" );
ITT_NOTIFY(sync_releasing, victim_arena_slot);
__TBB_store_with_release( victim_arena_slot->task_pool, victim_task_pool );
}
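// Transitions a task to the 'ready' state. If the task has affinity to another thread,
// it is wrapped into a task_proxy that is both mailed to the destination thread's mailbox
// and returned for placement into the local task pool; the proxy's tag bits record that it
// is present in both locations, so whichever thread reaches it second frees the proxy.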
inline task* generic_scheduler::prepare_for_spawning( task* t ) {
__TBB_ASSERT( t->state()==task::allocated, "attempt to spawn task that is not in 'allocated' state" );
t->prefix().state = task::ready;
#if TBB_USE_ASSERT
if( task* parent = t->parent() ) {
internal::reference_count ref_count = parent->prefix().ref_count;
__TBB_ASSERT( ref_count>=0, "attempt to spawn task whose parent has a ref_count<0" );
__TBB_ASSERT( ref_count!=0, "attempt to spawn task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
parent->prefix().extra_state |= es_ref_count_active;
}
#endif /* TBB_USE_ASSERT */
affinity_id dst_thread = t->prefix().affinity;
__TBB_ASSERT( dst_thread == 0 || is_version_3_task(*t),
"backwards compatibility to TBB 2.0 tasks is broken" );
if( dst_thread != 0 && dst_thread != my_affinity_id ) {
task_proxy& proxy = (task_proxy&)allocate_task( sizeof(task_proxy),
__TBB_CONTEXT_ARG(NULL, NULL) );
// Mark as a proxy
proxy.prefix().extra_state = es_task_proxy;
proxy.outbox = &my_arena->mailbox(dst_thread);
// Mark proxy as present in both locations (sender's task pool and destination mailbox)
proxy.task_and_tag = intptr_t(t) | task_proxy::location_mask;
#if __TBB_TASK_PRIORITY
proxy.prefix().context = t->prefix().context;
#endif /* __TBB_TASK_PRIORITY */
ITT_NOTIFY( sync_releasing, proxy.outbox );
// Mail the proxy - after this point t may be destroyed by another thread at any moment.
proxy.outbox->push(proxy);
return &proxy;
}
return t;
}
/** Conceptually, this method should be a member of class scheduler.
But doing so would force us to publish class scheduler in the headers. */
void generic_scheduler::local_spawn( task& first, task*& next ) {
__TBB_ASSERT( governor::is_set(this), NULL );
if ( &first.prefix().next == &next ) {
// Single task is being spawned
size_t T = prepare_task_pool( 1 );
my_arena_slot->task_pool_ptr[T] = prepare_for_spawning( &first );
commit_spawned_tasks( T + 1 );
}
else {
// Task list is being spawned
task *arr[min_task_pool_size];
fast_reverse_vector<task*> tasks(arr, min_task_pool_size);
task *t_next = NULL;
for( task* t = &first; ; t = t_next ) {
// If t is affinitized to another thread, it may already be executed
// and destroyed by the time prepare_for_spawning returns.
// So milk it while it is alive.
bool end = &t->prefix().next == &next;
t_next = t->prefix().next;
tasks.push_back( prepare_for_spawning(t) );
if( end )
break;
}
size_t num_tasks = tasks.size();
size_t T = prepare_task_pool( num_tasks );
tasks.copy_memory( my_arena_slot->task_pool_ptr + T );
commit_spawned_tasks( T + num_tasks );
}
if ( !in_arena() )
enter_arena();
my_arena->advertise_new_work</*Spawned=*/true>();
assert_task_pool_valid();
}
void generic_scheduler::local_spawn_root_and_wait( task& first, task*& next ) {
__TBB_ASSERT( governor::is_set(this), NULL );
__TBB_ASSERT( &first, NULL );
auto_empty_task dummy( __TBB_CONTEXT_ARG(this, first.prefix().context) );
internal::reference_count n = 0;
for( task* t=&first; ; t=t->prefix().next ) {
++n;
__TBB_ASSERT( !t->prefix().parent, "not a root task, or already running" );
t->prefix().parent = &dummy;
if( &t->prefix().next==&next ) break;
#if __TBB_TASK_GROUP_CONTEXT
__TBB_ASSERT( t->prefix().context == t->prefix().next->prefix().context,
"all the root tasks in list must share the same context");
#endif /* __TBB_TASK_GROUP_CONTEXT */
}
dummy.prefix().ref_count = n+1;
if( n>1 )
local_spawn( *first.prefix().next, next );
local_wait_for_all( dummy, &first );
}
void tbb::internal::generic_scheduler::spawn( task& first, task*& next ) {
governor::local_scheduler()->local_spawn( first, next );
}
void tbb::internal::generic_scheduler::spawn_root_and_wait( task& first, task*& next ) {
governor::local_scheduler()->local_spawn_root_and_wait( first, next );
}
void tbb::internal::generic_scheduler::enqueue( task& t, void* prio ) {
generic_scheduler *s = governor::local_scheduler();
// these redirections are due to bw-compatibility, consider reworking some day
__TBB_ASSERT( s->my_arena, "thread is not in any arena" );
s->my_arena->enqueue_task(t, (intptr_t)prio, s->my_random );
}
#if __TBB_TASK_PRIORITY
class auto_indicator : no_copy {
volatile bool& my_indicator;
public:
auto_indicator ( volatile bool& indicator ) : my_indicator(indicator) { my_indicator = true ;}
~auto_indicator () { my_indicator = false; }
};
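// Scans the local task pool, moving every task whose priority is below the current
// reference priority to the offload list and compacting the tasks that stay. Returns the
// surviving task nearest the tail (if any) for immediate execution, and leaves the arena
// if nothing remains in the primary pool.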
task* generic_scheduler::winnow_task_pool () {
GATHER_STATISTIC( ++my_counters.prio_winnowings );
__TBB_ASSERT( in_arena(), NULL );
__TBB_ASSERT( my_offloaded_tasks, "At least one task is expected to be already offloaded" );
// To prevent the store to the indicator from sinking below the subsequent
// store to my_arena_slot->tail, the stores should either be separated by a
// full fence or both use release fences, and resetting the indicator should
// be done with a release fence. But since this is just an optimization, and
// the corresponding checking sequence in arena::is_out_of_work() is not atomic
// anyway, fences aren't used, so as not to penalize the warmer path.
auto_indicator indicator(my_pool_reshuffling_pending);
// The purpose of the synchronization algorithm here is for the owner thread
// to avoid locking task pool most of the time.
size_t T0 = __TBB_load_relaxed(my_arena_slot->tail);
__TBB_store_relaxed( my_arena_slot->tail, __TBB_load_relaxed(my_arena_slot->head) - 1 );
atomic_fence();
size_t H = __TBB_load_relaxed(my_arena_slot->head);
size_t T = __TBB_load_relaxed(my_arena_slot->tail);
__TBB_ASSERT( (intptr_t)T <= (intptr_t)T0, NULL);
__TBB_ASSERT( (intptr_t)H >= (intptr_t)T || (H == T0 && T == T0), NULL );
bool acquired = false;
if ( H == T ) {
// Either no contention with thieves during arbitration protocol execution or ...
if ( H >= T0 ) {
// ... the task pool got empty
reset_deque_and_leave_arena( /*locked=*/false );
return NULL;
}
}
else {
// Contention with thieves detected. Without taking the lock it is now impossible
// to determine the current head value because of its jitter caused by continuing
// stealing attempts (the pool is not locked so far).
acquired = true;
acquire_task_pool();
H = __TBB_load_relaxed(my_arena_slot->head);
if ( H >= T0 ) {
reset_deque_and_leave_arena( /*locked=*/true );
return NULL;
}
}
size_t src,
dst = T0;
// Find the first task to offload.
for ( src = H; src < T0; ++src ) {
task &t = *my_arena_slot->task_pool_ptr[src];
intptr_t p = priority(t);
if ( p < *my_ref_top_priority ) {
// Position of the first offloaded task will be the starting point
// for relocation of subsequent tasks that survive winnowing.
dst = src;
offload_task( t, p );
break;
}
}
for ( ++src; src < T0; ++src ) {
task &t = *my_arena_slot->task_pool_ptr[src];
intptr_t p = priority(t);
if ( p < *my_ref_top_priority )
offload_task( t, p );
else
my_arena_slot->task_pool_ptr[dst++] = &t;
}
__TBB_ASSERT( T0 >= dst, NULL );
task *t = H < dst ? my_arena_slot->task_pool_ptr[--dst] : NULL;
if ( H == dst ) {
// No tasks remain in the primary pool
reset_deque_and_leave_arena( acquired );
}
else if ( acquired ) {
__TBB_ASSERT( !is_poisoned(my_arena_slot->task_pool_ptr[H]), NULL );
__TBB_store_relaxed( my_arena_slot->tail, dst );
release_task_pool();
}
else {
__TBB_ASSERT( !is_poisoned(my_arena_slot->task_pool_ptr[H]), NULL );
// Release fence is necessary to make sure possibly relocated task pointers
// become visible to potential thieves
__TBB_store_with_release( my_arena_slot->tail, dst );
}
my_arena_slot->fill_with_canary_pattern( dst, T0 );
assert_task_pool_valid();
return t;
}
task* generic_scheduler::reload_tasks ( task*& offloaded_tasks, task**& offloaded_task_list_link, intptr_t top_priority ) {
GATHER_STATISTIC( ++my_counters.prio_reloads );
__TBB_ASSERT( !in_arena(), NULL );
task *arr[min_task_pool_size];
fast_reverse_vector<task*> tasks(arr, min_task_pool_size);
task **link = &offloaded_tasks;
task *t;
while ( (t = *link) ) {
task** next_ptr = &t->prefix().next_offloaded;
if ( priority(*t) >= top_priority ) {
tasks.push_back( t );
// Note that owner is an alias of next_offloaded. Thus the following
// assignment overwrites *next_ptr
task* next = *next_ptr;
t->prefix().owner = this;
__TBB_ASSERT( t->prefix().state == task::ready || t->prefix().extra_state == es_task_proxy, NULL );
*link = next;
}
else {
link = next_ptr;
}
}
if ( link == &offloaded_tasks ) {
offloaded_tasks = NULL;
#if TBB_USE_ASSERT
offloaded_task_list_link = NULL;
#endif /* TBB_USE_ASSERT */
}
else {
__TBB_ASSERT( link, NULL );
// Mark end of list
*link = NULL;
offloaded_task_list_link = link;
}
__TBB_ASSERT( link, NULL );
size_t num_tasks = tasks.size();
if ( num_tasks ) {
GATHER_STATISTIC( ++my_counters.prio_tasks_reloaded );
size_t T = prepare_task_pool( num_tasks );
tasks.copy_memory( my_arena_slot->task_pool_ptr + T );
if ( --num_tasks ) {
commit_spawned_tasks( T += num_tasks );
enter_arena();
my_arena->advertise_new_work</*Spawned=*/true>();
}
__TBB_ASSERT( T == __TBB_load_relaxed(my_arena_slot->tail), NULL );
__TBB_ASSERT( T < my_arena_slot->my_task_pool_size, NULL );
t = my_arena_slot->task_pool_ptr[T];
poison_pointer(my_arena_slot->task_pool_ptr[T]);
assert_task_pool_valid();
}
return t;
}
task* generic_scheduler::reload_tasks () {
uintptr_t reload_epoch = *my_ref_reload_epoch;
__TBB_ASSERT( my_offloaded_tasks, NULL );
__TBB_ASSERT( my_local_reload_epoch <= reload_epoch
|| my_local_reload_epoch - reload_epoch > uintptr_t(-1)/2,
"Reload epoch counter overflow?" );
if ( my_local_reload_epoch == reload_epoch )
return NULL;
__TBB_ASSERT( my_offloaded_tasks, NULL );
intptr_t top_priority = effective_reference_priority();
__TBB_ASSERT( (uintptr_t)top_priority < (uintptr_t)num_priority_levels, NULL );
task *t = reload_tasks( my_offloaded_tasks, my_offloaded_task_list_tail_link, top_priority );
if ( my_offloaded_tasks && (my_arena->my_bottom_priority >= top_priority || !my_arena->my_num_workers_requested) ) {
// Safeguard against the deliberately relaxed synchronization used while checking
// for the presence of work in the arena (so as not to impact hot paths).
// Arena may be reset to empty state when offloaded low priority tasks
// are still present. This results in both bottom and top priority bounds
// becoming 'normal', which makes offloaded low priority tasks unreachable.
// Update arena's bottom priority to accommodate them.
// First indicate the presence of lower-priority tasks
my_market->update_arena_priority( *my_arena, priority(*my_offloaded_tasks) );
// Then mark arena as full to unlock arena priority level adjustment
// by arena::is_out_of_work(), and ensure worker's presence
my_arena->advertise_new_work</*Spawned=*/false>();
}
my_local_reload_epoch = reload_epoch;
return t;
}
#endif /* __TBB_TASK_PRIORITY */
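// Retrieves a task from the tail of the local task pool. The owner first publishes a
// decremented tail with a relaxed store, issues a full fence, and re-reads the head:
// if no thief can be competing for the same slot, the task is taken lock-free; otherwise
// the pool is locked to arbitrate with the thief, and the deque is reset and unpublished
// when it turns out to be empty.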
inline task* generic_scheduler::get_task() {
__TBB_ASSERT( in_arena(), NULL );
task* result = NULL;
size_t T = __TBB_load_relaxed(my_arena_slot->tail); // mirror
retry:
__TBB_store_relaxed(my_arena_slot->tail, --T);
atomic_fence();
if ( (intptr_t)__TBB_load_relaxed(my_arena_slot->head) > (intptr_t)T ) {
acquire_task_pool();
size_t H = __TBB_load_relaxed(my_arena_slot->head); // mirror
if ( (intptr_t)H <= (intptr_t)T ) {
// The thief backed off - grab the task
result = my_arena_slot->task_pool_ptr[T];
__TBB_ASSERT( !is_poisoned(result), NULL );
poison_pointer( my_arena_slot->task_pool_ptr[T] );
}
else {
__TBB_ASSERT ( H == __TBB_load_relaxed(my_arena_slot->head)
&& T == __TBB_load_relaxed(my_arena_slot->tail)
&& H == T + 1, "victim/thief arbitration algorithm failure" );
}
if ( (intptr_t)H < (intptr_t)T )
release_task_pool();
else
reset_deque_and_leave_arena( /*locked=*/true );
}
else {
__TBB_control_consistency_helper(); // on my_arena_slot->head
result = my_arena_slot->task_pool_ptr[T];
__TBB_ASSERT( !is_poisoned(result), NULL );
poison_pointer( my_arena_slot->task_pool_ptr[T] );
}
if( result && is_proxy(*result) ) {
task_proxy &tp = *(task_proxy*)result;
result = tp.extract_task<task_proxy::pool_bit>();
if( !result ) {
// Proxy was empty, so it's our responsibility to free it
free_task<small_task>(tp);
if ( in_arena() )
goto retry;
__TBB_ASSERT( is_quiescent_local_task_pool_reset(), NULL );
return NULL;
}
GATHER_STATISTIC( ++my_counters.proxies_executed );
// Following assertion should be true because TBB 2.0 tasks never specify affinity, and hence are not proxied.
__TBB_ASSERT( is_version_3_task(*result), "backwards compatibility with TBB 2.0 broken" );
// Task affinity has changed.
my_innermost_running_task = result;
result->note_affinity(my_affinity_id);
}
__TBB_ASSERT( result || is_quiescent_local_task_pool_reset(), NULL );
return result;
} // generic_scheduler::get_task
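// Thief-side counterpart of get_task(): locks the victim's task pool pointer, bumps the
// head with a relaxed store, issues a full fence, and compares it against the tail to
// detect a conflict with the owner. Proxies whose destination thread is idle are skipped
// (bypassed), which may require closing the resulting hole near the head before the pool
// is unlocked.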
task* generic_scheduler::steal_task( arena_slot& victim_slot ) {
task** victim_pool = lock_task_pool( &victim_slot );
if ( !victim_pool )
return NULL;
task* result = NULL;
size_t H = __TBB_load_relaxed(victim_slot.head); // mirror
const size_t H0 = H;
int skip_and_bump = 0; // +1 for skipped task and +1 for bumped head&tail
retry:
__TBB_store_relaxed( victim_slot.head, ++H );
atomic_fence();
if ( (intptr_t)H > (intptr_t)__TBB_load_relaxed(victim_slot.tail) ) {
// Stealing attempt failed; the deque contents have not been changed by us
GATHER_STATISTIC( ++my_counters.thief_backoffs );
__TBB_store_relaxed( victim_slot.head, /*dead: H = */ H0 );
skip_and_bump++; // trigger that we bumped head and tail
__TBB_ASSERT ( !result, NULL );
}
else {
__TBB_control_consistency_helper(); // on victim_slot.tail
result = victim_pool[H-1];
__TBB_ASSERT( !is_poisoned(result), NULL );
if( is_proxy(*result) ) {
task_proxy& tp = *static_cast<task_proxy*>(result);
// If mailed task is likely to be grabbed by its destination thread, skip it.
if ( task_proxy::is_shared(tp.task_and_tag) && tp.outbox->recipient_is_idle() )
{
GATHER_STATISTIC( ++my_counters.proxies_bypassed );
result = NULL;
__TBB_ASSERT( skip_and_bump < 2, NULL );
skip_and_bump = 1; // note we skipped a task
goto retry;
}
}
__TBB_ASSERT( result, NULL );
// emit "task was consumed" signal
ITT_NOTIFY(sync_acquired, (void*)((uintptr_t)&victim_slot+sizeof(uintptr_t)));
const size_t H1 = H0 + 1;
if ( H1 < H ) {
// Some proxies in the task pool have been bypassed. Need to close
// the hole left by the stolen task. The following variant:
// victim_pool[H-1] = victim_pool[H0];
// runs in constant time, but it creates a potential for degrading the stealing
// mechanism's efficiency and growing the owner's stack size too much, because
// it moves earlier split-off (and thus larger) chunks closer to the owner's
// end of the deque (tail).
// So we use a linear-time variant that is likely to be amortized to
// near-constant time and preserves the premises of efficient stealing.
// These changes in the deque must be released to the owner.
memmove( victim_pool + H1, victim_pool + H0, (H - H1) * sizeof(task*) );
__TBB_store_with_release( victim_slot.head, /*dead: H = */ H1 );
if ( (intptr_t)H >= (intptr_t)__TBB_load_relaxed(victim_slot.tail) )
skip_and_bump++; // trigger that we bumped head and tail
}
poison_pointer( victim_pool[H0] );
}
unlock_task_pool( &victim_slot, victim_pool );
__TBB_ASSERT( skip_and_bump <= 2, NULL );
#if __TBB_PREFETCHING
__TBB_cl_evict(&victim_slot.head);
__TBB_cl_evict(&victim_slot.tail);
#endif
if( --skip_and_bump > 0 ) { // if both: task skipped and head&tail bumped
// Synchronize with snapshot as we bumped head and tail which can falsely trigger EMPTY state
atomic_fence();
my_arena->advertise_new_work</*Spawned=*/true>();
}
return result;
}
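// Drains the scheduler's affinity mailbox: pops task proxies until one still holds a task
// (which is then marked as stolen and returned) or the mailbox is empty. Proxies whose task
// was already taken from the task pool side are destroyed here.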
task* generic_scheduler::get_mailbox_task() {
__TBB_ASSERT( my_affinity_id>0, "not in arena" );
while ( task_proxy* const tp = my_inbox.pop() ) {
if ( task* result = tp->extract_task<task_proxy::mailbox_bit>() ) {
ITT_NOTIFY( sync_acquired, my_inbox.outbox() );
result->prefix().extra_state |= es_task_is_stolen;
return result;
}
// We have exclusive access to the proxy, and can destroy it.
free_task<no_cache_small_task>(*tp);
}
return NULL;
}
// TODO: Rename to publish_task_pool
void generic_scheduler::enter_arena() {
__TBB_ASSERT ( my_arena, "no arena: initialization not completed?" );
__TBB_ASSERT ( my_arena_index < my_arena->my_num_slots, "arena slot index is out-of-bound" );
__TBB_ASSERT ( my_arena_slot == &my_arena->my_slots[my_arena_index], NULL);
__TBB_ASSERT ( my_arena_slot->task_pool == EmptyTaskPool, "someone else grabbed my arena slot?" );
__TBB_ASSERT ( __TBB_load_relaxed(my_arena_slot->head) < __TBB_load_relaxed(my_arena_slot->tail),
"entering arena without tasks to share" );
// Release signal on behalf of previously spawned tasks (when this thread was not in arena yet)
ITT_NOTIFY(sync_releasing, my_arena_slot);
__TBB_store_with_release( my_arena_slot->task_pool, my_arena_slot->task_pool_ptr );
}
void generic_scheduler::leave_arena() {
__TBB_ASSERT( in_arena(), "Not in arena" );
// Do not reset my_arena_index. It will be used to (attempt to) re-acquire the slot next time
__TBB_ASSERT( &my_arena->my_slots[my_arena_index] == my_arena_slot, "arena slot and slot index mismatch" );
__TBB_ASSERT ( my_arena_slot->task_pool == LockedTaskPool, "Task pool must be locked when leaving arena" );
__TBB_ASSERT ( is_quiescent_local_task_pool_empty(), "Cannot leave arena when the task pool is not empty" );
ITT_NOTIFY(sync_releasing, &my_arena->my_slots[my_arena_index]);
// No release fence is necessary here, as this assignment precludes external
// accesses to the local task pool once it becomes visible. Thus it is harmless
// if it gets hoisted above the preceding local bookkeeping manipulations.
__TBB_store_relaxed( my_arena_slot->task_pool, EmptyTaskPool );
}
generic_scheduler* generic_scheduler::create_worker( market& m, size_t index ) {
generic_scheduler* s = allocate_scheduler( NULL, index ); // index is not a real slot in arena
#if __TBB_TASK_GROUP_CONTEXT
s->my_dummy_task->prefix().context = &the_dummy_context;
// Sync up the local cancellation state with the global one. No need for fence here.
s->my_context_state_propagation_epoch = the_context_state_propagation_epoch;
#endif /* __TBB_TASK_GROUP_CONTEXT */
s->my_market = &m;
s->init_stack_info();
#if __TBB_TASK_PRIORITY
s->my_ref_top_priority = &s->my_market->my_global_top_priority;
s->my_ref_reload_epoch = &s->my_market->my_global_reload_epoch;
#endif /* __TBB_TASK_PRIORITY */
return s;
}
// TODO: make it a member method
generic_scheduler* generic_scheduler::create_master( arena& a ) {
generic_scheduler* s = allocate_scheduler( &a, 0 /*Master thread always occupies the first slot*/ );
task& t = *s->my_dummy_task;
s->my_innermost_running_task = &t;
s->my_dispatching_task = &t;
t.prefix().ref_count = 1;
governor::sign_on(s);
__TBB_ASSERT( &task::self()==&t, "governor::sign_on failed?" );
#if __TBB_TASK_GROUP_CONTEXT
// Context to be used by root tasks by default (if the user has not specified one).
// Allocation is done by NFS allocator because we cannot reuse memory allocated
// for task objects since the free list is empty at the moment.
t.prefix().context = a.my_default_ctx;
#endif /* __TBB_TASK_GROUP_CONTEXT */
s->my_market = a.my_market;
__TBB_ASSERT( s->my_arena_index == 0, "Master thread must occupy the first slot in its arena" );
s->attach_mailbox(1);
s->my_arena_slot = a.my_slots + 0;
s->my_arena_slot->my_scheduler = s;
#if _WIN32||_WIN64
__TBB_ASSERT( s->my_market, NULL );
s->my_market->register_master( s->master_exec_resource );
#endif /* _WIN32||_WIN64 */
s->init_stack_info();
#if __TBB_TASK_GROUP_CONTEXT
// Sync up the local cancellation state with the global one. No need for fence here.
s->my_context_state_propagation_epoch = the_context_state_propagation_epoch;
#endif
#if __TBB_TASK_PRIORITY
// In the current implementation master threads continue processing even when
// there are other masters with higher priority. Only TBB worker threads are
// redistributed between arenas based on the arenas' priority. Thus master
// threads use their arena's top priority as a reference point (in contrast to workers,
// which use my_market->my_global_top_priority).
s->my_ref_top_priority = &s->my_arena->my_top_priority;
s->my_ref_reload_epoch = &s->my_arena->my_reload_epoch;
#endif /* __TBB_TASK_PRIORITY */
#if __TBB_SCHEDULER_OBSERVER
// Process any existing observers.
__TBB_ASSERT( a.my_observers.empty(), "Just created arena cannot have any observers associated with it" );
the_global_observer_list.notify_entry_observers( s->my_last_global_observer, /*worker=*/false );
#endif /* __TBB_SCHEDULER_OBSERVER */
return s;
}
void generic_scheduler::cleanup_worker( void* arg, bool worker ) {
generic_scheduler& s = *(generic_scheduler*)arg;
__TBB_ASSERT( !s.my_arena_slot, "cleaning up attached worker" );
#if __TBB_SCHEDULER_OBSERVER
if ( worker ) // can be called by master for worker, do not notify master twice
the_global_observer_list.notify_exit_observers( s.my_last_global_observer, /*worker=*/true );
#endif /* __TBB_SCHEDULER_OBSERVER */
s.free_scheduler();
}
void generic_scheduler::cleanup_master() {
generic_scheduler& s = *this; // for similarity with cleanup_worker
__TBB_ASSERT( s.my_arena_slot, NULL);
#if __TBB_SCHEDULER_OBSERVER
s.my_arena->my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/false );
the_global_observer_list.notify_exit_observers( s.my_last_global_observer, /*worker=*/false );
#endif /* __TBB_SCHEDULER_OBSERVER */
if( in_arena() ) {
acquire_task_pool();
if ( my_arena_slot->task_pool == EmptyTaskPool ||
__TBB_load_relaxed(my_arena_slot->head) >= __TBB_load_relaxed(my_arena_slot->tail) )
{
// Local task pool is empty
leave_arena();
}
else {
// Master's local task pool may e.g. contain proxies of affinitized tasks.
release_task_pool();
__TBB_ASSERT ( governor::is_set(this), "TLS slot is cleared before the task pool cleanup" );
s.local_wait_for_all( *s.my_dummy_task, NULL );
__TBB_ASSERT( !in_arena(), NULL );
__TBB_ASSERT ( governor::is_set(this), "Other thread reused our TLS key during the task pool cleanup" );
}
}
__TBB_ASSERT( s.my_market, NULL );
market *my_market = s.my_market;
#if _WIN32||_WIN64
s.my_market->unregister_master( s.master_exec_resource );
#endif /* _WIN32||_WIN64 */
arena* a = s.my_arena;
__TBB_ASSERT(a->my_slots+0 == my_arena_slot, NULL);
#if __TBB_STATISTICS
*my_arena_slot->my_counters += s.my_counters;
#endif /* __TBB_STATISTICS */
#if __TBB_TASK_PRIORITY
__TBB_ASSERT( my_arena_slot->my_scheduler, NULL );
// Master's scheduler may be locked by a worker taking arena snapshot or by
// a thread propagating task group state change across the context tree.
while ( as_atomic(my_arena_slot->my_scheduler).compare_and_swap(NULL, this) != this )
__TBB_Yield();
__TBB_ASSERT( !my_arena_slot->my_scheduler, NULL );
#else /* !__TBB_TASK_PRIORITY */
__TBB_store_with_release(my_arena_slot->my_scheduler, (generic_scheduler*)NULL);
#endif /* __TBB_TASK_PRIORITY */
my_arena_slot = NULL; // detached from slot
s.free_scheduler();
// Resetting the arena to the EMPTY state (as earlier TBB versions did) should not be
// done here (or anywhere else in the master thread, for that matter) because,
// after the introduction of arena-per-master logic and fire-and-forget tasks, doing
// so can result either in the arena's premature destruction (at least without
// additional costly checks in workers) or in unnecessary arena state changes
// (and the ensuing worker migration).
#if __TBB_STATISTICS_EARLY_DUMP
GATHER_STATISTIC( a->dump_arena_statistics() );
#endif
if (governor::needsWaitWorkers())
my_market->prepare_wait_workers();
a->on_thread_leaving</*is_master*/true>();
if (governor::needsWaitWorkers())
my_market->wait_workers();
}
} // namespace internal
} // namespace tbb
/*
Comments:
1. The premise of the cancellation support implementation is that cancellations are
not part of the hot path of the program execution. Therefore all changes made to its
implementation in order to reduce the overhead of the cancellation control flow
should be done only in ways that do not increase the overhead of normal execution.
In general, contexts are used by all threads, and their descendants are created in
different threads as well. In order to minimize the impact of the cross-thread tree
maintenance (first of all because of the synchronization), the tree of contexts
is split into pieces, each of which is handled by a single thread. Such pieces
are represented as lists of contexts, whose members are contexts that were
bound to their parents in the given thread.
The context tree maintenance and cancellation propagation algorithms are designed
in such a manner that cross-thread access to a context list takes place only
when a cancellation signal is sent (by the user or when an exception happens), and
synchronization is necessary only then. Thus the normal execution flow (without
exceptions and cancellation) remains free from any synchronization done on
behalf of exception handling and cancellation support.
2. Consider parallel cancellations at the different levels of the context tree:

    Ctx1 <- Cancelled by Thread1            |- Thread2 started processing
     |                                      |
    Ctx2                                    |- Thread1 started processing
     |                                 T1   |- Thread2 finishes and syncs up local counters
    Ctx3 <- Cancelled by Thread2            |
     |                                      |- Ctx5 is bound to Ctx2
    Ctx4                                    |
                                       T2   |- Thread1 reaches Ctx2
The thread propagating each cancellation increments a global counter. However, the thread
propagating the cancellation from the outermost context (Thread1) may be the last
to finish, which means that the local counters may be synchronized earlier (by Thread2,
at time T1) than the cancellation is propagated into Ctx2 (at time T2). If a new context
(Ctx5) is created and bound to Ctx2 between T1 and T2, checking only its parent
(Ctx2) may result in the cancellation request being lost.
This issue is solved by doing the whole propagation under the lock.
If we need more concurrency while processing parallel cancellations, we could try
the following modification of the propagation algorithm:
    advance global counter and remember it
    for each thread:
        scan thread's list of contexts
    for each thread:
        sync up its local counter only if the global counter has not been changed
However this version of the algorithm requires more analysis and verification.
3. There is no portable way to get the stack base address in POSIX; however, modern Linux
versions provide the non-portable pthread_getattr_np API that can be used to obtain a thread's
stack size and base address. Unfortunately, even this function does not provide
enough information for the main thread on IA-64 architecture (RSE spill area
and memory stack are allocated as two separate discontinuous chunks of memory),
and there is no portable way to discern the main and the secondary threads.
Thus for OS X* and IA-64 Linux architecture we use the TBB worker stack size for
all threads and use the current stack top as the stack base. This simplified
approach is based on the following assumptions:
1) If the default stack size is insufficient for the user app needs, the
required amount will be explicitly specified by the user at the point of the
TBB scheduler initialization (as an argument to tbb::task_scheduler_init
constructor).
2) When a master thread initializes the scheduler, it has enough space on its
stack. Here "enough" means "at least as much as worker threads have".
3) If the user app strives to conserve the memory by cutting stack size, it
should do this for TBB workers too (as in the #1).
*/
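// A minimal usage sketch (an illustration, assuming the classic tbb::task interface of this
// TBB generation) showing how the spawning entry points above are typically reached from
// user code. MyRootTask is a hypothetical user-defined task type:
//
//     #include "tbb/task.h"
//     #include "tbb/task_scheduler_init.h"
//
//     struct MyRootTask : tbb::task {
//         tbb::task* execute() { /* do the work, possibly spawning children */ return NULL; }
//     };
//
//     int main() {
//         tbb::task_scheduler_init init;  // initializes a master scheduler for this thread (see create_master)
//         tbb::task& root = *new( tbb::task::allocate_root() ) MyRootTask();
//         tbb::task::spawn_root_and_wait( root );  // should end up in generic_scheduler::spawn_root_and_wait above
//         return 0;
//     }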