You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
concurrentqueue/benchmarks/dlib/pipe/pipe_kernel_1.h

757 lines
19 KiB
C++

// Copyright (C) 2006 Davis E. King (davis@dlib.net)
// License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_PIPE_KERNEl_1_
#define DLIB_PIPE_KERNEl_1_
#include "../algs.h"
#include "../threads.h"
#include "pipe_kernel_abstract.h"
namespace dlib
{
template <
typename T
>
class pipe
{
/*!
INITIAL VALUE
- pipe_size == 0
- pipe_max_size == defined by constructor
- enabled == true
- data == a pointer to an array of ((pipe_max_size>0)?pipe_max_size:1) T objects.
- dequeue_waiters == 0
- enqueue_waiters == 0
- first == 1
- last == 1
- unblock_sig_waiters == 0
CONVENTION
- size() == pipe_size
- max_size() == pipe_max_size
- is_enabled() == enabled
- m == the mutex used to lock access to all the members of this class
- dequeue_waiters == the number of threads blocked on calls to dequeue()
- enqueue_waiters == the number of threads blocked on calls to enqueue() and
wait_until_empty()
- unblock_sig_waiters == the number of threads blocked on calls to
wait_for_num_blocked_dequeues() and the destructor. (i.e. the number of
blocking calls to unblock_sig.wait())
- dequeue_sig == the signaler that threads blocked on calls to dequeue() wait on
- enqueue_sig == the signaler that threads blocked on calls to enqueue()
or wait_until_empty() wait on.
- unblock_sig == the signaler that is signaled when a thread stops blocking on a call
to enqueue() or dequeue(). It is also signaled when a dequeue that will probably
block is called. The destructor and wait_for_num_blocked_dequeues are the only
things that will wait on this signaler.
- if (pipe_size > 0) then
- data[first] == the next item to dequeue
- data[last] == the item most recently added via enqueue, so the last to dequeue.
- else if (pipe_max_size == 0)
- if (first == 0 && last == 0) then
- data[0] == the next item to dequeue
- else if (first == 0 && last == 1) then
- data[0] has been taken out already by a dequeue
!*/
public:
// this is here for backwards compatibility with older versions of dlib.
typedef pipe kernel_1a;
typedef T type;
explicit pipe (
size_t maximum_size
);
virtual ~pipe (
);
void empty (
);
void wait_until_empty (
) const;
void wait_for_num_blocked_dequeues (
unsigned long num
)const;
void enable (
);
void disable (
);
bool is_enqueue_enabled (
) const;
void disable_enqueue (
);
void enable_enqueue (
);
bool is_dequeue_enabled (
) const;
void disable_dequeue (
);
void enable_dequeue (
);
bool is_enabled (
) const;
size_t max_size (
) const;
size_t size (
) const;
bool enqueue (
T& item
);
bool enqueue (
T&& item
) { return enqueue(item); }
bool dequeue (
T& item
);
bool enqueue_or_timeout (
T& item,
unsigned long timeout
);
bool enqueue_or_timeout (
T&& item,
unsigned long timeout
) { return enqueue_or_timeout(item,timeout); }
bool dequeue_or_timeout (
T& item,
unsigned long timeout
);
private:
size_t pipe_size;
const size_t pipe_max_size;
bool enabled;
T* const data;
size_t first;
size_t last;
mutex m;
signaler dequeue_sig;
signaler enqueue_sig;
signaler unblock_sig;
unsigned long dequeue_waiters;
mutable unsigned long enqueue_waiters;
mutable unsigned long unblock_sig_waiters;
bool enqueue_enabled;
bool dequeue_enabled;
// restricted functions
pipe(const pipe&); // copy constructor
pipe& operator=(const pipe&); // assignment operator
};
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
// member function definitions
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
template <
typename T
>
pipe<T>::
pipe (
size_t maximum_size
) :
pipe_size(0),
pipe_max_size(maximum_size),
enabled(true),
data(new T[(maximum_size>0) ? maximum_size : 1]),
first(1),
last(1),
dequeue_sig(m),
enqueue_sig(m),
unblock_sig(m),
dequeue_waiters(0),
enqueue_waiters(0),
unblock_sig_waiters(0),
enqueue_enabled(true),
dequeue_enabled(true)
{
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
pipe<T>::
~pipe (
)
{
auto_mutex M(m);
++unblock_sig_waiters;
// first make sure no one is blocked on any calls to enqueue() or dequeue()
enabled = false;
dequeue_sig.broadcast();
enqueue_sig.broadcast();
unblock_sig.broadcast();
// wait for all threads to unblock
while (dequeue_waiters > 0 || enqueue_waiters > 0 || unblock_sig_waiters > 1)
unblock_sig.wait();
delete [] data;
--unblock_sig_waiters;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
empty (
)
{
auto_mutex M(m);
pipe_size = 0;
// let any calls to enqueue() know that the pipe is now empty
if (enqueue_waiters > 0)
enqueue_sig.broadcast();
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
wait_until_empty (
) const
{
auto_mutex M(m);
// this function is sort of like a call to enqueue so treat it like that
++enqueue_waiters;
while (pipe_size > 0 && enabled && dequeue_enabled )
enqueue_sig.wait();
// let the destructor know we are ending if it is blocked waiting
if (enabled == false)
unblock_sig.broadcast();
--enqueue_waiters;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
enable (
)
{
auto_mutex M(m);
enabled = true;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
disable (
)
{
auto_mutex M(m);
enabled = false;
dequeue_sig.broadcast();
enqueue_sig.broadcast();
unblock_sig.broadcast();
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
bool pipe<T>::
is_enabled (
) const
{
auto_mutex M(m);
return enabled;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
size_t pipe<T>::
max_size (
) const
{
auto_mutex M(m);
return pipe_max_size;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
size_t pipe<T>::
size (
) const
{
auto_mutex M(m);
return pipe_size;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
bool pipe<T>::
enqueue (
T& item
)
{
auto_mutex M(m);
++enqueue_waiters;
// wait until there is room or we are disabled
while (pipe_size == pipe_max_size && enabled && enqueue_enabled &&
!(pipe_max_size == 0 && first == 1) )
enqueue_sig.wait();
if (enabled == false || enqueue_enabled == false)
{
--enqueue_waiters;
// let the destructor know we are unblocking
unblock_sig.broadcast();
return false;
}
// set the appropriate values for first and last
if (pipe_size == 0)
{
first = 0;
last = 0;
}
else
{
last = (last+1)%pipe_max_size;
}
exchange(item,data[last]);
// wake up a call to dequeue() if there are any currently blocked
if (dequeue_waiters > 0)
dequeue_sig.signal();
if (pipe_max_size > 0)
{
++pipe_size;
}
else
{
// wait for a dequeue to take the item out
while (last == 0 && enabled && enqueue_enabled)
enqueue_sig.wait();
if (last == 0 && (enabled == false || enqueue_enabled == false))
{
last = 1;
first = 1;
// no one dequeued this object to put it back into item
exchange(item,data[0]);
--enqueue_waiters;
// let the destructor know we are unblocking
if (unblock_sig_waiters > 0)
unblock_sig.broadcast();
return false;
}
last = 1;
first = 1;
// tell any waiting calls to enqueue() that one of them can proceed
if (enqueue_waiters > 1)
enqueue_sig.broadcast();
// let the destructor know we are unblocking
if (enabled == false && unblock_sig_waiters > 0)
unblock_sig.broadcast();
}
--enqueue_waiters;
return true;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
bool pipe<T>::
dequeue (
T& item
)
{
auto_mutex M(m);
++dequeue_waiters;
if (pipe_size == 0)
{
// notify wait_for_num_blocked_dequeues()
if (unblock_sig_waiters > 0)
unblock_sig.broadcast();
// notify any blocked enqueue_or_timeout() calls
if (enqueue_waiters > 0)
enqueue_sig.broadcast();
}
// wait until there is something in the pipe or we are disabled
while (pipe_size == 0 && enabled && dequeue_enabled &&
!(pipe_max_size == 0 && first == 0 && last == 0) )
dequeue_sig.wait();
if (enabled == false || dequeue_enabled == false)
{
--dequeue_waiters;
// let the destructor know we are unblocking
unblock_sig.broadcast();
return false;
}
exchange(item,data[first]);
if (pipe_max_size > 0)
{
// set the appropriate values for first
first = (first+1)%pipe_max_size;
--pipe_size;
}
else
{
// let the enqueue waiting on us know that we took the
// item out already.
last = 1;
}
// wake up a call to enqueue() if there are any currently blocked
if (enqueue_waiters > 0)
enqueue_sig.broadcast();
--dequeue_waiters;
return true;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
bool pipe<T>::
enqueue_or_timeout (
T& item,
unsigned long timeout
)
{
auto_mutex M(m);
++enqueue_waiters;
// wait until there is room or we are disabled or
// we run out of time.
bool timed_out = false;
while (pipe_size == pipe_max_size && enabled && enqueue_enabled &&
!(pipe_max_size == 0 && dequeue_waiters > 0 && first == 1) )
{
if (timeout == 0 || enqueue_sig.wait_or_timeout(timeout) == false)
{
timed_out = true;
break;
}
}
if (enabled == false || timed_out || enqueue_enabled == false)
{
--enqueue_waiters;
// let the destructor know we are unblocking
unblock_sig.broadcast();
return false;
}
// set the appropriate values for first and last
if (pipe_size == 0)
{
first = 0;
last = 0;
}
else
{
last = (last+1)%pipe_max_size;
}
exchange(item,data[last]);
// wake up a call to dequeue() if there are any currently blocked
if (dequeue_waiters > 0)
dequeue_sig.signal();
if (pipe_max_size > 0)
{
++pipe_size;
}
else
{
// wait for a dequeue to take the item out
while (last == 0 && enabled && enqueue_enabled)
enqueue_sig.wait();
if (last == 0 && (enabled == false || enqueue_enabled == false))
{
last = 1;
first = 1;
// no one dequeued this object to put it back into item
exchange(item,data[0]);
--enqueue_waiters;
// let the destructor know we are unblocking
if (unblock_sig_waiters > 0)
unblock_sig.broadcast();
return false;
}
last = 1;
first = 1;
// tell any waiting calls to enqueue() that one of them can proceed
if (enqueue_waiters > 1)
enqueue_sig.broadcast();
// let the destructor know we are unblocking
if (enabled == false && unblock_sig_waiters > 0)
unblock_sig.broadcast();
}
--enqueue_waiters;
return true;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
bool pipe<T>::
dequeue_or_timeout (
T& item,
unsigned long timeout
)
{
auto_mutex M(m);
++dequeue_waiters;
if (pipe_size == 0)
{
// notify wait_for_num_blocked_dequeues()
if (unblock_sig_waiters > 0)
unblock_sig.broadcast();
// notify any blocked enqueue_or_timeout() calls
if (enqueue_waiters > 0)
enqueue_sig.broadcast();
}
bool timed_out = false;
// wait until there is something in the pipe or we are disabled or we timeout.
while (pipe_size == 0 && enabled && dequeue_enabled &&
!(pipe_max_size == 0 && first == 0 && last == 0) )
{
if (timeout == 0 || dequeue_sig.wait_or_timeout(timeout) == false)
{
timed_out = true;
break;
}
}
if (enabled == false || timed_out || dequeue_enabled == false)
{
--dequeue_waiters;
// let the destructor know we are unblocking
unblock_sig.broadcast();
return false;
}
exchange(item,data[first]);
if (pipe_max_size > 0)
{
// set the appropriate values for first
first = (first+1)%pipe_max_size;
--pipe_size;
}
else
{
// let the enqueue waiting on us know that we took the
// item out already.
last = 1;
}
// wake up a call to enqueue() if there are any currently blocked
if (enqueue_waiters > 0)
enqueue_sig.broadcast();
--dequeue_waiters;
return true;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
wait_for_num_blocked_dequeues (
unsigned long num
)const
{
auto_mutex M(m);
++unblock_sig_waiters;
while ( (dequeue_waiters < num || pipe_size != 0) && enabled && dequeue_enabled)
unblock_sig.wait();
// let the destructor know we are ending if it is blocked waiting
if (enabled == false)
unblock_sig.broadcast();
--unblock_sig_waiters;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
bool pipe<T>::
is_enqueue_enabled (
) const
{
auto_mutex M(m);
return enqueue_enabled;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
disable_enqueue (
)
{
auto_mutex M(m);
enqueue_enabled = false;
enqueue_sig.broadcast();
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
enable_enqueue (
)
{
auto_mutex M(m);
enqueue_enabled = true;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
bool pipe<T>::
is_dequeue_enabled (
) const
{
auto_mutex M(m);
return dequeue_enabled;
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
disable_dequeue (
)
{
auto_mutex M(m);
dequeue_enabled = false;
dequeue_sig.broadcast();
}
// ----------------------------------------------------------------------------------------
template <
typename T
>
void pipe<T>::
enable_dequeue (
)
{
auto_mutex M(m);
dequeue_enabled = true;
}
// ----------------------------------------------------------------------------------------
}
#endif // DLIB_PIPE_KERNEl_1_