/*
    Copyright 2005-2014 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the GNU General Public License for more details.  You should have received a copy of
    the GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software library without
    restriction.  Specifically, if other files instantiate templates or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable, this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License.  This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/

#include "tbb/tbb_config.h"
|
|
#if !__TBB_ARENA_OBSERVER
|
|
#error __TBB_ARENA_OBSERVER must be defined
|
|
#endif
|
|
|
|
#if __TBB_SCHEDULER_OBSERVER
|
|
|
|
#include "observer_proxy.h"
|
|
#include "tbb_main.h"
|
|
#include "governor.h"
|
|
#include "scheduler.h"
|
|
#include "arena.h"
|
|
|
|
namespace tbb {
|
|
namespace internal {
|
|
|
|
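// Observers registered through the legacy/global interface live here;
// observers bound to a particular arena are kept in that arena's
// my_observers list instead (see observe() below).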
padded<observer_list> the_global_observer_list;

#if TBB_USE_ASSERT
static atomic<int> observer_proxy_count;

struct check_observer_proxy_count {
    ~check_observer_proxy_count() {
        if( observer_proxy_count!=0 ) {
            runtime_warning( "Leaked %ld observer_proxy objects\n", long(observer_proxy_count) );
        }
    }
};

static check_observer_proxy_count the_check_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

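// Returns the observer downcast to the interface6 type, or NULL if the
// proxy wraps a legacy (v3) observer.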
interface6::task_scheduler_observer* observer_proxy::get_v6_observer() {
    if( my_version != 6 ) return NULL;
    return static_cast<interface6::task_scheduler_observer*>(my_observer);
}

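// A proxy counts as global when its observer predates interface6 (legacy
// observers have no context tag) or was created with global_tag.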
bool observer_proxy::is_global() {
    return !get_v6_observer() || get_v6_observer()->my_context_tag == interface6::task_scheduler_observer::global_tag;
}

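// Note: at construction time the observer's my_busy_count doubles as a
// version sentinel; interface6 observers initialize it to v6_trait, which is
// decoded here. The caller (observe()) resets it to zero afterwards.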
observer_proxy::observer_proxy( task_scheduler_observer_v3& tso )
    : my_list(NULL), my_next(NULL), my_prev(NULL), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++observer_proxy_count;
#endif /* TBB_USE_ASSERT */
    // 1 for observer
    my_ref_count = 1;
    my_version = load<relaxed>(my_observer->my_busy_count)
                 == interface6::task_scheduler_observer::v6_trait ? 6 : 0;
    __TBB_ASSERT( my_version >= 6 || !load<relaxed>(my_observer->my_busy_count), NULL );
}

#if TBB_USE_ASSERT
observer_proxy::~observer_proxy () {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
    --observer_proxy_count;
}
#endif /* TBB_USE_ASSERT */

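// Typed fetch-and-store with explicit memory semantics M: casts the operand
// to T before the exchange and the raw result back to T.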
template<memory_semantics M, class T, class V>
T atomic_fetch_and_store ( T* addr, const V& val ) {
    return (T)atomic_traits<sizeof(T), M>::fetch_and_store( addr, (T)val );
}

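// Clears an arena's local observer list. The fetch-and-store on each
// observer's my_proxy arbitrates with a concurrent observe(false) on the
// observer side, so exactly one party deletes the proxy; afterwards the
// method spins until proxies still pinned by other threads are released.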
void observer_list::clear () {
    __TBB_ASSERT( this != &the_global_observer_list, "Method clear() cannot be used on the list of global observers" );
    // Though the method will work fine for the empty list, we require the caller
    // to check for the list emptiness before invoking it to avoid extra overhead.
    __TBB_ASSERT( !empty(), NULL );
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head;
        while ( observer_proxy *p = next ) {
            __TBB_ASSERT( p->my_version >= 6, NULL );
            next = p->my_next;
            // Both proxy p and observer p->my_observer (if non-null) are guaranteed
            // to be alive while the list is locked.
            task_scheduler_observer_v3 *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
            if ( !obs || !(p = (observer_proxy*)__TBB_FetchAndStoreW(&obs->my_proxy, 0)) )
                continue;
            // Accessing 'obs' after detaching obs->my_proxy would race with observer destruction.
            __TBB_ASSERT( !next || p == next->my_prev, NULL );
            __TBB_ASSERT( is_alive(p->my_ref_count), "Observer's proxy died prematurely" );
            __TBB_ASSERT( p->my_ref_count == 1, "Reference for observer is missing" );
#if TBB_USE_ASSERT
            p->my_observer = NULL;
            p->my_ref_count = 0;
#endif /* TBB_USE_ASSERT */
            remove(p);
            delete p;
        }
    }
    while( my_head )
        __TBB_Yield();
}

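// Appends a proxy at the tail under the write lock. Since notification
// passes walk from head to tail, registration order is notification order.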
void observer_list::insert ( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if ( my_head ) {
        p->my_prev = my_tail;
        my_tail->my_next = p;
    }
    else
        my_head = p;
    my_tail = p;
}

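// Unlinks a proxy from the doubly-linked list. All callers hold the write
// lock, so no concurrent walker can observe a half-updated link.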
void observer_list::remove ( observer_proxy* p ) {
    __TBB_ASSERT( my_head, "Attempt to remove an item from an empty list" );
    __TBB_ASSERT( !my_tail->my_next, "Last item's my_next must be NULL" );
    if( p == my_tail ) {
        __TBB_ASSERT( !p->my_next, NULL );
        my_tail = p->my_prev;
    }
    else {
        __TBB_ASSERT( p->my_next, NULL );
        p->my_next->my_prev = p->my_prev;
    }
    if ( p == my_head ) {
        __TBB_ASSERT( !p->my_prev, NULL );
        my_head = p->my_next;
    }
    else {
        __TBB_ASSERT( p->my_prev, NULL );
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT( (my_head && my_tail) || (!my_head && !my_tail), NULL );
}

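// Releases one reference to the proxy. While the count is known to stay
// positive it is decremented with a lock-free CAS loop; the possible
// transition to zero happens under the write lock so a concurrent list
// walker cannot resurrect the proxy as it is unlinked and deleted.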
void observer_list::remove_ref( observer_proxy* p ) {
    int r = p->my_ref_count;
    __TBB_ASSERT( is_alive(r), NULL );
    while( r>1 ) {
        __TBB_ASSERT( r!=0, NULL );
        int r_old = p->my_ref_count.compare_and_swap(r-1,r);
        if( r_old==r ) {
            // Successfully decremented count.
            return;
        }
        r = r_old;
    }
    __TBB_ASSERT( r==1, NULL );
    // Reference count might go to zero
    {
        // Use lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if( !r )
            remove(p);
    }
    __TBB_ASSERT( r || !p->my_ref_count, NULL );
    if( !r )
        delete p;
}

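// Runs entry callbacks for proxies after 'last' (the last observer this
// thread was notified about). Each proxy is pinned with an extra reference
// and its observer's my_busy_count is raised before the list lock is
// dropped, so the user callback executes without any list lock held.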
void observer_list::do_notify_entry_observers( observer_proxy*& last, bool worker ) {
    // Pointer p marches through the list from last (exclusively) to the end.
    observer_proxy *p = last, *prev = p;
    for(;;) {
        task_scheduler_observer_v3* tso = NULL;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if( p ) {
                    // We were already processing the list.
                    if( observer_proxy* q = p->my_next ) {
                        if( p == prev )
                            remove_ref_fast(prev); // sets prev to NULL if successful
                        p = q;
                    }
                    else {
                        // Reached the end of the list.
                        if( p == prev ) {
                            // Keep the reference as we store the 'last' pointer in scheduler
                            __TBB_ASSERT(p->my_ref_count >= 1 + (p->my_observer?1:0), NULL);
                        } else {
                            // The last few proxies were empty
                            __TBB_ASSERT(p->my_ref_count, NULL);
                            ++p->my_ref_count;
                            if( prev ) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head;
                    if( !p )
                        return;
                }
                tso = p->my_observer;
            } while( !tso );
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT( !prev || p!=prev, NULL );
        // Release the proxy pinned before p
        if( prev )
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count, NULL);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX( bc>=0, "my_busy_count underflowed" );
        prev = p;
    }
}

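// Mirror of the entry pass: walks from the head up to and including 'last',
// running exit callbacks, and finally drops the reference that the entry
// pass left pinned on 'last'.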
void observer_list::do_notify_exit_observers( observer_proxy* last, bool worker ) {
    // Pointer p marches through the list from the beginning to last (inclusively).
    observer_proxy *p = NULL, *prev = NULL;
    for(;;) {
        task_scheduler_observer_v3* tso = NULL;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if( p ) {
                    // We were already processing the list.
                    if( p != last ) {
                        __TBB_ASSERT( p->my_next, "List items before 'last' must have valid my_next pointer" );
                        if( p == prev )
                            remove_ref_fast(prev); // sets prev to NULL if successful
                        p = p->my_next;
                    } else {
                        // Remove the reference from the last item
                        remove_ref_fast(p);
                        if( p ) {
                            lock.release();
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head;
                    __TBB_ASSERT( p, "Nonzero 'last' must guarantee that the global list is non-empty" );
                }
                tso = p->my_observer;
            } while( !tso );
            // The item is already refcounted
            if ( p != last ) // the last is already referenced since entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT( !prev || p!=prev, NULL );
        if( prev )
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, NULL);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX( bc>=0, "my_busy_count underflowed" );
        prev = p;
    }
}

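// Polls may_sleep() of every global interface6 observer before a worker is
// allowed to block; the first observer to answer false stops the poll and
// the thread stays awake.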
#if __TBB_SLEEP_PERMISSION
bool observer_list::ask_permission_to_leave() {
    __TBB_ASSERT( this == &the_global_observer_list, "This method cannot be used on lists of arena observers" );
    if( !my_head ) return true;
    // Pointer p marches through the list
    observer_proxy *p = NULL, *prev = NULL;
    bool result = true;
    while( result ) {
        task_scheduler_observer* tso = NULL;
        // Hold lock on list only long enough to advance to the next proxy in the list.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if( p ) {
                    // We were already processing the list.
                    observer_proxy* q = p->my_next;
                    // Read next, then remove the previous reference
                    if( p == prev )
                        remove_ref_fast(prev); // sets prev to NULL if successful
                    if( q ) p = q;
                    else {
                        // Reached the end of the list.
                        if( prev ) {
                            lock.release();
                            remove_ref(prev);
                        }
                        return result;
                    }
                } else {
                    // Starting pass through the list
                    p = my_head;
                    if( !p )
                        return result;
                }
                tso = p->get_v6_observer();
            } while( !tso );
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT( !prev || p!=prev, NULL );
        // Release the proxy pinned before p
        if( prev )
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        result = tso->may_sleep();
        __TBB_ASSERT(p->my_ref_count, NULL);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX( bc>=0, "my_busy_count underflowed" );
        prev = p;
    }
    if( prev )
        remove_ref(prev);
    return result;
}
#endif /* __TBB_SLEEP_PERMISSION */

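// Activates (enable==true) or deactivates this observer. Activation creates
// the proxy and links it into either an arena's local list or the global
// list; deactivation wins or loses a fetch-and-store race on my_proxy
// against observer_list::clear(), then spins until in-flight callbacks
// (tracked by my_busy_count) drain.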
void task_scheduler_observer_v3::observe( bool enable ) {
    if( enable ) {
        if( !my_proxy ) {
            my_proxy = new observer_proxy( *this );
            my_busy_count = 0; // proxy stores versioning information, clear it
            if ( !my_proxy->is_global() ) {
                // Local observer activation
                generic_scheduler* s = governor::local_scheduler_if_initialized();
#if __TBB_TASK_ARENA
                __TBB_ASSERT( my_proxy->get_v6_observer(), NULL );
                intptr_t tag = my_proxy->get_v6_observer()->my_context_tag;
                if( tag != interface6::task_scheduler_observer::implicit_tag ) { // explicit arena
                    task_arena *a = reinterpret_cast<task_arena*>(tag);
                    a->initialize();
                    my_proxy->my_list = &a->my_arena->my_observers;
                } else
#endif
                {
                    if( !s ) s = governor::init_scheduler( (unsigned)task_scheduler_init::automatic, 0, true );
                    __TBB_ASSERT( __TBB_InitOnce::initialization_done(), NULL );
                    __TBB_ASSERT( s && s->my_arena, NULL );
                    my_proxy->my_list = &s->my_arena->my_observers;
                }
                my_proxy->my_list->insert(my_proxy);
                // Notify newly activated observer and other pending ones if it belongs to current arena
                if( s && &s->my_arena->my_observers == my_proxy->my_list )
                    my_proxy->my_list->notify_entry_observers( s->my_last_local_observer, s->is_worker() );
            } else {
                // Obsolete. Global observer activation
                if( !__TBB_InitOnce::initialization_done() )
                    DoOneTimeInitializations();
                my_proxy->my_list = &the_global_observer_list;
                my_proxy->my_list->insert(my_proxy);
                if( generic_scheduler* s = governor::local_scheduler_if_initialized() ) {
                    // Notify newly created observer of its own thread.
                    // Any other pending observers are notified too.
                    the_global_observer_list.notify_entry_observers( s->my_last_global_observer, s->is_worker() );
                }
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if ( observer_proxy* proxy = (observer_proxy*)__TBB_FetchAndStoreW(&my_proxy, 0) ) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == this, NULL );
            __TBB_ASSERT( is_alive(proxy->my_ref_count), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count >= 1, "reference for observer missing" );
            observer_list &list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on observer pointer validity
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = NULL;
                // Proxy may still be held by other threads (to track the last notified observer)
                if( !--proxy->my_ref_count ) { // nobody can increase it under exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT( !proxy->my_ref_count, NULL );
                    delete proxy;
                }
            }
            while( my_busy_count ) // other threads are still accessing the callback
                __TBB_Yield();
        }
    }
}

} // namespace internal
} // namespace tbb

#endif /* __TBB_SCHEDULER_OBSERVER */