concurrentqueue/benchmarks/tbb/concurrent_monitor.h

/*
    Copyright 2005-2014 Intel Corporation. All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation. Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the GNU General Public License for more details. You should have received a copy of
    the GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software library without
    restriction. Specifically, if other files instantiate templates or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable, this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License. This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/
#ifndef __TBB_concurrent_monitor_H
#define __TBB_concurrent_monitor_H
#include "tbb/tbb_stddef.h"
#include "tbb/atomic.h"
#include "tbb/spin_mutex.h"
#include "tbb/tbb_exception.h"
#include "tbb/aligned_space.h"
#include "semaphore.h"
namespace tbb {
namespace internal {
//! Circular doubly-linked list with sentinel
/** head.next points to the front and head.prev points to the back */
class circular_doubly_linked_list_with_sentinel : no_copy {
public:
    struct node_t {
        node_t* next;
        node_t* prev;
        // Links are poisoned with 0xcdcdcdcd so that any use of a node that has not
        // been linked into a list shows up conspicuously in a debugger.
        explicit node_t() : next((node_t*)(uintptr_t)0xcdcdcdcd), prev((node_t*)(uintptr_t)0xcdcdcdcd) {}
    };

    // ctor
    circular_doubly_linked_list_with_sentinel() {clear();}
    // dtor
    ~circular_doubly_linked_list_with_sentinel() {__TBB_ASSERT( head.next==&head && head.prev==&head, "the list is not empty" );}

    inline size_t size() const {return count;}
    inline bool empty() const {return size()==0;}
    inline node_t* front() const {return head.next;}
    inline node_t* last() const {return head.prev;}
    inline node_t* begin() const {return front();}
    inline const node_t* end() const {return &head;}

    //! add to the back of the list
    inline void add( node_t* n ) {
        __TBB_store_relaxed(count, __TBB_load_relaxed(count) + 1);
        n->prev = head.prev;
        n->next = &head;
        head.prev->next = n;
        head.prev = n;
    }

    //! remove node 'n'
    inline void remove( node_t& n ) {
        __TBB_store_relaxed(count, __TBB_load_relaxed(count) - 1);
        n.prev->next = n.next;
        n.next->prev = n.prev;
    }

    //! move all elements to 'lst' and initialize the 'this' list
    inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
        if( const size_t l_count = __TBB_load_relaxed(count) ) {
            __TBB_store_relaxed(lst.count, l_count);
            lst.head.next = head.next;
            lst.head.prev = head.prev;
            head.next->prev = &lst.head;
            head.prev->next = &lst.head;
            clear();
        }
    }

    void clear() {head.next = head.prev = &head; __TBB_store_relaxed(count, 0);}

private:
    __TBB_atomic size_t count;
    node_t head;
};
typedef circular_doubly_linked_list_with_sentinel waitset_t;
typedef circular_doubly_linked_list_with_sentinel dllist_t;
typedef circular_doubly_linked_list_with_sentinel::node_t waitset_node_t;
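/* Usage sketch (illustrative; the nodes and lists named here are hypothetical, not part
   of the header). The monitor below uses the list in essentially this way when it
   collects waiters to wake:

       dllist_t waiters, to_wake;
       waitset_node_t n1, n2;
       waiters.add( &n1 );          // n1 becomes the back; front()==&n1, last()==&n1
       waiters.add( &n2 );          // front()==&n1, last()==&n2
       waiters.remove( n1 );        // unlinks n1; n1's own links are left untouched
       waiters.flush_to( to_wake ); // to_wake takes the remaining node, waiters is reset

   Iteration stops at end(), which is the address of the embedded sentinel rather than a
   real node. */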
//! concurrent_monitor
/** fine-grained concurrent_monitor implementation */
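/* Protocol (as implemented below): a waiter constructs a thread_context, registers it
   with prepare_wait(), re-checks its condition, and then either blocks in commit_wait()
   on its per-thread binary_semaphore or backs out with cancel_wait(). A notification
   bumps 'epoch' under 'mutex_ec' and V()s the semaphores of the selected waiters, so a
   wakeup that races with the re-check is not lost; commit_wait() skips the sleep when it
   sees the epoch has already moved. */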
class concurrent_monitor : no_copy {
public:
    /** per-thread descriptor for concurrent_monitor */
    class thread_context : waitset_node_t, no_copy {
        friend class concurrent_monitor;
    public:
        thread_context() : spurious(false), aborted(false), ready(false), context(0) {
            epoch = 0;
            in_waitset = false;
        }
        ~thread_context() {
            if( ready ) {
                // A cancelled wait can leave a pending (or imminent) V() from a notifier;
                // absorb it before the semaphore is destroyed.
                if( spurious ) semaphore().P();
                semaphore().~binary_semaphore();
            }
        }
        binary_semaphore& semaphore() { return *sema.begin(); }
    private:
        //! The method for lazy initialization of the thread_context's semaphore.
        // Inlining of the method is undesirable, due to extra instructions for
        // exception support added at caller side.
        __TBB_NOINLINE( void init() );
        tbb::aligned_space<binary_semaphore> sema;
        __TBB_atomic unsigned epoch;
        tbb::atomic<bool> in_waitset;
        bool spurious;
        bool aborted;
        bool ready;
        uintptr_t context;
    };

    //! ctor
    concurrent_monitor() {__TBB_store_relaxed(epoch, 0);}

    //! dtor
    ~concurrent_monitor();

    //! prepare wait by inserting 'thr' into the wait queue
    void prepare_wait( thread_context& thr, uintptr_t ctx = 0 );

    //! Commit wait if event count has not changed; otherwise, cancel wait.
    /** Returns true if committed, false if canceled. */
    inline bool commit_wait( thread_context& thr ) {
        const bool do_it = thr.epoch == __TBB_load_relaxed(epoch);
        // this check is just an optimization
        if( do_it ) {
            __TBB_ASSERT( thr.ready, "use of commit_wait() without prior prepare_wait()");
            thr.semaphore().P();
            __TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
            if( thr.aborted )
                throw_exception( eid_user_abort );
        } else {
            cancel_wait( thr );
        }
        return do_it;
    }

    //! Cancel the wait. Removes the thread from the wait queue if not removed yet.
    void cancel_wait( thread_context& thr );

    //! Wait for a condition to be satisfied with waiting-on context
    template<typename WaitUntil, typename Context>
    void wait( WaitUntil until, Context on );

    //! Notify one thread about the event
    void notify_one() {atomic_fence(); notify_one_relaxed();}

    //! Notify one thread about the event. Relaxed version.
    void notify_one_relaxed();

    //! Notify all waiting threads of the event
    void notify_all() {atomic_fence(); notify_all_relaxed();}

    //! Notify all waiting threads of the event; Relaxed version
    void notify_all_relaxed();

    //! Notify waiting threads of the event that satisfies the given predicate
    template<typename P> void notify( const P& predicate ) {atomic_fence(); notify_relaxed( predicate );}

    //! Notify waiting threads of the event that satisfies the given predicate; Relaxed version
    template<typename P> void notify_relaxed( const P& predicate );

    //! Abort any sleeping threads at the time of the call
    void abort_all() {atomic_fence(); abort_all_relaxed();}

    //! Abort any sleeping threads at the time of the call; Relaxed version
    void abort_all_relaxed();

private:
    tbb::spin_mutex mutex_ec;
    waitset_t waitset_ec;
    __TBB_atomic unsigned epoch;
    thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
};
template<typename WaitUntil, typename Context>
void concurrent_monitor::wait( WaitUntil until, Context on )
{
    bool slept = false;
    thread_context thr_ctx;
    prepare_wait( thr_ctx, on() );
    // 'slept' tracks whether the most recent prepare_wait() was consumed by a committed
    // sleep; if the loop exits with it still false, that registration is cancelled below.
    while( !until() ) {
        if( (slept = commit_wait( thr_ctx ))==true )
            if( until() ) break;
        slept = false;
        prepare_wait( thr_ctx, on() );
    }
    if( !slept )
        cancel_wait( thr_ctx );
}
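/* Usage sketch for wait() (illustrative; 'mon', 'flag', and both functors are
   hypothetical, not part of TBB):

       struct flag_set {                 // WaitUntil: stop waiting once the flag is set
           const tbb::atomic<bool>& f;
           flag_set( const tbb::atomic<bool>& f_ ) : f(f_) {}
           bool operator()() const { return f; }
       };
       struct no_context {               // Context: value recorded as the waiting-on context
           uintptr_t operator()() const { return 0; }
       };

       // waiter
       mon.wait( flag_set(flag), no_context() );

       // notifier
       flag = true;
       mon.notify_all();                 // fences, then wakes every registered waiter
*/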
template<typename P>
void concurrent_monitor::notify_relaxed( const P& predicate ) {
    if( waitset_ec.empty() )
        return;
    dllist_t temp;
    waitset_node_t* nxt;
    const waitset_node_t* end = waitset_ec.end();
    {
        // Phase 1: under the lock, advance the epoch and move the waiters whose context
        // satisfies the predicate onto a private list.
        tbb::spin_mutex::scoped_lock l( mutex_ec );
        __TBB_store_relaxed(epoch, __TBB_load_relaxed(epoch) + 1);
        for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
            nxt = n->prev;
            thread_context* thr = to_thread_context( n );
            if( predicate( thr->context ) ) {
                waitset_ec.remove( *n );
                thr->in_waitset = false;
                temp.add( n );
            }
        }
    }
    // Phase 2: wake the selected waiters outside the lock.
    end = temp.end();
    for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
        nxt = n->next;
        to_thread_context(n)->semaphore().V();
    }
#if TBB_USE_ASSERT
    temp.clear();
#endif
}
} // namespace internal
} // namespace tbb
#endif /* __TBB_concurrent_monitor_H */