You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
757 lines
19 KiB
C++
757 lines
19 KiB
C++
// Copyright (C) 2006 Davis E. King (davis@dlib.net)
|
|
// License: Boost Software License See LICENSE.txt for the full license.
|
|
#ifndef DLIB_PIPE_KERNEl_1_
|
|
#define DLIB_PIPE_KERNEl_1_
|
|
|
|
#include "../algs.h"
|
|
#include "../threads.h"
|
|
#include "pipe_kernel_abstract.h"
|
|
|
|
namespace dlib
|
|
{
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
class pipe
|
|
{
|
|
/*!
|
|
INITIAL VALUE
|
|
- pipe_size == 0
|
|
- pipe_max_size == defined by constructor
|
|
- enabled == true
|
|
- data == a pointer to an array of ((pipe_max_size>0)?pipe_max_size:1) T objects.
|
|
- dequeue_waiters == 0
|
|
- enqueue_waiters == 0
|
|
- first == 1
|
|
- last == 1
|
|
- unblock_sig_waiters == 0
|
|
|
|
CONVENTION
|
|
- size() == pipe_size
|
|
- max_size() == pipe_max_size
|
|
- is_enabled() == enabled
|
|
|
|
- m == the mutex used to lock access to all the members of this class
|
|
|
|
- dequeue_waiters == the number of threads blocked on calls to dequeue()
|
|
- enqueue_waiters == the number of threads blocked on calls to enqueue() and
|
|
wait_until_empty()
|
|
- unblock_sig_waiters == the number of threads blocked on calls to
|
|
wait_for_num_blocked_dequeues() and the destructor. (i.e. the number of
|
|
blocking calls to unblock_sig.wait())
|
|
|
|
- dequeue_sig == the signaler that threads blocked on calls to dequeue() wait on
|
|
- enqueue_sig == the signaler that threads blocked on calls to enqueue()
|
|
or wait_until_empty() wait on.
|
|
- unblock_sig == the signaler that is signaled when a thread stops blocking on a call
|
|
to enqueue() or dequeue(). It is also signaled when a dequeue that will probably
|
|
block is called. The destructor and wait_for_num_blocked_dequeues are the only
|
|
things that will wait on this signaler.
|
|
|
|
- if (pipe_size > 0) then
|
|
- data[first] == the next item to dequeue
|
|
- data[last] == the item most recently added via enqueue, so the last to dequeue.
|
|
- else if (pipe_max_size == 0)
|
|
- if (first == 0 && last == 0) then
|
|
- data[0] == the next item to dequeue
|
|
- else if (first == 0 && last == 1) then
|
|
- data[0] has been taken out already by a dequeue
|
|
!*/
|
|
|
|
public:
|
|
// this is here for backwards compatibility with older versions of dlib.
|
|
typedef pipe kernel_1a;
|
|
|
|
typedef T type;
|
|
|
|
explicit pipe (
|
|
size_t maximum_size
|
|
);
|
|
|
|
virtual ~pipe (
|
|
);
|
|
|
|
void empty (
|
|
);
|
|
|
|
void wait_until_empty (
|
|
) const;
|
|
|
|
void wait_for_num_blocked_dequeues (
|
|
unsigned long num
|
|
)const;
|
|
|
|
void enable (
|
|
);
|
|
|
|
void disable (
|
|
);
|
|
|
|
bool is_enqueue_enabled (
|
|
) const;
|
|
|
|
void disable_enqueue (
|
|
);
|
|
|
|
void enable_enqueue (
|
|
);
|
|
|
|
bool is_dequeue_enabled (
|
|
) const;
|
|
|
|
void disable_dequeue (
|
|
);
|
|
|
|
void enable_dequeue (
|
|
);
|
|
|
|
bool is_enabled (
|
|
) const;
|
|
|
|
size_t max_size (
|
|
) const;
|
|
|
|
size_t size (
|
|
) const;
|
|
|
|
bool enqueue (
|
|
T& item
|
|
);
|
|
|
|
bool enqueue (
|
|
T&& item
|
|
) { return enqueue(item); }
|
|
|
|
bool dequeue (
|
|
T& item
|
|
);
|
|
|
|
bool enqueue_or_timeout (
|
|
T& item,
|
|
unsigned long timeout
|
|
);
|
|
|
|
bool enqueue_or_timeout (
|
|
T&& item,
|
|
unsigned long timeout
|
|
) { return enqueue_or_timeout(item,timeout); }
|
|
|
|
bool dequeue_or_timeout (
|
|
T& item,
|
|
unsigned long timeout
|
|
);
|
|
|
|
private:
|
|
|
|
size_t pipe_size;
|
|
const size_t pipe_max_size;
|
|
bool enabled;
|
|
|
|
T* const data;
|
|
|
|
size_t first;
|
|
size_t last;
|
|
|
|
mutex m;
|
|
signaler dequeue_sig;
|
|
signaler enqueue_sig;
|
|
signaler unblock_sig;
|
|
|
|
unsigned long dequeue_waiters;
|
|
mutable unsigned long enqueue_waiters;
|
|
mutable unsigned long unblock_sig_waiters;
|
|
bool enqueue_enabled;
|
|
bool dequeue_enabled;
|
|
|
|
// restricted functions
|
|
pipe(const pipe&); // copy constructor
|
|
pipe& operator=(const pipe&); // assignment operator
|
|
|
|
};
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
// ----------------------------------------------------------------------------------------
|
|
// member function definitions
|
|
// ----------------------------------------------------------------------------------------
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
pipe<T>::
|
|
pipe (
|
|
size_t maximum_size
|
|
) :
|
|
pipe_size(0),
|
|
pipe_max_size(maximum_size),
|
|
enabled(true),
|
|
data(new T[(maximum_size>0) ? maximum_size : 1]),
|
|
first(1),
|
|
last(1),
|
|
dequeue_sig(m),
|
|
enqueue_sig(m),
|
|
unblock_sig(m),
|
|
dequeue_waiters(0),
|
|
enqueue_waiters(0),
|
|
unblock_sig_waiters(0),
|
|
enqueue_enabled(true),
|
|
dequeue_enabled(true)
|
|
{
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
pipe<T>::
|
|
~pipe (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
++unblock_sig_waiters;
|
|
|
|
// first make sure no one is blocked on any calls to enqueue() or dequeue()
|
|
enabled = false;
|
|
dequeue_sig.broadcast();
|
|
enqueue_sig.broadcast();
|
|
unblock_sig.broadcast();
|
|
|
|
// wait for all threads to unblock
|
|
while (dequeue_waiters > 0 || enqueue_waiters > 0 || unblock_sig_waiters > 1)
|
|
unblock_sig.wait();
|
|
|
|
delete [] data;
|
|
--unblock_sig_waiters;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
empty (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
pipe_size = 0;
|
|
|
|
// let any calls to enqueue() know that the pipe is now empty
|
|
if (enqueue_waiters > 0)
|
|
enqueue_sig.broadcast();
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
wait_until_empty (
|
|
) const
|
|
{
|
|
auto_mutex M(m);
|
|
// this function is sort of like a call to enqueue so treat it like that
|
|
++enqueue_waiters;
|
|
|
|
while (pipe_size > 0 && enabled && dequeue_enabled )
|
|
enqueue_sig.wait();
|
|
|
|
// let the destructor know we are ending if it is blocked waiting
|
|
if (enabled == false)
|
|
unblock_sig.broadcast();
|
|
|
|
--enqueue_waiters;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
enable (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
enabled = true;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
disable (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
enabled = false;
|
|
dequeue_sig.broadcast();
|
|
enqueue_sig.broadcast();
|
|
unblock_sig.broadcast();
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
bool pipe<T>::
|
|
is_enabled (
|
|
) const
|
|
{
|
|
auto_mutex M(m);
|
|
return enabled;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
size_t pipe<T>::
|
|
max_size (
|
|
) const
|
|
{
|
|
auto_mutex M(m);
|
|
return pipe_max_size;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
size_t pipe<T>::
|
|
size (
|
|
) const
|
|
{
|
|
auto_mutex M(m);
|
|
return pipe_size;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
bool pipe<T>::
|
|
enqueue (
|
|
T& item
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
++enqueue_waiters;
|
|
|
|
// wait until there is room or we are disabled
|
|
while (pipe_size == pipe_max_size && enabled && enqueue_enabled &&
|
|
!(pipe_max_size == 0 && first == 1) )
|
|
enqueue_sig.wait();
|
|
|
|
if (enabled == false || enqueue_enabled == false)
|
|
{
|
|
--enqueue_waiters;
|
|
// let the destructor know we are unblocking
|
|
unblock_sig.broadcast();
|
|
return false;
|
|
}
|
|
|
|
// set the appropriate values for first and last
|
|
if (pipe_size == 0)
|
|
{
|
|
first = 0;
|
|
last = 0;
|
|
}
|
|
else
|
|
{
|
|
last = (last+1)%pipe_max_size;
|
|
}
|
|
|
|
|
|
exchange(item,data[last]);
|
|
|
|
// wake up a call to dequeue() if there are any currently blocked
|
|
if (dequeue_waiters > 0)
|
|
dequeue_sig.signal();
|
|
|
|
if (pipe_max_size > 0)
|
|
{
|
|
++pipe_size;
|
|
}
|
|
else
|
|
{
|
|
// wait for a dequeue to take the item out
|
|
while (last == 0 && enabled && enqueue_enabled)
|
|
enqueue_sig.wait();
|
|
|
|
if (last == 0 && (enabled == false || enqueue_enabled == false))
|
|
{
|
|
last = 1;
|
|
first = 1;
|
|
|
|
// no one dequeued this object to put it back into item
|
|
exchange(item,data[0]);
|
|
|
|
--enqueue_waiters;
|
|
// let the destructor know we are unblocking
|
|
if (unblock_sig_waiters > 0)
|
|
unblock_sig.broadcast();
|
|
return false;
|
|
}
|
|
|
|
last = 1;
|
|
first = 1;
|
|
|
|
// tell any waiting calls to enqueue() that one of them can proceed
|
|
if (enqueue_waiters > 1)
|
|
enqueue_sig.broadcast();
|
|
|
|
// let the destructor know we are unblocking
|
|
if (enabled == false && unblock_sig_waiters > 0)
|
|
unblock_sig.broadcast();
|
|
}
|
|
|
|
--enqueue_waiters;
|
|
return true;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
bool pipe<T>::
|
|
dequeue (
|
|
T& item
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
++dequeue_waiters;
|
|
|
|
if (pipe_size == 0)
|
|
{
|
|
// notify wait_for_num_blocked_dequeues()
|
|
if (unblock_sig_waiters > 0)
|
|
unblock_sig.broadcast();
|
|
|
|
// notify any blocked enqueue_or_timeout() calls
|
|
if (enqueue_waiters > 0)
|
|
enqueue_sig.broadcast();
|
|
}
|
|
|
|
// wait until there is something in the pipe or we are disabled
|
|
while (pipe_size == 0 && enabled && dequeue_enabled &&
|
|
!(pipe_max_size == 0 && first == 0 && last == 0) )
|
|
dequeue_sig.wait();
|
|
|
|
if (enabled == false || dequeue_enabled == false)
|
|
{
|
|
--dequeue_waiters;
|
|
// let the destructor know we are unblocking
|
|
unblock_sig.broadcast();
|
|
return false;
|
|
}
|
|
|
|
exchange(item,data[first]);
|
|
|
|
if (pipe_max_size > 0)
|
|
{
|
|
// set the appropriate values for first
|
|
first = (first+1)%pipe_max_size;
|
|
|
|
--pipe_size;
|
|
}
|
|
else
|
|
{
|
|
// let the enqueue waiting on us know that we took the
|
|
// item out already.
|
|
last = 1;
|
|
}
|
|
|
|
// wake up a call to enqueue() if there are any currently blocked
|
|
if (enqueue_waiters > 0)
|
|
enqueue_sig.broadcast();
|
|
|
|
--dequeue_waiters;
|
|
return true;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
bool pipe<T>::
|
|
enqueue_or_timeout (
|
|
T& item,
|
|
unsigned long timeout
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
++enqueue_waiters;
|
|
|
|
// wait until there is room or we are disabled or
|
|
// we run out of time.
|
|
bool timed_out = false;
|
|
while (pipe_size == pipe_max_size && enabled && enqueue_enabled &&
|
|
!(pipe_max_size == 0 && dequeue_waiters > 0 && first == 1) )
|
|
{
|
|
if (timeout == 0 || enqueue_sig.wait_or_timeout(timeout) == false)
|
|
{
|
|
timed_out = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (enabled == false || timed_out || enqueue_enabled == false)
|
|
{
|
|
--enqueue_waiters;
|
|
// let the destructor know we are unblocking
|
|
unblock_sig.broadcast();
|
|
return false;
|
|
}
|
|
|
|
// set the appropriate values for first and last
|
|
if (pipe_size == 0)
|
|
{
|
|
first = 0;
|
|
last = 0;
|
|
}
|
|
else
|
|
{
|
|
last = (last+1)%pipe_max_size;
|
|
}
|
|
|
|
|
|
exchange(item,data[last]);
|
|
|
|
// wake up a call to dequeue() if there are any currently blocked
|
|
if (dequeue_waiters > 0)
|
|
dequeue_sig.signal();
|
|
|
|
if (pipe_max_size > 0)
|
|
{
|
|
++pipe_size;
|
|
}
|
|
else
|
|
{
|
|
// wait for a dequeue to take the item out
|
|
while (last == 0 && enabled && enqueue_enabled)
|
|
enqueue_sig.wait();
|
|
|
|
if (last == 0 && (enabled == false || enqueue_enabled == false))
|
|
{
|
|
last = 1;
|
|
first = 1;
|
|
|
|
// no one dequeued this object to put it back into item
|
|
exchange(item,data[0]);
|
|
|
|
--enqueue_waiters;
|
|
// let the destructor know we are unblocking
|
|
if (unblock_sig_waiters > 0)
|
|
unblock_sig.broadcast();
|
|
return false;
|
|
}
|
|
|
|
last = 1;
|
|
first = 1;
|
|
|
|
// tell any waiting calls to enqueue() that one of them can proceed
|
|
if (enqueue_waiters > 1)
|
|
enqueue_sig.broadcast();
|
|
|
|
// let the destructor know we are unblocking
|
|
if (enabled == false && unblock_sig_waiters > 0)
|
|
unblock_sig.broadcast();
|
|
}
|
|
|
|
--enqueue_waiters;
|
|
return true;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
bool pipe<T>::
|
|
dequeue_or_timeout (
|
|
T& item,
|
|
unsigned long timeout
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
++dequeue_waiters;
|
|
|
|
if (pipe_size == 0)
|
|
{
|
|
// notify wait_for_num_blocked_dequeues()
|
|
if (unblock_sig_waiters > 0)
|
|
unblock_sig.broadcast();
|
|
|
|
// notify any blocked enqueue_or_timeout() calls
|
|
if (enqueue_waiters > 0)
|
|
enqueue_sig.broadcast();
|
|
}
|
|
|
|
bool timed_out = false;
|
|
// wait until there is something in the pipe or we are disabled or we timeout.
|
|
while (pipe_size == 0 && enabled && dequeue_enabled &&
|
|
!(pipe_max_size == 0 && first == 0 && last == 0) )
|
|
{
|
|
if (timeout == 0 || dequeue_sig.wait_or_timeout(timeout) == false)
|
|
{
|
|
timed_out = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (enabled == false || timed_out || dequeue_enabled == false)
|
|
{
|
|
--dequeue_waiters;
|
|
// let the destructor know we are unblocking
|
|
unblock_sig.broadcast();
|
|
return false;
|
|
}
|
|
|
|
exchange(item,data[first]);
|
|
|
|
if (pipe_max_size > 0)
|
|
{
|
|
// set the appropriate values for first
|
|
first = (first+1)%pipe_max_size;
|
|
|
|
--pipe_size;
|
|
}
|
|
else
|
|
{
|
|
// let the enqueue waiting on us know that we took the
|
|
// item out already.
|
|
last = 1;
|
|
}
|
|
|
|
// wake up a call to enqueue() if there are any currently blocked
|
|
if (enqueue_waiters > 0)
|
|
enqueue_sig.broadcast();
|
|
|
|
--dequeue_waiters;
|
|
return true;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
wait_for_num_blocked_dequeues (
|
|
unsigned long num
|
|
)const
|
|
{
|
|
auto_mutex M(m);
|
|
++unblock_sig_waiters;
|
|
|
|
while ( (dequeue_waiters < num || pipe_size != 0) && enabled && dequeue_enabled)
|
|
unblock_sig.wait();
|
|
|
|
// let the destructor know we are ending if it is blocked waiting
|
|
if (enabled == false)
|
|
unblock_sig.broadcast();
|
|
|
|
--unblock_sig_waiters;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
bool pipe<T>::
|
|
is_enqueue_enabled (
|
|
) const
|
|
{
|
|
auto_mutex M(m);
|
|
return enqueue_enabled;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
disable_enqueue (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
enqueue_enabled = false;
|
|
enqueue_sig.broadcast();
|
|
}
|
|
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
enable_enqueue (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
enqueue_enabled = true;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
bool pipe<T>::
|
|
is_dequeue_enabled (
|
|
) const
|
|
{
|
|
auto_mutex M(m);
|
|
return dequeue_enabled;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
disable_dequeue (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
dequeue_enabled = false;
|
|
dequeue_sig.broadcast();
|
|
}
|
|
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename T
|
|
>
|
|
void pipe<T>::
|
|
enable_dequeue (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
dequeue_enabled = true;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
}
|
|
|
|
#endif // DLIB_PIPE_KERNEl_1_
|
|
|