concurrentqueue/benchmarks/tbb/arena.cpp

/*
Copyright 2005-2014 Intel Corporation. All Rights Reserved.

This file is part of Threading Building Blocks. Threading Building Blocks is free software;
you can redistribute it and/or modify it under the terms of the GNU General Public License
version 2 as published by the Free Software Foundation. Threading Building Blocks is
distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details. You should have received a copy of
the GNU General Public License along with Threading Building Blocks; if not, write to the
Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

As a special exception, you may use this file as part of a free software library without
restriction. Specifically, if other files instantiate templates or use macros or inline
functions from this file, or you compile this file and link it with other files to produce
an executable, this file does not by itself cause the resulting executable to be covered
by the GNU General Public License. This exception does not however invalidate any other
reasons why the executable file might be covered by the GNU General Public License.
*/
#include "scheduler.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include <functional>
#if __TBB_STATISTICS_STDOUT
#include <cstdio>
#endif
namespace tbb {
namespace internal {
void arena::process( generic_scheduler& s ) {
__TBB_ASSERT( is_alive(my_guard), NULL );
__TBB_ASSERT( governor::is_set(&s), NULL );
__TBB_ASSERT( !s.my_innermost_running_task, NULL );
__TBB_ASSERT( !s.my_dispatching_task, NULL );
__TBB_ASSERT( my_num_slots != 1, NULL );
// Start search for an empty slot from the one we occupied the last time
unsigned index = s.my_arena_index < my_num_slots ? s.my_arena_index : s.my_random.get() % (my_num_slots - 1) + 1,
end = index;
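// Note: my_random.get() % (my_num_slots - 1) + 1 yields a value in [1, my_num_slots - 1],
// so a worker's starting point is never slot 0, which is reserved for the master.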
__TBB_ASSERT( index != 0, "A worker cannot occupy slot 0" );
__TBB_ASSERT( index < my_num_slots, NULL );
// Find a vacant slot
for ( ;; ) {
if ( !my_slots[index].my_scheduler && as_atomic(my_slots[index].my_scheduler).compare_and_swap(&s, NULL ) == NULL )
break;
if ( ++index == my_num_slots )
index = 1;
if ( index == end ) {
// Likely this arena is already saturated
goto quit;
}
}
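// Reaching this point means the compare_and_swap above succeeded and this scheduler
// now owns my_slots[index]; a saturated arena is left through the 'quit' label instead.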
ITT_NOTIFY(sync_acquired, my_slots + index);
s.my_arena = this;
s.my_arena_index = index;
s.my_arena_slot = my_slots + index;
#if __TBB_TASK_PRIORITY
s.my_local_reload_epoch = *s.my_ref_reload_epoch;
__TBB_ASSERT( !s.my_offloaded_tasks, NULL );
#endif /* __TBB_TASK_PRIORITY */
s.attach_mailbox( affinity_id(index+1) );
s.my_arena_slot->hint_for_pop = index; // initial value for round-robin
#if !__TBB_FP_CONTEXT
my_cpu_ctl_env.set_env();
#endif
#if __TBB_SCHEDULER_OBSERVER
__TBB_ASSERT( !s.my_last_local_observer, "There cannot be notified local observers when entering arena" );
my_observers.notify_entry_observers( s.my_last_local_observer, /*worker=*/true );
#endif /* __TBB_SCHEDULER_OBSERVER */
atomic_update( my_limit, index + 1, std::less<unsigned>() );
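// atomic_update with std::less raises my_limit to index + 1 only if it is currently
// smaller; is_out_of_work() uses my_limit as the upper bound of its slot scan.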
for ( ;; ) {
// Try to steal a task.
// Passing reference count is technically unnecessary in this context,
// but omitting it here would add checks inside the function.
__TBB_ASSERT( is_alive(my_guard), NULL );
task* t = s.receive_or_steal_task( s.my_dummy_task->prefix().ref_count, /*return_if_no_work=*/true );
if (t) {
// A side effect of receive_or_steal_task is that my_innermost_running_task can be set.
// But for the outermost dispatch loop of a worker it has to be NULL.
s.my_innermost_running_task = NULL;
__TBB_ASSERT( !s.my_dispatching_task, NULL );
s.local_wait_for_all(*s.my_dummy_task,t);
}
__TBB_ASSERT ( __TBB_load_relaxed(s.my_arena_slot->head) == __TBB_load_relaxed(s.my_arena_slot->tail),
"Worker cannot leave arena while its task pool is not empty" );
__TBB_ASSERT( s.my_arena_slot->task_pool == EmptyTaskPool, "Empty task pool is not marked appropriately" );
// This check prevents relinquishing more workers than necessary because
// the decision making procedure is not atomic.
if (num_workers_active() > my_num_workers_allotted)
break;
}
#if __TBB_SCHEDULER_OBSERVER
my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/true );
s.my_last_local_observer = NULL;
#endif /* __TBB_SCHEDULER_OBSERVER */
#if __TBB_TASK_PRIORITY
if ( s.my_offloaded_tasks )
orphan_offloaded_tasks( s );
#endif /* __TBB_TASK_PRIORITY */
#if __TBB_STATISTICS
++s.my_counters.arena_roundtrips;
*my_slots[index].my_counters += s.my_counters;
s.my_counters.reset();
#endif /* __TBB_STATISTICS */
__TBB_store_with_release( my_slots[index].my_scheduler, (generic_scheduler*)NULL );
s.my_arena_slot = 0; // detached from slot
s.my_inbox.detach();
__TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
__TBB_ASSERT( !s.my_innermost_running_task, NULL );
__TBB_ASSERT( !s.my_dispatching_task, NULL );
__TBB_ASSERT( is_alive(my_guard), NULL );
quit:
// In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
// that the arena is temporarily left unpopulated by threads. See comments in
// arena::on_thread_leaving() for more details.
#if !__TBB_TRACK_PRIORITY_LEVEL_SATURATION
on_thread_leaving</*is_master*/false>();
#endif /* !__TBB_TRACK_PRIORITY_LEVEL_SATURATION */
}
arena::arena ( market& m, unsigned max_num_workers ) {
__TBB_ASSERT( !my_guard, "improperly allocated arena?" );
__TBB_ASSERT( sizeof(my_slots[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" );
__TBB_ASSERT( (uintptr_t)this % NFS_GetLineSize()==0, "arena misaligned" );
#if __TBB_TASK_PRIORITY
__TBB_ASSERT( !my_reload_epoch && !my_orphaned_tasks && !my_skipped_fifo_priority, "New arena object is not zeroed" );
#endif /* __TBB_TASK_PRIORITY */
my_market = &m;
my_limit = 1;
// Two slots are mandatory: one for the master, and one for a worker (required to support starvation-resistant tasks).
my_num_slots = num_slots_to_reserve(max_num_workers);
my_max_num_workers = max_num_workers;
my_references = 1; // accounts for the master
#if __TBB_TASK_PRIORITY
my_bottom_priority = my_top_priority = normalized_normal_priority;
#endif /* __TBB_TASK_PRIORITY */
my_aba_epoch = m.my_arenas_aba_epoch;
#if __TBB_SCHEDULER_OBSERVER
my_observers.my_arena = this;
#endif /* __TBB_SCHEDULER_OBSERVER */
__TBB_ASSERT ( my_max_num_workers < my_num_slots, NULL );
// Construct slots. Mark internal synchronization elements for the tools.
for( unsigned i = 0; i < my_num_slots; ++i ) {
__TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
__TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
__TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
ITT_SYNC_CREATE(my_slots + i, SyncType_Scheduler, SyncObj_WorkerTaskPool);
mailbox(i+1).construct();
ITT_SYNC_CREATE(&mailbox(i+1), SyncType_Scheduler, SyncObj_Mailbox);
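// Mailbox indices are 1-based while slot indices are 0-based: slot i pairs with
// mailbox(i+1), matching attach_mailbox( affinity_id(index+1) ) in arena::process().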
my_slots[i].hint_for_pop = i;
#if __TBB_STATISTICS
my_slots[i].my_counters = new ( NFS_Allocate(1, sizeof(statistics_counters), NULL) ) statistics_counters;
#endif /* __TBB_STATISTICS */
}
#if __TBB_TASK_PRIORITY
for ( intptr_t i = 0; i < num_priority_levels; ++i ) {
my_task_stream[i].initialize(my_num_slots);
ITT_SYNC_CREATE(my_task_stream + i, SyncType_Scheduler, SyncObj_TaskStream);
}
#else /* !__TBB_TASK_PRIORITY */
my_task_stream.initialize(my_num_slots);
ITT_SYNC_CREATE(&my_task_stream, SyncType_Scheduler, SyncObj_TaskStream);
#endif /* !__TBB_TASK_PRIORITY */
my_mandatory_concurrency = false;
#if __TBB_TASK_GROUP_CONTEXT
// Context to be used by root tasks by default (if the user has not specified one).
// The arena's context should not capture fp settings for the sake of backward compatibility.
my_default_ctx =
new ( NFS_Allocate(1, sizeof(task_group_context), NULL) ) task_group_context(task_group_context::isolated, task_group_context::default_traits);
#endif /* __TBB_TASK_GROUP_CONTEXT */
#if __TBB_FP_CONTEXT
my_default_ctx->capture_fp_settings();
#else
my_cpu_ctl_env.get_env();
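// The FP control settings captured here from the creating master are re-applied
// for threads entering the arena via my_cpu_ctl_env.set_env() in arena::process().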
#endif
}
arena& arena::allocate_arena( market& m, unsigned max_num_workers ) {
__TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
__TBB_ASSERT( sizeof(base_type) % NFS_GetLineSize() == 0, "arena slots area misaligned: wrong padding" );
__TBB_ASSERT( sizeof(mail_outbox) == NFS_MaxLineSize, "Mailbox padding is wrong" );
size_t n = allocation_size(max_num_workers);
unsigned char* storage = (unsigned char*)NFS_Allocate( 1, n, NULL );
// Zero all slots to indicate that they are empty
memset( storage, 0, n );
return *new( storage + num_slots_to_reserve(max_num_workers) * sizeof(mail_outbox) ) arena(m, max_num_workers);
}
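// Layout note: the arena object is constructed at offset
// num_slots_to_reserve(max_num_workers) * sizeof(mail_outbox) into the allocation,
// i.e. the per-slot mailboxes precede the arena itself; free_arena() below recomputes
// the allocation base as &mailbox(my_num_slots) accordingly.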
void arena::free_arena () {
__TBB_ASSERT( is_alive(my_guard), NULL );
__TBB_ASSERT( !my_references, "There are threads in the dying arena" );
__TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
__TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, "Inconsistent state of a dying arena" );
#if !__TBB_STATISTICS_EARLY_DUMP
GATHER_STATISTIC( dump_arena_statistics() );
#endif
poison_value( my_guard );
intptr_t drained = 0;
for ( unsigned i = 0; i < my_num_slots; ++i ) {
__TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
#if !__TBB_TASK_ARENA
__TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
#else
//TODO: understand the assertion and modify
#endif
__TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
my_slots[i].free_task_pool();
#if __TBB_STATISTICS
NFS_Free( my_slots[i].my_counters );
#endif /* __TBB_STATISTICS */
drained += mailbox(i+1).drain();
}
#if __TBB_TASK_PRIORITY && TBB_USE_ASSERT
for ( intptr_t i = 0; i < num_priority_levels; ++i )
__TBB_ASSERT(my_task_stream[i].empty() && my_task_stream[i].drain()==0, "Not all enqueued tasks were executed");
#elif !__TBB_TASK_PRIORITY
__TBB_ASSERT(my_task_stream.empty() && my_task_stream.drain()==0, "Not all enqueued tasks were executed");
#endif /* !__TBB_TASK_PRIORITY */
#if __TBB_COUNT_TASK_NODES
my_market->update_task_node_count( -drained );
#endif /* __TBB_COUNT_TASK_NODES */
my_market->release();
#if __TBB_TASK_GROUP_CONTEXT
__TBB_ASSERT( my_default_ctx, "Master thread never entered the arena?" );
my_default_ctx->~task_group_context();
NFS_Free(my_default_ctx);
#endif /* __TBB_TASK_GROUP_CONTEXT */
#if __TBB_SCHEDULER_OBSERVER
if ( !my_observers.empty() )
my_observers.clear();
#endif /* __TBB_SCHEDULER_OBSERVER */
void* storage = &mailbox(my_num_slots);
__TBB_ASSERT( my_references == 0, NULL );
__TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
this->~arena();
#if TBB_USE_ASSERT > 1
memset( storage, 0, allocation_size(my_max_num_workers) );
#endif /* TBB_USE_ASSERT */
NFS_Free( storage );
}
#if __TBB_STATISTICS
void arena::dump_arena_statistics () {
statistics_counters total;
for( unsigned i = 0; i < my_num_slots; ++i ) {
#if __TBB_STATISTICS_EARLY_DUMP
generic_scheduler* s = my_slots[i].my_scheduler;
if ( s )
*my_slots[i].my_counters += s->my_counters;
#else
__TBB_ASSERT( !my_slots[i].my_scheduler, NULL );
#endif
if ( i != 0 ) {
total += *my_slots[i].my_counters;
dump_statistics( *my_slots[i].my_counters, i );
}
}
dump_statistics( *my_slots[0].my_counters, 0 );
#if __TBB_STATISTICS_STDOUT
#if !__TBB_STATISTICS_TOTALS_ONLY
printf( "----------------------------------------------\n" );
#endif
dump_statistics( total, workers_counters_total );
total += *my_slots[0].my_counters;
dump_statistics( total, arena_counters_total );
#if !__TBB_STATISTICS_TOTALS_ONLY
printf( "==============================================\n" );
#endif
#endif /* __TBB_STATISTICS_STDOUT */
}
#endif /* __TBB_STATISTICS */
#if __TBB_TASK_PRIORITY
// The method inspects a scheduler to determine:
// 1. if it has tasks that can be retrieved and executed (via the return value);
// 2. if it has any tasks at all, including those of lower priority (via tasks_present);
// 3. if it is able to work with enqueued tasks (via dequeuing_possible).
inline bool arena::may_have_tasks ( generic_scheduler* s, bool& tasks_present, bool& dequeuing_possible ) {
if ( !s
#if __TBB_TASK_ARENA
|| s->my_arena != this
#endif
) return false;
dequeuing_possible |= s->worker_outermost_level();
if ( s->my_pool_reshuffling_pending ) {
// This primary task pool is nonempty and may contain tasks at the current
// priority level. Its owner is winnowing lower priority tasks at the moment.
tasks_present = true;
return true;
}
if ( s->my_offloaded_tasks ) {
tasks_present = true;
if ( s->my_local_reload_epoch < *s->my_ref_reload_epoch ) {
// This scheduler's offload area is nonempty and may contain tasks at the
// current priority level.
return true;
}
}
return false;
}
void arena::orphan_offloaded_tasks(generic_scheduler& s) {
__TBB_ASSERT( s.my_offloaded_tasks, NULL );
GATHER_STATISTIC( ++s.my_counters.prio_orphanings );
++my_abandonment_epoch;
__TBB_ASSERT( s.my_offloaded_task_list_tail_link && !*s.my_offloaded_task_list_tail_link, NULL );
task* orphans;
do {
orphans = const_cast<task*>(my_orphaned_tasks);
*s.my_offloaded_task_list_tail_link = orphans;
} while ( as_atomic(my_orphaned_tasks).compare_and_swap(s.my_offloaded_tasks, orphans) != orphans );
s.my_offloaded_tasks = NULL;
#if TBB_USE_ASSERT
s.my_offloaded_task_list_tail_link = NULL;
#endif /* TBB_USE_ASSERT */
}
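// The loop above is a lock-free splice: the scheduler's offloaded list is linked,
// through its tail link, in front of the current my_orphaned_tasks head, and the new
// head is installed with compare_and_swap, retrying if another thread raced in between.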
#endif /* __TBB_TASK_PRIORITY */
bool arena::is_out_of_work() {
// TODO: rework it to return at least a hint about where a task was found; better if the task itself.
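// my_pool_state acts as a small state machine: SNAPSHOT_FULL -> a unique 'busy' token
// owned by the thread verifying emptiness -> either SNAPSHOT_EMPTY (no work found) or
// back to SNAPSHOT_FULL (work found here, or published concurrently by another thread).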
for(;;) {
pool_state_t snapshot = my_pool_state;
switch( snapshot ) {
case SNAPSHOT_EMPTY:
return true;
case SNAPSHOT_FULL: {
// Use unique id for "busy" in order to avoid ABA problems.
const pool_state_t busy = pool_state_t(&busy);
// Request permission to take snapshot
if( my_pool_state.compare_and_swap( busy, SNAPSHOT_FULL )==SNAPSHOT_FULL ) {
// Got permission. Take the snapshot.
// NOTE: This is not a lock, as the state can be set to FULL at
// any moment by a thread that spawns/enqueues new task.
size_t n = my_limit;
// Make local copies of the volatile parameters. Any change to them while the
// snapshot is being taken invalidates the attempt and returns this thread to
// the dispatch loop.
#if __TBB_TASK_PRIORITY
intptr_t top_priority = my_top_priority;
uintptr_t reload_epoch = my_reload_epoch;
// Inspect primary task pools first
#endif /* __TBB_TASK_PRIORITY */
size_t k;
for( k=0; k<n; ++k ) {
if( my_slots[k].task_pool != EmptyTaskPool &&
__TBB_load_relaxed(my_slots[k].head) < __TBB_load_relaxed(my_slots[k].tail) )
{
// k-th primary task pool is nonempty and does contain tasks.
break;
}
if( my_pool_state!=busy )
return false; // the work was published
}
__TBB_ASSERT( k <= n, NULL );
bool work_absent = k == n;
#if __TBB_TASK_PRIORITY
// Variable tasks_present indicates presence of tasks at any priority
// level, while work_absent refers only to the current priority.
bool tasks_present = !work_absent || my_orphaned_tasks;
bool dequeuing_possible = false;
if ( work_absent ) {
// Check for the possibility that recent priority changes
// brought some tasks to the current priority level
uintptr_t abandonment_epoch = my_abandonment_epoch;
// The master thread's scheduler needs special handling because it may be
// destroyed at any moment (workers' schedulers are guaranteed to stay alive
// while at least one thread is in the arena). Concurrency with task group
// state change propagation has to be excluded as well.
// TODO: check whether it is still necessary since some pools belong to slots now
my_market->my_arenas_list_mutex.lock();
generic_scheduler *s = my_slots[0].my_scheduler;
if ( s && as_atomic(my_slots[0].my_scheduler).compare_and_swap(LockedMaster, s) == s ) { //TODO: remove need to lock
__TBB_ASSERT( my_slots[0].my_scheduler == LockedMaster && s != LockedMaster, NULL );
work_absent = !may_have_tasks( s, tasks_present, dequeuing_possible );
__TBB_store_with_release( my_slots[0].my_scheduler, s );
}
my_market->my_arenas_list_mutex.unlock();
// The following loop is subject to data races. While the k-th slot's
// scheduler is being examined, the corresponding worker can either leave
// for RML or migrate to another arena.
// These races are not prevented because all of them are benign.
// First, the code relies on the fact that worker thread's scheduler
// object persists until the whole library is deinitialized.
// Second, in the worst case the races can only cause another
// round of stealing attempts to be undertaken. Introducing complex
// synchronization into this coldest part of the scheduler's control
// flow does not seem to make sense because it both is unlikely to
// ever have any observable performance effect, and will require
// additional synchronization code on the hotter paths.
for( k = 1; work_absent && k < n; ++k ) {
if( my_pool_state!=busy )
return false; // the work was published
work_absent = !may_have_tasks( my_slots[k].my_scheduler, tasks_present, dequeuing_possible );
}
// Preclude premature switching arena off because of a race in the previous loop.
work_absent = work_absent
&& !__TBB_load_with_acquire(my_orphaned_tasks)
&& abandonment_epoch == my_abandonment_epoch;
}
#endif /* __TBB_TASK_PRIORITY */
// Test and test-and-set.
if( my_pool_state==busy ) {
#if __TBB_TASK_PRIORITY
bool no_fifo_tasks = my_task_stream[top_priority].empty();
work_absent = work_absent && (!dequeuing_possible || no_fifo_tasks)
&& top_priority == my_top_priority && reload_epoch == my_reload_epoch;
#else
bool no_fifo_tasks = my_task_stream.empty();
work_absent = work_absent && no_fifo_tasks;
#endif /* __TBB_TASK_PRIORITY */
if( work_absent ) {
#if __TBB_TASK_PRIORITY
if ( top_priority > my_bottom_priority ) {
if ( my_market->lower_arena_priority(*this, top_priority - 1, reload_epoch)
&& !my_task_stream[top_priority].empty() )
{
atomic_update( my_skipped_fifo_priority, top_priority, std::less<intptr_t>());
}
}
else if ( !tasks_present && !my_orphaned_tasks && no_fifo_tasks ) {
#endif /* __TBB_TASK_PRIORITY */
// save current demand value before setting SNAPSHOT_EMPTY,
// to avoid race with advertise_new_work.
int current_demand = (int)my_max_num_workers;
if( my_pool_state.compare_and_swap( SNAPSHOT_EMPTY, busy )==busy ) {
// This thread transitioned pool to empty state, and thus is
// responsible for telling RML that there is no other work to do.
my_market->adjust_demand( *this, -current_demand );
#if __TBB_TASK_PRIORITY
// Check for the presence of enqueued tasks "lost" on some of
// priority levels because updating arena priority and switching
// arena into "populated" (FULL) state happen non-atomically.
// Imposing atomicity would require task::enqueue() to use a lock,
// which is unacceptable.
bool switch_back = false;
for ( int p = 0; p < num_priority_levels; ++p ) {
if ( !my_task_stream[p].empty() ) {
switch_back = true;
if ( p < my_bottom_priority || p > my_top_priority )
my_market->update_arena_priority(*this, p);
}
}
if ( switch_back )
advertise_new_work</*Spawned*/false>();
#endif /* __TBB_TASK_PRIORITY */
return true;
}
return false;
#if __TBB_TASK_PRIORITY
}
#endif /* __TBB_TASK_PRIORITY */
}
// Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
my_pool_state.compare_and_swap( SNAPSHOT_FULL, busy );
}
}
return false;
}
default:
// Another thread is taking a snapshot.
return false;
}
}
}
#if __TBB_COUNT_TASK_NODES
intptr_t arena::workers_task_node_count() {
intptr_t result = 0;
for( unsigned i = 1; i < my_num_slots; ++i ) {
generic_scheduler* s = my_slots[i].my_scheduler;
if( s )
result += s->my_task_node_count;
}
return result;
}
#endif /* __TBB_COUNT_TASK_NODES */
void arena::enqueue_task( task& t, intptr_t prio, FastRandom &random )
{
#if __TBB_RECYCLE_TO_ENQUEUE
__TBB_ASSERT( t.state()==task::allocated || t.state()==task::to_enqueue, "attempt to enqueue task with inappropriate state" );
#else
__TBB_ASSERT( t.state()==task::allocated, "attempt to enqueue task that is not in 'allocated' state" );
#endif
t.prefix().state = task::ready;
t.prefix().extra_state |= es_task_enqueued; // enqueued task marker
#if TBB_USE_ASSERT
if( task* parent = t.parent() ) {
internal::reference_count ref_count = parent->prefix().ref_count;
__TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
__TBB_ASSERT( ref_count>0, "attempt to enqueue task whose parent has a ref_count<0" );
parent->prefix().extra_state |= es_ref_count_active;
}
__TBB_ASSERT(t.prefix().affinity==affinity_id(0), "affinity is ignored for enqueued tasks");
#endif /* TBB_USE_ASSERT */
#if __TBB_TASK_PRIORITY
intptr_t p = prio ? normalize_priority(priority_t(prio)) : normalized_normal_priority;
assert_priority_valid(p);
task_stream &ts = my_task_stream[p];
#else /* !__TBB_TASK_PRIORITY */
__TBB_ASSERT_EX(prio == 0, "the library is not configured to respect the task priority");
task_stream &ts = my_task_stream;
#endif /* !__TBB_TASK_PRIORITY */
ITT_NOTIFY(sync_releasing, &ts);
ts.push( &t, random );
#if __TBB_TASK_PRIORITY
if ( p != my_top_priority )
my_market->update_arena_priority( *this, p );
#endif /* __TBB_TASK_PRIORITY */
advertise_new_work< /*Spawned=*/ false >();
#if __TBB_TASK_PRIORITY
if ( p != my_top_priority )
my_market->update_arena_priority( *this, p );
#endif /* __TBB_TASK_PRIORITY */
}
#if __TBB_TASK_ARENA
struct nested_arena_context : no_copy {
generic_scheduler &my_scheduler;
scheduler_state const my_orig_state;
void *my_orig_ptr;
bool my_adjusting;
nested_arena_context(generic_scheduler *s, arena* a, bool needs_adjusting, bool as_worker = false)
: my_scheduler(*s), my_orig_state(*s), my_orig_ptr(NULL), my_adjusting(needs_adjusting) {
s->nested_arena_entry(a, *this, as_worker);
}
~nested_arena_context() {
my_scheduler.nested_arena_exit(*this);
(scheduler_state&)my_scheduler = my_orig_state; // restore arena settings
}
};
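// Usage (see internal_execute() and internal_wait() below): the constructor snapshots the
// scheduler's state and calls nested_arena_entry(), and the destructor calls
// nested_arena_exit() and restores the saved scheduler_state, so switching into another
// arena is scoped RAII-style.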
void generic_scheduler::nested_arena_entry(arena* a, nested_arena_context& c, bool as_worker) {
if( a == my_arena ) {
#if __TBB_TASK_GROUP_CONTEXT
c.my_orig_ptr = my_innermost_running_task =
new(&allocate_task(sizeof(empty_task), NULL, a->my_default_ctx)) empty_task;
#endif
return;
}
__TBB_ASSERT( is_alive(a->my_guard), NULL );
// overwrite arena settings
#if __TBB_TASK_PRIORITY
if ( my_offloaded_tasks )
my_arena->orphan_offloaded_tasks( *this );
my_ref_top_priority = &a->my_top_priority;
my_ref_reload_epoch = &a->my_reload_epoch;
my_local_reload_epoch = a->my_reload_epoch;
#endif /* __TBB_TASK_PRIORITY */
my_arena = a;
my_arena_index = 0;
my_arena_slot = my_arena->my_slots + my_arena_index;
my_inbox.detach(); // TODO: mailboxes were not designed for switching, add copy constructor?
attach_mailbox( affinity_id(my_arena_index+1) );
my_innermost_running_task = my_dispatching_task = as_worker? NULL : my_dummy_task;
#if __TBB_TASK_GROUP_CONTEXT
// save dummy's context and replace it by arena's context
c.my_orig_ptr = my_dummy_task->prefix().context;
my_dummy_task->prefix().context = a->my_default_ctx;
#endif
#if __TBB_ARENA_OBSERVER
my_last_local_observer = 0; // TODO: try optimize number of calls
my_arena->my_observers.notify_entry_observers( my_last_local_observer, /*worker=*/false );
#endif
// TODO? ITT_NOTIFY(sync_acquired, a->my_slots + index);
// TODO: it requires market to have P workers (not P-1)
// TODO: it still allows temporary oversubscription by 1 worker (due to my_max_num_workers)
// TODO: a preempted worker should be excluded from assignment to other arenas e.g. my_slack--
if( c.my_adjusting ) my_arena->my_market->adjust_demand(*my_arena, -1);
}
void generic_scheduler::nested_arena_exit(nested_arena_context& c) {
if( my_arena == c.my_orig_state.my_arena ) {
#if __TBB_TASK_GROUP_CONTEXT
free_task<small_local_task>(*(task*)c.my_orig_ptr); // TODO: use scoped_task instead?
#endif
return;
}
if( c.my_adjusting ) my_arena->my_market->adjust_demand(*my_arena, 1);
#if __TBB_ARENA_OBSERVER
my_arena->my_observers.notify_exit_observers( my_last_local_observer, /*worker=*/false );
#endif /* __TBB_SCHEDULER_OBSERVER */
#if __TBB_TASK_PRIORITY
if ( my_offloaded_tasks )
my_arena->orphan_offloaded_tasks( *this );
my_local_reload_epoch = *c.my_orig_state.my_ref_reload_epoch;
while ( as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap( NULL, this) != this )
__TBB_Yield(); // TODO: task priority can use master slot for locking while accessing the scheduler
#else
// Free the master slot. TODO: support multiple masters
__TBB_store_with_release(my_arena->my_slots[0].my_scheduler, (generic_scheduler*)NULL);
#endif
my_arena->my_exit_monitors.notify_all_relaxed(); // TODO: fix concurrent monitor to use notify_one (test MultipleMastersPart4 fails)
#if __TBB_TASK_GROUP_CONTEXT
// restore context of dummy task
my_dummy_task->prefix().context = (task_group_context*)c.my_orig_ptr;
#endif
}
void generic_scheduler::wait_until_empty() {
my_dummy_task->prefix().ref_count++; // prevents local_wait_for_all from exiting once local work is done, forcing it to keep stealing
while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY )
local_wait_for_all(*my_dummy_task, NULL);
my_dummy_task->prefix().ref_count--;
}
#endif /* __TBB_TASK_ARENA */
} // namespace internal
} // namespace tbb
#if __TBB_TASK_ARENA
#include "scheduler_utility.h"
namespace tbb {
namespace interface7 {
namespace internal {
void task_arena_base::internal_initialize( ) {
__TBB_ASSERT( my_master_slots <= 1, "Number of slots reserved for master can be only [0,1]");
if( my_master_slots > 1 ) my_master_slots = 1; // TODO: make more masters
if( my_max_concurrency < 1 )
my_max_concurrency = (int)governor::default_num_threads();
// TODO: reimplement in an efficient way. We need a scheduler instance in this thread,
// but the scheduler is only required for task allocation and FIFO random seeds until
// the master wants to join the arena. (Idea: create a restricted specialization.)
// Creating an implicit arena for the master here is excessive anyway, but a scheduler
// instance implies that the master thread is always connected with an arena.
// Follow the call chain through init_scheduler and arena::process for details.
if( !governor::local_scheduler_if_initialized() )
governor::init_scheduler( (unsigned)my_max_concurrency - my_master_slots + 1/*TODO: address in market instead*/, 0, true );
// TODO: we will need to introduce a mechanism for global settings, including stack size, used by all arenas
arena* new_arena = &market::create_arena( my_max_concurrency - my_master_slots/*it's +1 slot for num_masters=0*/, ThreadStackSize );
if(as_atomic(my_arena).compare_and_swap(new_arena, NULL) != NULL) { // there is a race possible on my_initialized
__TBB_ASSERT(my_arena, NULL); // other thread was the first
new_arena->on_thread_leaving</*is_master*/true>(); // deallocate new arena
}
#if __TBB_TASK_GROUP_CONTEXT
else {
my_context = new_arena->my_default_ctx;
my_context->my_version_and_traits |= my_version_and_traits & exact_exception_flag;
}
#endif
}
void task_arena_base::internal_terminate( ) {
if( my_arena ) {// task_arena was initialized
#if __TBB_STATISTICS_EARLY_DUMP
GATHER_STATISTIC( my_arena->dump_arena_statistics() );
#endif
my_arena->on_thread_leaving</*is_master*/true>();
my_arena = 0;
#if __TBB_TASK_GROUP_CONTEXT
my_context = 0;
#endif
}
}
void task_arena_base::internal_enqueue( task& t, intptr_t prio ) const {
__TBB_ASSERT(my_arena, NULL);
generic_scheduler* s = governor::local_scheduler_if_initialized();
__TBB_ASSERT(s, "Scheduler is not initialized"); // we allocated a task so can expect the scheduler
#if __TBB_TASK_GROUP_CONTEXT
__TBB_ASSERT(my_arena->my_default_ctx == t.prefix().context, NULL);
__TBB_ASSERT(!my_arena->my_default_ctx->is_group_execution_cancelled(), // TODO: any better idea?
"The task will not be executed because default task_group_context of task_arena is cancelled. Has previously enqueued task thrown an exception?");
#endif
my_arena->enqueue_task( t, prio, s->my_random );
}
class delegated_task : public task {
internal::delegate_base & my_delegate;
concurrent_monitor & my_monitor;
task * my_root;
/*override*/ task* execute() {
generic_scheduler& s = *(generic_scheduler*)prefix().owner;
__TBB_ASSERT(s.worker_outermost_level() || s.master_outermost_level(), "expected to be enqueued and received on the outermost level");
// but this task can mimic the outermost level, so detect that case
if( s.master_outermost_level() && s.my_dummy_task->state() == task::executing ) {
#if TBB_USE_EXCEPTIONS
// RTTI is available, check whether the cast is valid
__TBB_ASSERT(dynamic_cast<delegated_task*>(s.my_dummy_task), 0);
#endif
set_ref_count(1); // required by the semantics of recycle_to_enqueue()
recycle_to_enqueue();
return NULL;
}
struct outermost_context : internal::no_copy {
delegated_task * t;
generic_scheduler & s;
task * orig_dummy;
task_group_context * orig_ctx;
outermost_context(delegated_task *_t, generic_scheduler &_s) : t(_t), s(_s) {
orig_dummy = s.my_dummy_task;
#if __TBB_TASK_GROUP_CONTEXT
orig_ctx = t->prefix().context;
t->prefix().context = s.my_arena->my_default_ctx;
#endif
s.my_dummy_task = t; // mimics outermost master
__TBB_ASSERT(s.my_innermost_running_task == t, NULL);
}
~outermost_context() {
s.my_dummy_task = orig_dummy;
#if TBB_USE_EXCEPTIONS
// restore context for sake of registering potential exception
t->prefix().context = orig_ctx;
#endif
}
} scope(this, s);
my_delegate();
return NULL;
}
~delegated_task() {
// A potential exception has already been registered; registration must precede the notification below.
__TBB_ASSERT(my_root->ref_count()==2, NULL);
__TBB_store_with_release(my_root->prefix().ref_count, 1); // must precede the wakeup
my_monitor.notify_relaxed(*this);
}
public:
delegated_task( internal::delegate_base & d, concurrent_monitor & s, task * t )
: my_delegate(d), my_monitor(s), my_root(t) {}
// predicate for concurrent_monitor notification
bool operator()(uintptr_t ctx) const { return (void*)ctx == (void*)&my_delegate; }
};
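// Protocol between delegated_task and the waiting master in internal_execute() below:
// the root task starts with ref_count == 2; destroying the delegated_task drops the root's
// ref_count to 1 and notifies my_exit_monitors, so the waiter either observes
// ref_count < 2 after prepare_wait() or is woken from commit_wait().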
void task_arena_base::internal_execute( internal::delegate_base& d) const {
__TBB_ASSERT(my_arena, NULL);
generic_scheduler* s = governor::local_scheduler();
__TBB_ASSERT(s, "Scheduler is not initialized");
// TODO: is it safe to assign slot to a scheduler which is not yet switched?
// TODO TEMP: one master, make more masters
if( s->my_arena == my_arena || (!__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler)
&& as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL ) == NULL) ) {
cpu_ctl_env_helper cpu_ctl_helper;
cpu_ctl_helper.set_env( __TBB_CONTEXT_ARG1(my_context) );
#if TBB_USE_EXCEPTIONS
try {
#endif
//TODO: replace dummy tasks for workers as well to avoid using the_dummy_context
nested_arena_context scope(s, my_arena, !my_master_slots);
d();
#if TBB_USE_EXCEPTIONS
} catch(...) {
cpu_ctl_helper.restore_default(); // TODO: is it needed on Windows?
if( my_version_and_traits & exact_exception_flag ) throw;
else {
task_group_context exception_container( task_group_context::isolated,
task_group_context::default_traits & ~task_group_context::exact_exception );
exception_container.register_pending_exception();
__TBB_ASSERT(exception_container.my_exception, NULL);
exception_container.my_exception->throw_self();
}
}
#endif
} else {
concurrent_monitor::thread_context waiter;
#if __TBB_TASK_GROUP_CONTEXT
task_group_context exec_context( task_group_context::isolated, my_version_and_traits & exact_exception_flag );
#if __TBB_FP_CONTEXT
exec_context.copy_fp_settings( *my_context );
#endif
#endif
auto_empty_task root(__TBB_CONTEXT_ARG(s, &exec_context));
root.prefix().ref_count = 2;
my_arena->enqueue_task( *new( task::allocate_root(__TBB_CONTEXT_ARG1(exec_context)) )
delegated_task(d, my_arena->my_exit_monitors, &root),
0, s->my_random ); // TODO: priority?
do {
my_arena->my_exit_monitors.prepare_wait(waiter, (uintptr_t)&d);
if( __TBB_load_with_acquire(root.prefix().ref_count) < 2 ) {
my_arena->my_exit_monitors.cancel_wait(waiter);
break;
}
else if( !__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler) // TODO: refactor into a function?
&& as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL ) == NULL ) {
my_arena->my_exit_monitors.cancel_wait(waiter);
nested_arena_context scope(s, my_arena, !my_master_slots);
s->local_wait_for_all(root, NULL);
#if TBB_USE_EXCEPTIONS
__TBB_ASSERT( !exec_context.my_exception, NULL ); // exception can be thrown above, not deferred
#endif
__TBB_ASSERT( root.prefix().ref_count == 0, NULL );
break;
} else {
my_arena->my_exit_monitors.commit_wait(waiter);
}
} while( __TBB_load_with_acquire(root.prefix().ref_count) == 2 );
#if TBB_USE_EXCEPTIONS
// process possible exception
if( task_group_context::exception_container_type *pe = exec_context.my_exception )
pe->throw_self();
#endif
}
}
// This wait task is a temporary approach to waiting for arena emptiness on behalf of masters without slots.
// TODO: rework this around a single source of notification from is_out_of_work.
class wait_task : public task {
binary_semaphore & my_signal;
/*override*/ task* execute() {
generic_scheduler* s = governor::local_scheduler_if_initialized();
__TBB_ASSERT( s, NULL );
if( s->my_arena_index && s->worker_outermost_level() ) {// on outermost level of workers only
s->local_wait_for_all( *s->my_dummy_task, NULL ); // run remaining tasks
} else s->my_arena->is_out_of_work(); // avoids starvation of internal_wait: issuing this task makes arena full
my_signal.V();
return NULL;
}
public:
wait_task ( binary_semaphore & sema ) : my_signal(sema) {}
};
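// internal_wait() below enqueues a wait_task paired with a binary_semaphore and blocks in
// P(); the task either drains the remaining work via local_wait_for_all() (on a worker's
// outermost level) or re-evaluates is_out_of_work() (since enqueuing this task switched
// the arena back to the full state), and then signals the semaphore.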
void task_arena_base::internal_wait() const {
__TBB_ASSERT(my_arena, NULL);
generic_scheduler* s = governor::local_scheduler();
__TBB_ASSERT(s, "Scheduler is not initialized");
__TBB_ASSERT(s->my_arena != my_arena || s->my_arena_index == 0, "task_arena::wait_until_empty() is not supported within a worker context" );
if( s->my_arena == my_arena ) {
//unsupported, but try to do something for the outermost master
__TBB_ASSERT(s->master_outermost_level(), "unsupported");
if( !s->my_arena_index )
while( my_arena->num_workers_active() )
s->wait_until_empty();
} else for(;;) {
while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) {
if( !__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler) // TODO TEMP: one master, make more masters
&& as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL) == NULL ) {
nested_arena_context a(s, my_arena, !my_master_slots, true);
s->wait_until_empty();
} else {
binary_semaphore waiter; // TODO: replace by a single event notification from is_out_of_work
internal_enqueue( *new( task::allocate_root(__TBB_CONTEXT_ARG1(*my_context)) ) wait_task(waiter), 0 ); // TODO: priority?
waiter.P(); // TODO: concurrent_monitor
}
}
if( !my_arena->num_workers_active() && !my_arena->my_slots[0].my_scheduler) // no activity
break; // spin until workers active but avoid spinning in a worker
__TBB_Yield(); // wait until workers and master leave
}
}
/*static*/ int task_arena_base::internal_current_slot() {
generic_scheduler* s = governor::local_scheduler_if_initialized();
return s? int(s->my_arena_index) : -1;
}
} // tbb::interfaceX::internal
} // tbb::interfaceX
} // tbb
#endif /* __TBB_TASK_ARENA */