You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
242 lines
8.7 KiB
C++
242 lines
8.7 KiB
C++
/*
|
|
Copyright 2005-2014 Intel Corporation. All Rights Reserved.
|
|
|
|
This file is part of Threading Building Blocks. Threading Building Blocks is free software;
|
|
you can redistribute it and/or modify it under the terms of the GNU General Public License
|
|
version 2 as published by the Free Software Foundation. Threading Building Blocks is
|
|
distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
|
|
implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
|
See the GNU General Public License for more details. You should have received a copy of
|
|
the GNU General Public License along with Threading Building Blocks; if not, write to the
|
|
Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
As a special exception, you may use this file as part of a free software library without
|
|
restriction. Specifically, if other files instantiate templates or use macros or inline
|
|
functions from this file, or you compile this file and link it with other files to produce
|
|
an executable, this file does not by itself cause the resulting executable to be covered
|
|
by the GNU General Public License. This exception does not however invalidate any other
|
|
reasons why the executable file might be covered by the GNU General Public License.
|
|
*/
|
|
|
|
#ifndef __TBB_concurrent_monitor_H
|
|
#define __TBB_concurrent_monitor_H
|
|
|
|
#include "tbb/tbb_stddef.h"
|
|
#include "tbb/atomic.h"
|
|
#include "tbb/spin_mutex.h"
|
|
#include "tbb/tbb_exception.h"
|
|
#include "tbb/aligned_space.h"
|
|
|
|
#include "semaphore.h"
|
|
|
|
namespace tbb {
|
|
namespace internal {
|
|
|
|
//! Circular doubly-linked list with sentinel
|
|
/** head.next points to the front and head.prev points to the back */
|
|
class circular_doubly_linked_list_with_sentinel : no_copy {
|
|
public:
|
|
struct node_t {
|
|
node_t* next;
|
|
node_t* prev;
|
|
explicit node_t() : next((node_t*)(uintptr_t)0xcdcdcdcd), prev((node_t*)(uintptr_t)0xcdcdcdcd) {}
|
|
};
|
|
|
|
// ctor
|
|
circular_doubly_linked_list_with_sentinel() {clear();}
|
|
// dtor
|
|
~circular_doubly_linked_list_with_sentinel() {__TBB_ASSERT( head.next==&head && head.prev==&head, "the list is not empty" );}
|
|
|
|
inline size_t size() const {return count;}
|
|
inline bool empty() const {return size()==0;}
|
|
inline node_t* front() const {return head.next;}
|
|
inline node_t* last() const {return head.prev;}
|
|
inline node_t* begin() const {return front();}
|
|
inline const node_t* end() const {return &head;}
|
|
|
|
//! add to the back of the list
|
|
inline void add( node_t* n ) {
|
|
__TBB_store_relaxed(count, __TBB_load_relaxed(count) + 1);
|
|
n->prev = head.prev;
|
|
n->next = &head;
|
|
head.prev->next = n;
|
|
head.prev = n;
|
|
}
|
|
|
|
//! remove node 'n'
|
|
inline void remove( node_t& n ) {
|
|
__TBB_store_relaxed(count, __TBB_load_relaxed(count) - 1);
|
|
n.prev->next = n.next;
|
|
n.next->prev = n.prev;
|
|
}
|
|
|
|
//! move all elements to 'lst' and initialize the 'this' list
|
|
inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
|
|
if( const size_t l_count = __TBB_load_relaxed(count) ) {
|
|
__TBB_store_relaxed(lst.count, l_count);
|
|
lst.head.next = head.next;
|
|
lst.head.prev = head.prev;
|
|
head.next->prev = &lst.head;
|
|
head.prev->next = &lst.head;
|
|
clear();
|
|
}
|
|
}
|
|
|
|
void clear() {head.next = head.prev = &head; __TBB_store_relaxed(count, 0);}
|
|
private:
|
|
__TBB_atomic size_t count;
|
|
node_t head;
|
|
};
|
|
|
|
typedef circular_doubly_linked_list_with_sentinel waitset_t;
|
|
typedef circular_doubly_linked_list_with_sentinel dllist_t;
|
|
typedef circular_doubly_linked_list_with_sentinel::node_t waitset_node_t;
|
|
|
|
//! concurrent_monitor
|
|
/** fine-grained concurrent_monitor implementation */
|
|
class concurrent_monitor : no_copy {
|
|
public:
|
|
/** per-thread descriptor for concurrent_monitor */
|
|
class thread_context : waitset_node_t, no_copy {
|
|
friend class concurrent_monitor;
|
|
public:
|
|
thread_context() : spurious(false), aborted(false), ready(false), context(0) {
|
|
epoch = 0;
|
|
in_waitset = false;
|
|
}
|
|
~thread_context() {
|
|
if (ready) {
|
|
if( spurious ) semaphore().P();
|
|
semaphore().~binary_semaphore();
|
|
}
|
|
}
|
|
binary_semaphore& semaphore() { return *sema.begin(); }
|
|
private:
|
|
//! The method for lazy initialization of the thread_context's semaphore.
|
|
// Inlining of the method is undesirable, due to extra instructions for
|
|
// exception support added at caller side.
|
|
__TBB_NOINLINE( void init() );
|
|
tbb::aligned_space<binary_semaphore> sema;
|
|
__TBB_atomic unsigned epoch;
|
|
tbb::atomic<bool> in_waitset;
|
|
bool spurious;
|
|
bool aborted;
|
|
bool ready;
|
|
uintptr_t context;
|
|
};
|
|
|
|
//! ctor
|
|
concurrent_monitor() {__TBB_store_relaxed(epoch, 0);}
|
|
|
|
//! dtor
|
|
~concurrent_monitor() ;
|
|
|
|
//! prepare wait by inserting 'thr' into the wait queue
|
|
void prepare_wait( thread_context& thr, uintptr_t ctx = 0 );
|
|
|
|
//! Commit wait if event count has not changed; otherwise, cancel wait.
|
|
/** Returns true if committed, false if canceled. */
|
|
inline bool commit_wait( thread_context& thr ) {
|
|
const bool do_it = thr.epoch == __TBB_load_relaxed(epoch);
|
|
// this check is just an optimization
|
|
if( do_it ) {
|
|
__TBB_ASSERT( thr.ready, "use of commit_wait() without prior prepare_wait()");
|
|
thr.semaphore().P();
|
|
__TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
|
|
if( thr.aborted )
|
|
throw_exception( eid_user_abort );
|
|
} else {
|
|
cancel_wait( thr );
|
|
}
|
|
return do_it;
|
|
}
|
|
//! Cancel the wait. Removes the thread from the wait queue if not removed yet.
|
|
void cancel_wait( thread_context& thr );
|
|
|
|
//! Wait for a condition to be satisfied with waiting-on context
|
|
template<typename WaitUntil, typename Context>
|
|
void wait( WaitUntil until, Context on );
|
|
|
|
//! Notify one thread about the event
|
|
void notify_one() {atomic_fence(); notify_one_relaxed();}
|
|
|
|
//! Notify one thread about the event. Relaxed version.
|
|
void notify_one_relaxed();
|
|
|
|
//! Notify all waiting threads of the event
|
|
void notify_all() {atomic_fence(); notify_all_relaxed();}
|
|
|
|
//! Notify all waiting threads of the event; Relaxed version
|
|
void notify_all_relaxed();
|
|
|
|
//! Notify waiting threads of the event that satisfies the given predicate
|
|
template<typename P> void notify( const P& predicate ) {atomic_fence(); notify_relaxed( predicate );}
|
|
|
|
//! Notify waiting threads of the event that satisfies the given predicate; Relaxed version
|
|
template<typename P> void notify_relaxed( const P& predicate );
|
|
|
|
//! Abort any sleeping threads at the time of the call
|
|
void abort_all() {atomic_fence(); abort_all_relaxed(); }
|
|
|
|
//! Abort any sleeping threads at the time of the call; Relaxed version
|
|
void abort_all_relaxed();
|
|
|
|
private:
|
|
tbb::spin_mutex mutex_ec;
|
|
waitset_t waitset_ec;
|
|
__TBB_atomic unsigned epoch;
|
|
thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
|
|
};
|
|
|
|
template<typename WaitUntil, typename Context>
|
|
void concurrent_monitor::wait( WaitUntil until, Context on )
|
|
{
|
|
bool slept = false;
|
|
thread_context thr_ctx;
|
|
prepare_wait( thr_ctx, on() );
|
|
while( !until() ) {
|
|
if( (slept = commit_wait( thr_ctx ) )==true )
|
|
if( until() ) break;
|
|
slept = false;
|
|
prepare_wait( thr_ctx, on() );
|
|
}
|
|
if( !slept )
|
|
cancel_wait( thr_ctx );
|
|
}
|
|
|
|
template<typename P>
|
|
void concurrent_monitor::notify_relaxed( const P& predicate ) {
|
|
if( waitset_ec.empty() )
|
|
return;
|
|
dllist_t temp;
|
|
waitset_node_t* nxt;
|
|
const waitset_node_t* end = waitset_ec.end();
|
|
{
|
|
tbb::spin_mutex::scoped_lock l( mutex_ec );
|
|
__TBB_store_relaxed(epoch, __TBB_load_relaxed(epoch) + 1);
|
|
for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
|
|
nxt = n->prev;
|
|
thread_context* thr = to_thread_context( n );
|
|
if( predicate( thr->context ) ) {
|
|
waitset_ec.remove( *n );
|
|
thr->in_waitset = false;
|
|
temp.add( n );
|
|
}
|
|
}
|
|
}
|
|
|
|
end = temp.end();
|
|
for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
|
|
nxt = n->next;
|
|
to_thread_context(n)->semaphore().V();
|
|
}
|
|
#if TBB_USE_ASSERT
|
|
temp.clear();
|
|
#endif
|
|
}
|
|
|
|
} // namespace internal
|
|
} // namespace tbb
|
|
|
|
#endif /* __TBB_concurrent_monitor_H */
|