// Origin: concurrentqueue/benchmarks/dlib/threads/thread_pool_extension.h (C++, 1393 lines, 44 KiB)

// Copyright (C) 2008 Davis E. King (davis@dlib.net)
// License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_THREAD_POOl_Hh_
#define DLIB_THREAD_POOl_Hh_
#include <cassert>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include "thread_pool_extension_abstract.h"
#include "multithreaded_object_extension.h"
#include "../member_function_pointer.h"
#include "../bound_function_pointer.h"
#include "threads_kernel.h"
#include "auto_mutex_extension.h"
#include "../uintn.h"
#include "../array.h"
namespace dlib
{
// ----------------------------------------------------------------------------------------
class thread_pool_implementation;

template <typename T>
class future
{
    /*!
        INITIAL VALUE
            - task_id == 0
            - tp.get() == 0

        CONVENTION
            - is_ready() == (tp.get() == 0)
            - get() == var
            - if (tp.get() != 0)
                - tp == a pointer to the thread_pool_implementation that is
                  using this future object
                - task_id == the task id of the task in the thread pool tp
                  that is using this future object.
    !*/

public:

    // A fresh future is not attached to any task; var is left
    // default-initialized.
    future () : task_id(0) {}

    // Wraps a copy of item; no task is attached.
    future (const T& item) : task_id(0), var(item) {}

    // Copies only the value of item (going through item.get(), which waits
    // first).  The new future is not attached to item's task.
    future (const future& item) : task_id(0), var(item.get()) {}

    // Destruction goes through wait() so an attached task is waited for
    // before var goes away.
    ~future () { wait(); }

    // Both assignments go through get(), i.e. they wait before writing var.
    future& operator= (const T& item)
    {
        get() = item;
        return *this;
    }

    future& operator= (const future& item)
    {
        get() = item.get();
        return *this;
    }

    // Implicit conversions behave exactly like get().
    operator T& ()
    {
        wait();
        return var;
    }

    operator const T& () const
    {
        wait();
        return var;
    }

    // Waits (see wait()) and then hands out the stored value.
    T& get ()
    {
        wait();
        return var;
    }

    const T& get () const
    {
        wait();
        return var;
    }

    // True exactly when no thread pool is attached to this future.
    bool is_ready () const { return !tp; }

private:

    // thread_pool attaches futures to tasks by writing task_id and tp.
    friend class thread_pool;

    inline void wait () const;

    // task_id and tp are mutable because wait() is const yet clears them.
    mutable uint64 task_id;
    mutable std::shared_ptr<thread_pool_implementation> tp;

    T var;
};
// ----------------------------------------------------------------------------------------
// Swaps the values held by two futures.  Each get() call waits on its future
// before the values are exchanged.
//
// Note that dlib::exchange() just calls std::swap.  I'm only using it because
// this works around some bugs in certain compilers.
template <typename T>
inline void swap (
    future<T>& a,
    future<T>& b
)
{
    dlib::exchange(a.get(), b.get());
}
// ----------------------------------------------------------------------------------------
template <typename T> bool operator== (const future<T>& a, const future<T>& b) { return a.get() == b.get(); }
template <typename T> bool operator!= (const future<T>& a, const future<T>& b) { return a.get() != b.get(); }
template <typename T> bool operator<= (const future<T>& a, const future<T>& b) { return a.get() <= b.get(); }
template <typename T> bool operator>= (const future<T>& a, const future<T>& b) { return a.get() >= b.get(); }
template <typename T> bool operator< (const future<T>& a, const future<T>& b) { return a.get() < b.get(); }
template <typename T> bool operator> (const future<T>& a, const future<T>& b) { return a.get() > b.get(); }
template <typename T> bool operator== (const future<T>& a, const T& b) { return a.get() == b; }
template <typename T> bool operator== (const T& a, const future<T>& b) { return a == b.get(); }
template <typename T> bool operator!= (const future<T>& a, const T& b) { return a.get() != b; }
template <typename T> bool operator!= (const T& a, const future<T>& b) { return a != b.get(); }
template <typename T> bool operator<= (const future<T>& a, const T& b) { return a.get() <= b; }
template <typename T> bool operator<= (const T& a, const future<T>& b) { return a <= b.get(); }
template <typename T> bool operator>= (const future<T>& a, const T& b) { return a.get() >= b; }
template <typename T> bool operator>= (const T& a, const future<T>& b) { return a >= b.get(); }
template <typename T> bool operator< (const future<T>& a, const T& b) { return a.get() < b; }
template <typename T> bool operator< (const T& a, const future<T>& b) { return a < b.get(); }
template <typename T> bool operator> (const future<T>& a, const T& b) { return a.get() > b; }
template <typename T> bool operator> (const T& a, const future<T>& b) { return a > b.get(); }
// ----------------------------------------------------------------------------------------
class thread_pool_implementation
{
/*!
CONVENTION
- num_threads_in_pool() == tasks.size()
- if (the destructor has been called) then
- we_are_destructing == true
- else
- we_are_destructing == false
- is_task_thread() == is_worker_thread(get_thread_id())
- m == the mutex used to protect everything in this object
- worker_thread_ids == an array that contains the thread ids for
all the threads in the thread pool
!*/
typedef bound_function_pointer::kernel_1a_c bfp_type;
friend class thread_pool;
explicit thread_pool_implementation (
unsigned long num_threads
);
public:
~thread_pool_implementation(
);
void wait_for_task (
uint64 task_id
) const;
unsigned long num_threads_in_pool (
) const;
void wait_for_all_tasks (
) const;
bool is_task_thread (
) const;
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)()
)
{
auto_mutex M(m);
const thread_id_type my_thread_id = get_thread_id();
// find a thread that isn't doing anything
long idx = find_empty_task_slot();
if (idx == -1 && is_worker_thread(my_thread_id))
{
// this function is being called from within a worker thread and there
// aren't any other worker threads free so just perform the task right
// here
M.unlock();
(obj.*funct)();
// return a task id that is both non-zero and also one
// that is never normally returned. This way calls
// to wait_for_task() will never block given this id.
return 1;
}
// wait until there is a thread that isn't doing anything
while (idx == -1)
{
task_done_signaler.wait();
idx = find_empty_task_slot();
}
tasks[idx].thread_id = my_thread_id;
tasks[idx].task_id = make_next_task_id(idx);
tasks[idx].mfp0.set(obj,funct);
task_ready_signaler.signal();
return tasks[idx].task_id;
}
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long),
long arg1
)
{
auto_mutex M(m);
const thread_id_type my_thread_id = get_thread_id();
// find a thread that isn't doing anything
long idx = find_empty_task_slot();
if (idx == -1 && is_worker_thread(my_thread_id))
{
// this function is being called from within a worker thread and there
// aren't any other worker threads free so just perform the task right
// here
M.unlock();
(obj.*funct)(arg1);
// return a task id that is both non-zero and also one
// that is never normally returned. This way calls
// to wait_for_task() will never block given this id.
return 1;
}
// wait until there is a thread that isn't doing anything
while (idx == -1)
{
task_done_signaler.wait();
idx = find_empty_task_slot();
}
tasks[idx].thread_id = my_thread_id;
tasks[idx].task_id = make_next_task_id(idx);
tasks[idx].mfp1.set(obj,funct);
tasks[idx].arg1 = arg1;
task_ready_signaler.signal();
return tasks[idx].task_id;
}
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long,long),
long arg1,
long arg2
)
{
auto_mutex M(m);
const thread_id_type my_thread_id = get_thread_id();
// find a thread that isn't doing anything
long idx = find_empty_task_slot();
if (idx == -1 && is_worker_thread(my_thread_id))
{
// this function is being called from within a worker thread and there
// aren't any other worker threads free so just perform the task right
// here
M.unlock();
(obj.*funct)(arg1, arg2);
// return a task id that is both non-zero and also one
// that is never normally returned. This way calls
// to wait_for_task() will never block given this id.
return 1;
}
// wait until there is a thread that isn't doing anything
while (idx == -1)
{
task_done_signaler.wait();
idx = find_empty_task_slot();
}
tasks[idx].thread_id = my_thread_id;
tasks[idx].task_id = make_next_task_id(idx);
tasks[idx].mfp2.set(obj,funct);
tasks[idx].arg1 = arg1;
tasks[idx].arg2 = arg2;
task_ready_signaler.signal();
return tasks[idx].task_id;
}
struct function_object_copy
{
virtual ~function_object_copy(){}
};
template <typename T>
struct function_object_copy_instance : function_object_copy
{
function_object_copy_instance(const T& item_) : item(item_) {}
T item;
virtual ~function_object_copy_instance(){}
};
uint64 add_task_internal (
const bfp_type& bfp,
std::shared_ptr<function_object_copy>& item
);
/*!
ensures
- adds a task to call the given bfp object.
- swaps item into the internal task object which will have a lifetime
at least as long as the running task.
- returns the task id for this new task
!*/
uint64 add_task_internal (
const bfp_type& bfp
) { std::shared_ptr<function_object_copy> temp; return add_task_internal(bfp, temp); }
/*!
ensures
- adds a task to call the given bfp object.
- returns the task id for this new task
!*/
void shutdown_pool (
);
/*!
ensures
- causes all threads to terminate and blocks the
caller until this happens.
!*/
private:
bool is_worker_thread (
const thread_id_type id
) const;
/*!
requires
- m is locked
ensures
- if (thread with given id is one of the thread pool's worker threads or num_threads_in_pool() == 0) then
- returns true
- else
- returns false
!*/
void thread (
);
/*!
this is the function that executes the threads in the thread pool
!*/
long find_empty_task_slot (
) const;
/*!
requires
- m is locked
ensures
- if (there is currently a empty task slot) then
- returns the index of that task slot in tasks
- there is a task slot
- else
- returns -1
!*/
long find_ready_task (
) const;
/*!
requires
- m is locked
ensures
- if (there is currently a task to do) then
- returns the index of that task in tasks
- else
- returns -1
!*/
uint64 make_next_task_id (
long idx
);
/*!
requires
- m is locked
- 0 <= idx < tasks.size()
ensures
- returns the next index to be used for tasks that are placed in
tasks[idx]
!*/
unsigned long task_id_to_index (
uint64 id
) const;
/*!
requires
- m is locked
- num_threads_in_pool() != 0
ensures
- returns the index in tasks corresponding to the given id
!*/
struct task_state_type
{
task_state_type() : is_being_processed(false), task_id(0), next_task_id(2), arg1(0), arg2(0), eptr(nullptr) {}
bool is_ready () const
/*!
ensures
- if (is_empty() == false && no thread is currently processing this task) then
- returns true
- else
- returns false
!*/
{
return !is_being_processed && !is_empty();
}
bool is_empty () const
/*!
ensures
- if (this task state is empty. i.e. it doesn't contain a task to be processed) then
- returns true
- else
- returns false
!*/
{
return task_id == 0;
}
bool is_being_processed; // true when a thread is working on this task
uint64 task_id; // the id of this task. 0 means this task is empty
thread_id_type thread_id; // the id of the thread that requested this task
uint64 next_task_id;
long arg1;
long arg2;
member_function_pointer<> mfp0;
member_function_pointer<long> mfp1;
member_function_pointer<long,long> mfp2;
bfp_type bfp;
std::shared_ptr<function_object_copy> function_copy;
mutable std::exception_ptr eptr; // non-null if the task threw an exception
void propagate_exception() const
{
if (eptr)
{
auto tmp = eptr;
eptr = nullptr;
std::rethrow_exception(tmp);
}
}
};
array<task_state_type> tasks;
array<thread_id_type> worker_thread_ids;
mutex m;
signaler task_done_signaler;
signaler task_ready_signaler;
bool we_are_destructing;
std::vector<std::thread> threads;
// restricted functions
thread_pool_implementation(thread_pool_implementation&); // copy constructor
thread_pool_implementation& operator=(thread_pool_implementation&); // assignment operator
};
// ----------------------------------------------------------------------------------------
class thread_pool
{
/*!
This object is just a shell that holds a std::shared_ptr
to the real thread_pool_implementation object. The reason for doing
it this way is so that we can allow any mixture of destruction orders
between thread_pool objects and futures. Whoever gets destroyed
last cleans up the thread_pool_implementation resources.
!*/
typedef bound_function_pointer::kernel_1a_c bfp_type;
public:
explicit thread_pool (
unsigned long num_threads
)
{
impl.reset(new thread_pool_implementation(num_threads));
}
~thread_pool (
)
{
try
{
impl->shutdown_pool();
}
catch (std::exception& e)
{
std::cerr << "An unhandled exception was inside a dlib::thread_pool when it was destructed." << std::endl;
std::cerr << "It's what string is: \n" << e.what() << std::endl;
using namespace std;
assert(false);
abort();
}
catch (...)
{
std::cerr << "An unhandled exception was inside a dlib::thread_pool when it was destructed." << std::endl;
using namespace std;
assert(false);
abort();
}
}
void wait_for_task (
uint64 task_id
) const { impl->wait_for_task(task_id); }
unsigned long num_threads_in_pool (
) const { return impl->num_threads_in_pool(); }
void wait_for_all_tasks (
) const { impl->wait_for_all_tasks(); }
bool is_task_thread (
) const { return impl->is_task_thread(); }
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)()
)
{
return impl->add_task(obj, funct);
}
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long),
long arg1
)
{
return impl->add_task(obj, funct, arg1);
}
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long,long),
long arg1,
long arg2
)
{
return impl->add_task(obj, funct, arg1, arg2);
}
// --------------------
template <typename F>
uint64 add_task (
F& function_object
)
{
COMPILE_TIME_ASSERT(is_function<F>::value == false);
COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false);
bfp_type temp;
temp.set(function_object);
uint64 id = impl->add_task_internal(temp);
return id;
}
template <typename F>
uint64 add_task_by_value (
const F& function_object
)
{
thread_pool_implementation::function_object_copy_instance<F>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item);
uint64 id = impl->add_task_internal(temp, function_copy);
return id;
}
template <typename T>
uint64 add_task (
const T& obj,
void (T::*funct)() const
)
{
bfp_type temp;
temp.set(obj,funct);
uint64 id = impl->add_task_internal(temp);
return id;
}
template <typename T>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)() const
)
{
thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item,funct);
uint64 id = impl->add_task_internal(temp, function_copy);
return id;
}
template <typename T>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)()
)
{
thread_pool_implementation::function_object_copy_instance<T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item,funct);
uint64 id = impl->add_task_internal(temp, function_copy);
return id;
}
uint64 add_task (
void (*funct)()
)
{
bfp_type temp;
temp.set(funct);
uint64 id = impl->add_task_internal(temp);
return id;
}
// --------------------
template <typename F, typename A1>
uint64 add_task (
F& function_object,
future<A1>& arg1
)
{
COMPILE_TIME_ASSERT(is_function<F>::value == false);
COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false);
bfp_type temp;
temp.set(function_object,arg1.get());
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
return id;
}
template <typename F, typename A1>
uint64 add_task_by_value (
const F& function_object,
future<A1>& arg1
)
{
thread_pool_implementation::function_object_copy_instance<F>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, arg1.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
return id;
}
template <typename T, typename T1, typename A1>
uint64 add_task (
T& obj,
void (T::*funct)(T1),
future<A1>& arg1
)
{
bfp_type temp;
temp.set(obj,funct,arg1.get());
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
return id;
}
template <typename T, typename T1, typename A1>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)(T1),
future<A1>& arg1
)
{
thread_pool_implementation::function_object_copy_instance<T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item,funct,arg1.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
return id;
}
template <typename T, typename T1, typename A1>
uint64 add_task (
const T& obj,
void (T::*funct)(T1) const,
future<A1>& arg1
)
{
bfp_type temp;
temp.set(obj,funct,arg1.get());
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
return id;
}
template <typename T, typename T1, typename A1>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)(T1) const,
future<A1>& arg1
)
{
thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item,funct,arg1.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
return id;
}
template <typename T1, typename A1>
uint64 add_task (
void (*funct)(T1),
future<A1>& arg1
)
{
bfp_type temp;
temp.set(funct,arg1.get());
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
return id;
}
// --------------------
template <typename F, typename A1, typename A2>
uint64 add_task (
F& function_object,
future<A1>& arg1,
future<A2>& arg2
)
{
COMPILE_TIME_ASSERT(is_function<F>::value == false);
COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false);
bfp_type temp;
temp.set(function_object, arg1.get(), arg2.get());
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
return id;
}
template <typename F, typename A1, typename A2>
uint64 add_task_by_value (
const F& function_object,
future<A1>& arg1,
future<A2>& arg2
)
{
thread_pool_implementation::function_object_copy_instance<F>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, arg1.get(), arg2.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2),
future<A1>& arg1,
future<A2>& arg2
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)(T1,T2),
future<A1>& arg1,
future<A2>& arg2
)
{
thread_pool_implementation::function_object_copy_instance<T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, funct, arg1.get(), arg2.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2) const,
future<A1>& arg1,
future<A2>& arg2
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)(T1,T2) const,
future<A1>& arg1,
future<A2>& arg2
)
{
thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, funct, arg1.get(), arg2.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
return id;
}
template <typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
void (*funct)(T1,T2),
future<A1>& arg1,
future<A2>& arg2
)
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
return id;
}
// --------------------
template <typename F, typename A1, typename A2, typename A3>
uint64 add_task (
F& function_object,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
COMPILE_TIME_ASSERT(is_function<F>::value == false);
COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false);
bfp_type temp;
temp.set(function_object, arg1.get(), arg2.get(), arg3.get());
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
return id;
}
template <typename F, typename A1, typename A2, typename A3>
uint64 add_task_by_value (
const F& function_object,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
thread_pool_implementation::function_object_copy_instance<F>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, arg1.get(), arg2.get(), arg3.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2,T3),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)(T1,T2,T3),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
thread_pool_implementation::function_object_copy_instance<T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2,T3) const,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)(T1,T2,T3) const,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
return id;
}
template <typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
void (*funct)(T1,T2,T3),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
return id;
}
// --------------------
template <typename F, typename A1, typename A2, typename A3, typename A4>
uint64 add_task (
F& function_object,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
COMPILE_TIME_ASSERT(is_function<F>::value == false);
COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false);
bfp_type temp;
temp.set(function_object, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = impl;
return id;
}
template <typename F, typename A1, typename A2, typename A3, typename A4>
uint64 add_task_by_value (
const F& function_object,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
thread_pool_implementation::function_object_copy_instance<F>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the future to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2,T3,T4),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)(T1,T2,T3,T4),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
thread_pool_implementation::function_object_copy_instance<T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2,T3,T4) const,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = impl;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task_by_value (
const T& obj,
void (T::*funct)(T1,T2,T3,T4) const,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0;
ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj);
std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr);
bfp_type temp;
temp.set(ptr->item, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = impl->add_task_internal(temp, function_copy);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = impl;
return id;
}
template <typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
void (*funct)(T1,T2,T3,T4),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = impl;
return id;
}
private:
std::shared_ptr<thread_pool_implementation> impl;
// restricted functions
thread_pool(thread_pool&); // copy constructor
thread_pool& operator=(thread_pool&); // assignment operator
};
// ----------------------------------------------------------------------------------------
// Blocks until the task this future is attached to (if any) has been waited
// for through the owning pool, then detaches the future from that pool.
template <typename T>
void future<T>::
wait (
) const
{
    // Nothing to do when no thread pool is attached.
    if (!tp)
        return;

    tp->wait_for_task(task_id);

    // Detach: restores the INITIAL VALUE state (tp empty, task_id == 0).
    tp.reset();
    task_id = 0;
}
}
// ----------------------------------------------------------------------------------------
#ifdef NO_MAKEFILE
#include "thread_pool_extension.cpp"
#endif
#endif // DLIB_THREAD_POOl_Hh_