You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
concurrentqueue/samples.md

376 lines
9.8 KiB
Markdown

# Samples for moodycamel::ConcurrentQueue
Here are some example usage scenarios with sample code. Note that most
use the simplest version of each available method for demonstration purposes,
but they can all be adapted to use tokens and/or the corresponding bulk methods for
extra speed.
## Hello queue
```C++
ConcurrentQueue<int> q;
for (int i = 0; i != 123; ++i)
q.enqueue(i);
int item;
for (int i = 0; i != 123; ++i) {
q.try_dequeue(item);
assert(item == i);
}
```
## Hello concurrency
Basic example of how to use the queue from multiple threads, with no
particular goal (i.e. it does nothing, but in an instructive way).
```C++
ConcurrentQueue<int> q;
int dequeued[100] = { 0 };
std::thread threads[20];
// Producers
for (int i = 0; i != 10; ++i) {
threads[i] = std::thread([&](int i) {
for (int j = 0; j != 10; ++j) {
q.enqueue(i * 10 + j);
}
}, i);
}
// Consumers
for (int i = 10; i != 20; ++i) {
threads[i] = std::thread([&]() {
int item;
for (int j = 0; j != 20; ++j) {
if (q.try_dequeue(item)) {
++dequeued[item];
}
}
});
}
// Wait for all threads
for (int i = 0; i != 20; ++i) {
threads[i].join();
}
// Collect any leftovers (could be some if e.g. consumers finish before producers)
int item;
while (q.try_dequeue(item)) {
++dequeued[item];
}
// Make sure everything went in and came back out!
for (int i = 0; i != 100; ++i) {
assert(dequeued[i] == 1);
}
```
## Bulk up
Same as previous example, but runs faster.
```C++
ConcurrentQueue<int> q;
int dequeued[100] = { 0 };
std::thread threads[20];
// Producers
for (int i = 0; i != 10; ++i) {
threads[i] = std::thread([&](int i) {
int items[10];
for (int j = 0; j != 10; ++j) {
items[j] = i * 10 + j;
}
q.enqueue_bulk(items, 10);
}, i);
}
// Consumers
for (int i = 10; i != 20; ++i) {
threads[i] = std::thread([&]() {
int items[20];
for (std::size_t count = q.try_dequeue_bulk(items, 20); count != 0; --count) {
++dequeued[items[count - 1]];
}
});
}
// Wait for all threads
for (int i = 0; i != 20; ++i) {
threads[i].join();
}
// Collect any leftovers (could be some if e.g. consumers finish before producers)
int items[10];
std::size_t count;
while ((count = q.try_dequeue_bulk(items, 10)) != 0) {
for (std::size_t i = 0; i != count; ++i) {
++dequeued[items[i]];
}
}
// Make sure everything went in and came back out!
for (int i = 0; i != 100; ++i) {
assert(dequeued[i] == 1);
}
```
## Producer/consumer model (simultaneous)
In this model, one set of threads is producing items,
and the other is consuming them concurrently until all of
them have been consumed. The counters are required to
ensure that all items eventually get consumed.
```C++
ConcurrentQueue<Item> q;
const int ProducerCount = 8;
const int ConsumerCount = 8;
std::thread producers[ProducerCount];
std::thread consumers[ConsumerCount];
std::atomic<int> doneProducers(0);
std::atomic<int> doneConsumers(0);
for (int i = 0; i != ProducerCount; ++i) {
producers[i] = std::thread([&]() {
while (produce) {
q.enqueue(produceItem());
}
doneProducers.fetch_add(1, std::memory_order_release);
});
}
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i] = std::thread([&]() {
Item item;
bool itemsLeft;
do {
// It's important to fence (if the producers have finished) *before* dequeueing
itemsLeft = doneProducers.load(std::memory_order_acquire) != ProducerCount;
while (q.try_dequeue(item)) {
itemsLeft = true;
consumeItem(item);
}
} while (itemsLeft || doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == ConsumerCount);
// The condition above is a bit tricky, but it's necessary to ensure that the
// last consumer sees the memory effects of all the other consumers before it
// calls try_dequeue for the last time
});
}
for (int i = 0; i != ProducerCount; ++i) {
producers[i].join();
}
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i].join();
}
```
## Producer/consumer model (simultaneous, blocking)
The blocking version is different, since either the number of elements being produced needs
to be known ahead of time, or some other coordination is required to tell the consumers when
to stop calling wait_dequeue (not shown here). This is necessary because otherwise a consumer
could end up blocking forever -- and destroying a queue while a consumer is blocking on it leads
to undefined behaviour.
```C++
BlockingConcurrentQueue<Item> q;
const int ProducerCount = 8;
const int ConsumerCount = 8;
std::thread producers[ProducerCount];
std::thread consumers[ConsumerCount];
std::atomic<int> promisedElementsRemaining(ProducerCount * 1000);
for (int i = 0; i != ProducerCount; ++i) {
producers[i] = std::thread([&]() {
for (int j = 0; j != 1000; ++j) {
q.enqueue(produceItem());
}
});
}
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i] = std::thread([&]() {
Item item;
while (promisedElementsRemaining.fetch_sub(1, std::memory_order_relaxed) > 0) {
q.wait_dequeue(item);
consumeItem(item);
}
});
}
for (int i = 0; i != ProducerCount; ++i) {
producers[i].join();
}
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i].join();
}
```
## Producer/consumer model (separate stages)
```C++
ConcurrentQueue<Item> q;
// Production stage
std::thread threads[8];
for (int i = 0; i != 8; ++i) {
threads[i] = std::thread([&]() {
while (produce) {
q.enqueue(produceItem());
}
});
}
for (int i = 0; i != 8; ++i) {
threads[i].join();
}
// Consumption stage
std::atomic<int> doneConsumers(0);
for (int i = 0; i != 8; ++i) {
threads[i] = std::thread([&]() {
Item item;
do {
while (q.try_dequeue(item)) {
consumeItem(item);
}
// Loop again one last time if we're the last producer (with the acquired
// memory effects of the other producers):
} while (doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == 8);
});
}
for (int i = 0; i != 8; ++i) {
threads[i].join();
}
```
Note that there's no point trying to use the blocking queue with this model, since
there's no need to use the `wait` methods (all the elements are produced before any
are consumed), and hence the complexity would be the same but with additional overhead.
## Object pool
If you don't know what threads will be using the queue in advance,
you can't really declare any long-term tokens. The obvious solution
is to use the implicit methods (that don't take any tokens):
```C++
// A pool of 'Something' objects that can be safely accessed
// from any thread
class SomethingPool
{
public:
Something getSomething()
{
Something obj;
queue.try_dequeue(obj);
// If the dequeue succeeded, obj will be an object from the
// thread pool, otherwise it will be the default-constructed
// object as declared above
return obj;
}
void recycleSomething(Something&& obj)
{
queue.enqueue(std::move(obj));
}
};
```
## Threadpool task queue
```C++
BlockingConcurrentQueue<Task> q;
// To create a task from any thread:
q.enqueue(...);
// On threadpool threads:
Task task;
while (true) {
q.wait_dequeue(task);
// Process task...
}
```
## Multithreaded game loop
```C++
BlockingConcurrentQueue<Task> q;
std::atomic<int> pendingTasks(0);
// On threadpool threads:
Task task;
while (true) {
q.wait_dequeue(task);
// Process task...
pendingTasks.fetch_add(-1, std::memory_order_release);
}
// Whenever a new task needs to be processed for the frame:
pendingTasks.fetch_add(1, std::memory_order_release);
q.enqueue(...);
// To wait for all the frame's tasks to complete before rendering:
while (pendingTasks.load(std::memory_order_acquire) != 0)
continue;
// Alternatively you could help out the thread pool while waiting:
while (pendingTasks.load(std::memory_order_acquire) != 0) {
if (!q.try_dequeue(task)) {
continue;
}
// Process task...
pendingTasks.fetch_add(-1, std::memory_order_release);
}
```
## Pump until empty
This might be useful if, for example, you want to process any remaining items
in the queue before it's destroyed. Note that it is your responsibility
to ensure that the memory effects of any enqueue operations you wish to see on
the dequeue thread are visible (i.e. if you're waiting for a certain set of elements,
you need to use memory fences to ensure that those elements are visible to the dequeue
thread after they've been enqueued).
```C++
ConcurrentQueue<Item> q;
// Single-threaded pumping:
Item item;
while (q.try_dequeue(item)) {
// Process item...
}
// q is guaranteed to be empty here, unless there is another thread enqueueing still or
// there was another thread dequeueing at one point and its memory effects have not
// yet been propagated to this thread.
// Multi-threaded pumping:
std::thread threads[8];
std::atomic<int> doneConsumers(0);
for (int i = 0; i != 8; ++i) {
threads[i] = std::thread([&]() {
Item item;
do {
while (q.try_dequeue(item)) {
// Process item...
}
} while (doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == 8);
// If there are still enqueue operations happening on other threads,
// then the queue may not be empty at this point. However, if all enqueue
// operations completed before we finished pumping (and the propagation of
// their memory effects too), and all dequeue operations apart from those
// our threads did above completed before we finished pumping (and the
// propagation of their memory effects too), then the queue is guaranteed
// to be empty at this point.
});
}
for (int i = 0; i != 8; ++i) {
threads[i].join();
}
```
## Wait for a queue to become empty (without dequeueing)
You can't (robustly) :-) However, you can set up your own atomic counter and
poll that instead (see the game loop example). If you're satisfied with merely an estimate, you can use
`size_approx()`. Note that `size_approx()` may return 0 even if the queue is
not completely empty, unless the queue has already stabilized first (no threads
are enqueueing or dequeueing, and all memory effects of any previous operations
have been propagated to the thread before it calls `size_approx()`).