You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
concurrentqueue/tests/unittests/unittests.cpp

5197 lines
136 KiB
C++

// ©2013-2014 Cameron Desrochers.
// Distributed under the simplified BSD license (see the LICENSE file that
// should have come with this file).
// Unit tests for moodycamel::ConcurrentQueue
#define likely MAKE_SURE_LIKELY_MACRO_CAN_PEACEFULLY_COEXIST
#define unlikely MAKE_SURE_UNLIKELY_MACRO_CAN_PEACEFULLY_COEXIST
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <cstddef>
#include <string>
#include <iterator>
struct MakeSureCustomNewCanPeacefullyCoexist;
void* operator new(size_t size, MakeSureCustomNewCanPeacefullyCoexist* x);
void operator delete(void* ptr, MakeSureCustomNewCanPeacefullyCoexist* x);
#ifdef _WIN32
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <windows.h> // Not because we need it, but to ensure no conflicts arise with the queue's declarations
#endif
#include "minitest.h"
#include "../common/simplethread.h"
#include "../common/systemtime.h"
#include "../../concurrentqueue.h"
#include "../../blockingconcurrentqueue.h"
#include "../../c_api/concurrentqueue.h"
namespace {
struct tracking_allocator
{
union tag {
std::size_t size;
#ifdef __GNUC__
max_align_t dummy; // GCC forgot to add it to std:: for a while
#else
std::max_align_t dummy; // Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif
};
static inline void* malloc(std::size_t size)
{
auto ptr = std::malloc(size + sizeof(tag));
if (ptr) {
reinterpret_cast<tag*>(ptr)->size = size;
usage.fetch_add(size, std::memory_order_relaxed);
return reinterpret_cast<char*>(ptr) + sizeof(tag);
}
return nullptr;
}
static inline void free(void* ptr)
{
if (ptr) {
ptr = reinterpret_cast<char*>(ptr) - sizeof(tag);
auto size = reinterpret_cast<tag*>(ptr)->size;
usage.fetch_add(-size, std::memory_order_relaxed);
}
std::free(ptr);
}
static inline std::size_t current_usage() { return usage.load(std::memory_order_relaxed); }
private:
static std::atomic<std::size_t> usage;
};
std::atomic<std::size_t> tracking_allocator::usage(0);
}
struct corealgos_allocator
{
static inline void* malloc(std::size_t size) { return tracking_allocator::malloc(size); }
static inline void free(void* ptr) { tracking_allocator::free(ptr); }
};
#define corealgos_allocator corealgos_allocator
#include "../corealgos.h"
using namespace moodycamel;
namespace moodycamel
{
struct MallocTrackingTraits : public ConcurrentQueueDefaultTraits
{
static inline void* malloc(std::size_t size) { return tracking_allocator::malloc(size); }
static inline void free(void* ptr) { tracking_allocator::free(ptr); }
};
template<std::size_t BlockSize = ConcurrentQueueDefaultTraits::BLOCK_SIZE, std::size_t InitialIndexSize = ConcurrentQueueDefaultTraits::EXPLICIT_INITIAL_INDEX_SIZE, bool RecycleBlocks = ConcurrentQueueDefaultTraits::RECYCLE_ALLOCATED_BLOCKS>
struct TestTraits : public MallocTrackingTraits
{
typedef std::size_t size_t;
typedef uint64_t index_t;
static const size_t BLOCK_SIZE = BlockSize;
static const size_t EXPLICIT_INITIAL_INDEX_SIZE = InitialIndexSize;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = InitialIndexSize * 2;
static const bool RECYCLE_ALLOCATED_BLOCKS = RecycleBlocks;
static inline void reset() { _malloc_count() = 0; _free_count() = 0; }
static inline std::atomic<int>& _malloc_count() { static std::atomic<int> c; return c; }
static inline int malloc_count() { return _malloc_count().load(std::memory_order_seq_cst); }
static inline std::atomic<int>& _free_count() { static std::atomic<int> c; return c; }
static inline int free_count() { return _free_count().load(std::memory_order_seq_cst); }
static inline void* malloc(ConcurrentQueueDefaultTraits::size_t bytes) { ++_malloc_count(); return tracking_allocator::malloc(bytes); }
static inline void free(void* obj) { ++_free_count(); return tracking_allocator::free(obj); }
};
struct SmallIndexTraits : public MallocTrackingTraits
{
typedef uint16_t size_t;
typedef uint16_t index_t;
};
struct ExtraSmallIndexTraits : public MallocTrackingTraits
{
typedef uint8_t size_t;
typedef uint8_t index_t;
};
struct LargeTraits : public MallocTrackingTraits
{
static const size_t BLOCK_SIZE = 128;
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 128;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 128;
};
// Note: Not thread safe!
struct Foo
{
static int& nextId() { static int i; return i; }
static int& createCount() { static int c; return c; }
static int& destroyCount() { static int c; return c; }
static bool& destroyedInOrder() { static bool d = true; return d; }
static void reset() { createCount() = 0; destroyCount() = 0; nextId() = 0; destroyedInOrder() = true; lastDestroyedId() = -1; }
Foo() { id = nextId()++; ++createCount(); }
Foo(Foo const&) MOODYCAMEL_DELETE_FUNCTION;
Foo(Foo&& other) { id = other.id; other.id = -1; }
void operator=(Foo&& other) { id = other.id; other.id = -1; }
~Foo()
{
++destroyCount();
if (id == -2) {
// Double free!
destroyedInOrder() = false;
}
else if (id != -1) {
if (id <= lastDestroyedId()) {
destroyedInOrder() = false;
}
lastDestroyedId() = id;
}
id = -2;
}
private:
int id;
static int& lastDestroyedId() { static int i = -1; return i; }
};
struct Copyable {
Copyable(int id) : copied(false), id(id) { }
Copyable(Copyable const& o) : copied(true), id(o.id) { }
void operator=(Copyable const& o) { copied = true; id = o.id; }
bool copied;
int id;
};
struct Moveable {
Moveable(int id) : moved(false), copied(false), id(id) { }
Moveable(Moveable&& o) MOODYCAMEL_NOEXCEPT : moved(true), copied(o.copied), id(o.id) { }
void operator=(Moveable&& o) MOODYCAMEL_NOEXCEPT { moved = true; copied = o.copied; id = o.id; }
bool moved;
bool copied;
int id;
#if defined(_MSC_VER) && _MSC_VER < 1800
// VS2012's std::is_nothrow_[move_]constructible is broken, so the queue never attempts to
// move objects with that compiler. In this case, we don't know whether it's really a copy
// or not being done, so give the benefit of the doubt (given the tests pass on other platforms)
// and assume it would have done a move if it could have (don't set copied to true).
Moveable(Moveable const& o) MOODYCAMEL_NOEXCEPT : moved(o.moved), copied(o.copied), id(o.id) { }
void operator=(Moveable const& o) MOODYCAMEL_NOEXCEPT { moved = o.moved; copied = o.copied; id = o.id; }
#else
Moveable(Moveable const& o) MOODYCAMEL_NOEXCEPT : moved(o.moved), copied(true), id(o.id) { }
void operator=(Moveable const& o) MOODYCAMEL_NOEXCEPT { moved = o.moved; copied = true; id = o.id; }
#endif
};
struct ThrowingMovable {
static std::atomic<int>& ctorCount() { static std::atomic<int> c; return c; }
static std::atomic<int>& destroyCount() { static std::atomic<int> c; return c; }
static void reset() { ctorCount() = 0; destroyCount() = 0; }
explicit ThrowingMovable(int id, bool throwOnCctor = false, bool throwOnAssignment = false, bool throwOnSecondCctor = false)
: id(id), moved(false), copied(false), throwOnCctor(throwOnCctor), throwOnAssignment(throwOnAssignment), throwOnSecondCctor(throwOnSecondCctor)
{
ctorCount().fetch_add(1, std::memory_order_relaxed);
}
ThrowingMovable(ThrowingMovable const& o)
: id(o.id), moved(false), copied(true), throwOnCctor(o.throwOnCctor), throwOnAssignment(o.throwOnAssignment), throwOnSecondCctor(false)
{
if (throwOnCctor) {
throw this;
}
ctorCount().fetch_add(1, std::memory_order_relaxed);
throwOnCctor = o.throwOnSecondCctor;
}
ThrowingMovable(ThrowingMovable&& o)
: id(o.id), moved(true), copied(false), throwOnCctor(o.throwOnCctor), throwOnAssignment(o.throwOnAssignment), throwOnSecondCctor(false)
{
if (throwOnCctor) {
throw this;
}
ctorCount().fetch_add(1, std::memory_order_relaxed);
throwOnCctor = o.throwOnSecondCctor;
}
~ThrowingMovable()
{
destroyCount().fetch_add(1, std::memory_order_relaxed);
}
void operator=(ThrowingMovable const& o)
{
id = o.id;
moved = false;
copied = true;
throwOnCctor = o.throwOnCctor;
throwOnAssignment = o.throwOnAssignment;
throwOnSecondCctor = o.throwOnSecondCctor;
if (throwOnAssignment) {
throw this;
}
}
void operator=(ThrowingMovable&& o)
{
id = o.id;
moved = true;
copied = false;
throwOnCctor = o.throwOnCctor;
throwOnAssignment = o.throwOnAssignment;
throwOnSecondCctor = o.throwOnSecondCctor;
if (throwOnAssignment) {
throw this;
}
}
int id;
bool moved;
bool copied;
public:
bool throwOnCctor;
bool throwOnAssignment;
bool throwOnSecondCctor;
};
#ifdef __arm__
#define SUPER_ALIGNMENT 64
#else
#define SUPER_ALIGNMENT 128
#endif
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to alignment specifier
#endif
struct MOODYCAMEL_ALIGNAS(SUPER_ALIGNMENT) VeryAligned {
static size_t errors;
int value;
VeryAligned() MOODYCAMEL_NOEXCEPT : value(0) {
if (reinterpret_cast<uintptr_t>(this) % SUPER_ALIGNMENT != 0)
++errors;
}
VeryAligned(int value) MOODYCAMEL_NOEXCEPT : value(value) {
if (reinterpret_cast<uintptr_t>(this) % SUPER_ALIGNMENT != 0)
++errors;
}
VeryAligned(VeryAligned&& x) MOODYCAMEL_NOEXCEPT : value(x.value) {
if (reinterpret_cast<uintptr_t>(this) % SUPER_ALIGNMENT != 0)
++errors;
x.value = 0;
}
VeryAligned& operator=(VeryAligned&& x) MOODYCAMEL_NOEXCEPT {
std::swap(value, x.value);
return *this;
}
VeryAligned(VeryAligned const&) MOODYCAMEL_DELETE_FUNCTION;
VeryAligned& operator=(VeryAligned const&) MOODYCAMEL_DELETE_FUNCTION;
};
size_t VeryAligned::errors = 0;
#ifdef _MSC_VER
#pragma warning(pop)
#endif
class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>
{
public:
ConcurrentQueueTests()
{
REGISTER_TEST(create_empty_queue);
REGISTER_TEST(create_token);
REGISTER_TEST(circular_less_than);
REGISTER_TEST(enqueue_one_explicit);
REGISTER_TEST(enqueue_and_dequeue_one_explicit);
REGISTER_TEST(enqueue_one_implicit);
REGISTER_TEST(enqueue_and_dequeue_one_implicit);
REGISTER_TEST(enqueue_and_dequeue_a_few);
REGISTER_TEST(enqueue_bulk);
REGISTER_TEST(block_alloc);
REGISTER_TEST(token_move);
REGISTER_TEST(multi_producers);
REGISTER_TEST(producer_reuse);
REGISTER_TEST(block_reuse);
REGISTER_TEST(block_recycling);
REGISTER_TEST(leftovers_destroyed);
REGISTER_TEST(block_index_resized);
REGISTER_TEST(try_dequeue);
REGISTER_TEST(try_dequeue_threaded);
REGISTER_TEST(try_dequeue_bulk);
REGISTER_TEST(try_dequeue_bulk_threaded);
REGISTER_TEST(implicit_producer_hash);
REGISTER_TEST(index_wrapping);
REGISTER_TEST(subqueue_size_limit);
REGISTER_TEST(exceptions);
REGISTER_TEST(implicit_producer_churn);
REGISTER_TEST(test_threaded);
REGISTER_TEST(test_threaded_bulk);
REGISTER_TEST(full_api<ConcurrentQueueDefaultTraits>);
REGISTER_TEST(full_api<SmallIndexTraits>);
REGISTER_TEST(blocking_wrappers);
REGISTER_TEST(timed_blocking_wrappers);
//c_api/concurrentqueue
REGISTER_TEST(c_api_create);
REGISTER_TEST(c_api_enqueue);
REGISTER_TEST(c_api_try_dequeue);
REGISTER_TEST(c_api_destroy);
// Semaphore
REGISTER_TEST(acquire_and_signal);
REGISTER_TEST(try_acquire_and_signal);
// Core algos
REGISTER_TEST(core_add_only_list);
REGISTER_TEST(core_thread_local);
REGISTER_TEST(core_free_list);
REGISTER_TEST(core_spmc_hash);
REGISTER_TEST(explicit_strings_threaded);
REGISTER_TEST(large_traits);
}
bool postTest(bool testSucceeded) override
{
if (testSucceeded) {
// If this assertion fails, there's necessarily a memory leak somewhere!
ASSERT_OR_FAIL(tracking_allocator::current_usage() == 0);
}
return true;
}
bool create_empty_queue()
{
ConcurrentQueue<int, MallocTrackingTraits> q;
return true;
}
bool create_token()
{
ConcurrentQueue<int, MallocTrackingTraits> q;
ProducerToken tok(q);
return true;
}
bool circular_less_than()
{
{
uint32_t a, b;
a = 0; b = 100;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 100; b = 0;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(details::circular_less_than(b, a));
a = 0; b = 0;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 100; b = 100;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 0; b = 1u << 31;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 1; b = 1u << 31;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 0; b = (1u << 31) + 1;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(details::circular_less_than(b, a));
a = 100; b = (1u << 31) + 1;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = (1u << 31) + 7; b = 5;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = (1u << 16) + 7; b = (1 << 16) + 5;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(details::circular_less_than(b, a));
a = 0xFFFFFFFFu; b = 0;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 0xFFFFFFFFu; b = 0xFFFFFFu;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
}
{
uint16_t a, b;
a = 0; b = 100;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 100; b = 0;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(details::circular_less_than(b, a));
a = 0; b = 0;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 100; b = 100;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 0; b = 1 << 15;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 1; b = 1 << 15;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 0; b = (1 << 15) + 1;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(details::circular_less_than(b, a));
a = 100; b = (1 << 15) + 1;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = (1 << 15) + 7; b = 5;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = (1 << 15) + 7; b = (1 << 15) + 5;
ASSERT_OR_FAIL(!details::circular_less_than(a, b));
ASSERT_OR_FAIL(details::circular_less_than(b, a));
a = 0xFFFF; b = 0;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
a = 0xFFFF; b = 0xFFF;
ASSERT_OR_FAIL(details::circular_less_than(a, b));
ASSERT_OR_FAIL(!details::circular_less_than(b, a));
}
return true;
}
bool enqueue_one_explicit()
{
ConcurrentQueue<int, MallocTrackingTraits> q;
ProducerToken tok(q);
bool result = q.enqueue(tok, 17);
ASSERT_OR_FAIL(result);
return true;
}
bool enqueue_and_dequeue_one_explicit()
{
ConcurrentQueue<int, MallocTrackingTraits> q;
ProducerToken tok(q);
int item = 0;
ASSERT_OR_FAIL(q.enqueue(tok, 123));
ASSERT_OR_FAIL(q.try_dequeue_from_producer(tok, item));
ASSERT_OR_FAIL(item == 123);
return true;
}
bool enqueue_one_implicit()
{
ConcurrentQueue<int, MallocTrackingTraits> q;
bool result = q.enqueue(17);
ASSERT_OR_FAIL(result);
return true;
}
bool enqueue_and_dequeue_one_implicit()
{
ConcurrentQueue<int, MallocTrackingTraits> q;
int item = 0;
ASSERT_OR_FAIL(q.enqueue(123));
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == 123);
return true;
}
bool enqueue_and_dequeue_a_few()
{
// Fairly straightforward mass enqueue and dequeue
{
ConcurrentQueue<int, TestTraits<16>> q;
ProducerToken tok(q);
for (int i = 0; i != 99999; ++i) {
ASSERT_OR_FAIL(q.enqueue(tok, i));
}
int item;
for (int i = 0; i != 99999; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(tok, item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue_from_producer(tok, item));
}
// Interleaved enqueue and dequeue (though still no threads involved)
{
ConcurrentQueue<int, TestTraits<16>> q;
ProducerToken tok(q);
int item;
for (int i = 0; i != 99999; ++i) {
ASSERT_OR_FAIL(q.enqueue(tok, i));
ASSERT_OR_FAIL(q.enqueue(tok, i * 2));
ASSERT_OR_FAIL(q.try_dequeue_from_producer(tok, item));
ASSERT_OR_FAIL(item == (i / 2) * (i % 2 == 0 ? 1 : 2));
}
for (int i = 0; i != 99999; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(tok, item));
ASSERT_OR_FAIL(item == ((i + 99999) / 2) * (i % 2 == 1 ? 1 : 2));
}
ASSERT_OR_FAIL(!q.try_dequeue_from_producer(tok, item));
}
// Implicit usage
{
ConcurrentQueue<int, TestTraits<16>> q;
for (int i = 0; i != 99999; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
int item;
for (int i = 0; i != 99999; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<int, TestTraits<16>> q;
int item;
for (int i = 0; i != 99999; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
ASSERT_OR_FAIL(q.enqueue(i * 2));
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == (i / 2) * (i % 2 == 0 ? 1 : 2));
}
for (int i = 0; i != 99999; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == ((i + 99999) / 2) * (i % 2 == 1 ? 1 : 2));
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
return true;
}
bool enqueue_bulk()
{
typedef TestTraits<2> Traits2;
typedef TestTraits<4> Traits4;
int arr123[] = { 1, 2, 3 };
int arr1234[] = { 1, 2, 3, 4 };
int arr123456[] = { 1, 2, 3, 4, 5, 6 };
Traits2::reset();
{
// Implicit, block allocation required
ConcurrentQueue<int, Traits2> q(2);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
q.enqueue_bulk(arr123, 3);
ASSERT_OR_FAIL(Traits2::malloc_count() == 4); // One for producer, one for block index, one for block
int item;
for (int i = 0; i != 3; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits4::reset();
{
// Implicit, block allocation not required (end on block boundary)
ConcurrentQueue<int, Traits4> q(2);
ASSERT_OR_FAIL(Traits4::malloc_count() == 1);
q.enqueue_bulk(arr1234, 4);
ASSERT_OR_FAIL(Traits4::malloc_count() == 3); // One for producer, one for block index
int item;
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits2::reset();
{
// Implicit, allocation fail
ConcurrentQueue<int, Traits2> q(2);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
ASSERT_OR_FAIL(!q.try_enqueue_bulk(arr123, 3));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // Still has to allocate implicit producer and block index
int item;
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(q.try_enqueue_bulk(arr123, 2));
for (int i = 0; i != 2; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits2::reset();
{
// Implicit, block allocation not required
ConcurrentQueue<int, Traits2> q(4);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
q.enqueue_bulk(arr1234, 4);
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // One for producer, one for block index
int item;
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits4::reset();
{
// Implicit, block allocation required (end not on block boundary)
ConcurrentQueue<int, Traits4> q(4);
ASSERT_OR_FAIL(Traits4::malloc_count() == 1);
ASSERT_OR_FAIL(q.enqueue(0));
ASSERT_OR_FAIL(q.enqueue_bulk(arr1234, 4));
ASSERT_OR_FAIL(Traits4::malloc_count() == 4); // One for producer, one for block index, one for block
int item;
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits4::reset();
{
// Implicit, block allocation not required (end not on block boundary)
ConcurrentQueue<int, Traits4> q(5);
ASSERT_OR_FAIL(Traits4::malloc_count() == 1);
ASSERT_OR_FAIL(q.enqueue(0));
ASSERT_OR_FAIL(q.enqueue_bulk(arr1234, 4));
ASSERT_OR_FAIL(Traits4::malloc_count() == 3); // One for producer, one for block index
int item;
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits2::reset();
{
// Implicit, block allocation fail (end not on block boundary) -- test rewind
ConcurrentQueue<int, Traits2> q(4);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
ASSERT_OR_FAIL(q.enqueue(17));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // One for producer, one for block index
ASSERT_OR_FAIL(!q.try_enqueue_bulk(arr123456, 6));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3);
int item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == 17);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits2::reset();
{
// Implicit, enqueue nothing
ConcurrentQueue<int, Traits2> q(3);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
ASSERT_OR_FAIL(q.try_enqueue_bulk(arr123, 0));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // One for producer, one for block index
int item;
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
////////
Traits2::reset();
{
// Explicit, block allocation required
ConcurrentQueue<int, Traits2> q(2);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // One for producer, one for block index
q.enqueue_bulk(tok, arr123, 3);
ASSERT_OR_FAIL(Traits2::malloc_count() == 4); // One for block
int item;
for (int i = 0; i != 3; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits4::reset();
{
// Explicit, block allocation not required (end on block boundary)
ConcurrentQueue<int, Traits4> q(2);
ASSERT_OR_FAIL(Traits4::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits4::malloc_count() == 3); // One for producer, one for block index
q.enqueue_bulk(tok, arr1234, 4);
ASSERT_OR_FAIL(Traits4::malloc_count() == 3);
int item;
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits2::reset();
{
// Explicit, allocation fail
ConcurrentQueue<int, Traits2> q(2);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // One for producer, one for block index
ASSERT_OR_FAIL(!q.try_enqueue_bulk(tok, arr123, 3));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3);
int item;
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(q.try_enqueue_bulk(tok, arr123, 2));
for (int i = 0; i != 2; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3);
}
Traits2::reset();
{
// Explicit, block allocation not required
ConcurrentQueue<int, Traits2> q(4);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // One for producer, one for block index
q.enqueue_bulk(tok, arr1234, 4);
ASSERT_OR_FAIL(Traits2::malloc_count() == 3);
int item;
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits4::reset();
{
// Explicit, block allocation required (end not on block boundary)
ConcurrentQueue<int, Traits4> q(4);
ASSERT_OR_FAIL(Traits4::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits4::malloc_count() == 3); // One for producer, one for block index
ASSERT_OR_FAIL(q.enqueue(tok, 0));
ASSERT_OR_FAIL(q.enqueue_bulk(tok, arr1234, 4));
ASSERT_OR_FAIL(Traits4::malloc_count() == 4); // One for block
int item;
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits4::reset();
{
// Explicit, block allocation not required (end not on block boundary)
ConcurrentQueue<int, Traits4> q(5);
ASSERT_OR_FAIL(Traits4::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits4::malloc_count() == 3); // One for producer, one for block index
ASSERT_OR_FAIL(q.enqueue(tok, 0));
ASSERT_OR_FAIL(q.enqueue_bulk(tok, arr1234, 4));
ASSERT_OR_FAIL(Traits4::malloc_count() == 3);
int item;
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits2::reset();
{
// Explicit, block allocation fail (end not on block boundary) -- test rewind
ConcurrentQueue<int, Traits2> q(4);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // One for producer, one for block index
ASSERT_OR_FAIL(q.enqueue(tok, 17));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3);
ASSERT_OR_FAIL(!q.try_enqueue_bulk(tok, arr123456, 6));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3);
int item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == 17);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits2::reset();
{
// Explicit, enqueue nothing
ConcurrentQueue<int, Traits2> q(3);
ASSERT_OR_FAIL(Traits2::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits2::malloc_count() == 3); // One for producer, one for block index
ASSERT_OR_FAIL(q.try_enqueue_bulk(tok, arr123, 0));
ASSERT_OR_FAIL(Traits2::malloc_count() == 3);
int item;
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(q.enqueue(tok, 17));
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == 17);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
Traits4::reset();
{
// Explicit, re-use empty blocks
ConcurrentQueue<int, Traits4> q(8);
ASSERT_OR_FAIL(Traits4::malloc_count() == 1);
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits4::malloc_count() == 3); // One for producer, one for block index
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(q.enqueue(tok, i));
}
int item;
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(Traits4::malloc_count() == 3);
ASSERT_OR_FAIL(q.enqueue_bulk(tok, arr123456, 6));
ASSERT_OR_FAIL(Traits4::malloc_count() == 3);
for (int i = 0; i != 6; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(Traits4::malloc_count() == 3);
}
return true;
}
bool block_alloc()
{
typedef TestTraits<2> Traits;
typedef TestTraits<2, 32, true> RecycleTraits;
Traits::reset();
// Explicit
{
ConcurrentQueue<int, Traits> q(7);
ASSERT_OR_FAIL(q.initialBlockPoolSize == 4);
ASSERT_OR_FAIL(Traits::malloc_count() == 1);
ASSERT_OR_FAIL(Traits::free_count() == 0);
{
ProducerToken tok(q);
ASSERT_OR_FAIL(Traits::malloc_count() == 3); // one for producer, one for its block index
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Enqueue one item too many (force extra block allocation)
for (int i = 0; i != 9; ++i) {
ASSERT_OR_FAIL(q.enqueue(tok, i));
}
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Still room for one more...
ASSERT_OR_FAIL(q.enqueue(tok, 9));
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// No more room without further allocations
ASSERT_OR_FAIL(!q.try_enqueue(tok, 10));
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Check items were enqueued properly
int item;
for (int i = 0; i != 10; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(tok, item));
ASSERT_OR_FAIL(item == i);
}
// Queue should be empty, but not freed
ASSERT_OR_FAIL(!q.try_dequeue_from_producer(tok, item));
ASSERT_OR_FAIL(Traits::free_count() == 0);
}
// Explicit producers are recycled, so block should still be allocated
ASSERT_OR_FAIL(Traits::free_count() == 0);
}
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 4);
// Implicit
Traits::reset();
{
ConcurrentQueue<int, Traits> q(7);
ASSERT_OR_FAIL(q.initialBlockPoolSize == 4);
ASSERT_OR_FAIL(q.enqueue(39));
ASSERT_OR_FAIL(Traits::malloc_count() == 3); // one for producer, one for its block index
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Enqueue one item too many (force extra block allocation)
for (int i = 0; i != 8; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Still room for one more...
ASSERT_OR_FAIL(q.enqueue(8));
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// No more room without further allocations
ASSERT_OR_FAIL(!q.try_enqueue(9));
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Check items were enqueued properly
int item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == 39);
for (int i = 0; i != 9; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
// Queue should be empty, and extra block freed
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(Traits::free_count() == 1);
}
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 4);
// Implicit
RecycleTraits::reset();
{
ConcurrentQueue<int, RecycleTraits> q(7);
ASSERT_OR_FAIL(q.initialBlockPoolSize == 4);
ASSERT_OR_FAIL(q.enqueue(39));
ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 3); // one for producer, one for its block index
ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
// Enqueue one item too many (force extra block allocation)
for (int i = 0; i != 8; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 4);
ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
// Still room for one more...
ASSERT_OR_FAIL(q.enqueue(8));
ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 4);
ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
// No more room without further allocations
ASSERT_OR_FAIL(!q.try_enqueue(9));
ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 4);
ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
// Check items were enqueued properly
int item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == 39);
for (int i = 0; i != 9; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
// Queue should be empty, but extra block not freed
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
}
ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 4);
ASSERT_OR_FAIL(RecycleTraits::free_count() == 4);
// Super-aligned
Traits::reset();
VeryAligned::errors = 0;
{
ConcurrentQueue<VeryAligned, Traits> q(7);
ASSERT_OR_FAIL(q.enqueue(39));
ASSERT_OR_FAIL(Traits::malloc_count() == 3); // one for producer, one for its block index
ASSERT_OR_FAIL(Traits::free_count() == 0);
ASSERT_OR_FAIL(VeryAligned::errors == 0);
// Enqueue one item too many (force extra block allocation)
for (int i = 0; i != 8; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
ASSERT_OR_FAIL(VeryAligned::errors == 0);
}
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Still room for one more...
ASSERT_OR_FAIL(q.enqueue(8));
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
ASSERT_OR_FAIL(VeryAligned::errors == 0);
// No more room without further allocations
ASSERT_OR_FAIL(!q.try_enqueue(9));
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
ASSERT_OR_FAIL(VeryAligned::errors == 0);
// Check items were enqueued properly
VeryAligned item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.value == 39);
for (int i = 0; i != 9; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.value == i);
ASSERT_OR_FAIL(VeryAligned::errors == 0);
}
// Queue should be empty, and extra block freed
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(Traits::free_count() == 1);
ASSERT_OR_FAIL(VeryAligned::errors == 0);
}
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 4);
return true;
}
bool token_move()
{
typedef TestTraits<16> Traits;
Traits::reset();
{
ConcurrentQueue<int, Traits> q;
ProducerToken t0(q);
ASSERT_OR_FAIL(t0.valid());
ProducerToken t1(std::move(t0));
ASSERT_OR_FAIL(t1.valid());
ASSERT_OR_FAIL(!t0.valid());
t1 = std::move(t1);
ASSERT_OR_FAIL(t1.valid());
ASSERT_OR_FAIL(!t0.valid());
ProducerToken t2(q);
t2 = std::move(t1);
ASSERT_OR_FAIL(t2.valid());
ASSERT_OR_FAIL(t1.valid());
ASSERT_OR_FAIL(!t0.valid());
t0 = std::move(t1);
ASSERT_OR_FAIL(t2.valid());
ASSERT_OR_FAIL(!t1.valid());
ASSERT_OR_FAIL(t0.valid());
}
ASSERT_OR_FAIL(Traits::malloc_count() == 5); // 2 for each producer + 1 for initial block pool
ASSERT_OR_FAIL(Traits::free_count() == Traits::malloc_count());
return true;
}
bool multi_producers()
{
typedef TestTraits<16> Traits;
Traits::reset();
{
ConcurrentQueue<int, Traits> q;
ProducerToken t0(q);
ProducerToken t1(q);
ProducerToken t2(q);
ProducerToken t3(q);
ProducerToken t4(q);
ASSERT_OR_FAIL(q.enqueue(t0, 0));
ASSERT_OR_FAIL(q.enqueue(t1, 1));
ASSERT_OR_FAIL(q.enqueue(t2, 2));
ASSERT_OR_FAIL(q.enqueue(t3, 3));
ASSERT_OR_FAIL(q.enqueue(t4, 4));
int item;
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t0, item) && item == 0 && !q.try_dequeue_from_producer(t0, item));
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t1, item) && item == 1 && !q.try_dequeue_from_producer(t1, item));
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t2, item) && item == 2 && !q.try_dequeue_from_producer(t2, item));
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t3, item) && item == 3 && !q.try_dequeue_from_producer(t3, item));
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t4, item) && item == 4 && !q.try_dequeue_from_producer(t4, item));
}
ASSERT_OR_FAIL(Traits::malloc_count() == 11); // 2 for each producer + 1 for initial block pool
ASSERT_OR_FAIL(Traits::free_count() == Traits::malloc_count());
// Implicit
Traits::reset();
{
ConcurrentQueue<int, Traits> q;
std::atomic<bool> success[5];
std::atomic<int> done(0);
for (int i = 0; i != 5; ++i) {
success[i].store(false, std::memory_order_relaxed);
}
for (int i = 0; i != 5; ++i) {
SimpleThread t([&](int j) {
success[j].store(q.enqueue(j), std::memory_order_relaxed);
done.fetch_add(1, std::memory_order_release);
}, i);
t.join();
}
while (done.load(std::memory_order_acquire) != 5) {
continue;
}
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(success[i].load(std::memory_order_relaxed));
}
// Cannot rely on order that producers are added (there's a race condition), only that they are all there somewhere.
// Also, all items may not be visible to this thread yet.
bool itemDequeued[5] = { false, false, false, false, false };
int item;
for (int i = 0; i != 5;) {
if (q.try_dequeue(item)) {
itemDequeued[item] = true;
++i;
}
}
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(itemDequeued[i]);
}
}
ASSERT_OR_FAIL(Traits::malloc_count() <= 11 && Traits::malloc_count() >= 3); // 2 for each producer (depending on thread ID re-use) + 1 for initial block pool
ASSERT_OR_FAIL(Traits::free_count() == Traits::malloc_count());
return true;
}
bool producer_reuse()
{
typedef TestTraits<16> Traits;
Traits::reset();
{
// Explicit
ConcurrentQueue<int, Traits> q;
{
ProducerToken t0(q);
}
{
ProducerToken t1(q);
}
{
ProducerToken t2(q);
ProducerToken t3(q);
ProducerToken t4(q);
ProducerToken t5(q);
}
{
ProducerToken t6(q);
ProducerToken t7(q);
}
{
ProducerToken t8(q);
ProducerToken t9(q);
}
{
ProducerToken t10(q);
ProducerToken t11(q);
}
}
ASSERT_OR_FAIL(Traits::malloc_count() == 9); // 2 for max number of live producers + 1 for initial block pool
ASSERT_OR_FAIL(Traits::free_count() == Traits::malloc_count());
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
Traits::reset();
{
// Implicit
const int MAX_THREADS = 48;
ConcurrentQueue<int, Traits> q(Traits::BLOCK_SIZE * (MAX_THREADS + 1));
ASSERT_OR_FAIL(Traits::malloc_count() == 1); // Initial block pool
SimpleThread t0([&]() { q.enqueue(0); });
t0.join();
ASSERT_OR_FAIL(Traits::malloc_count() == 3); // Implicit producer
SimpleThread t1([&]() { q.enqueue(1); });
t1.join();
ASSERT_OR_FAIL(Traits::malloc_count() == 3);
SimpleThread t2([&]() { q.enqueue(2); });
t2.join();
ASSERT_OR_FAIL(Traits::malloc_count() == 3);
q.enqueue(3);
ASSERT_OR_FAIL(Traits::malloc_count() == 3);
int item;
int i = 0;
while (q.try_dequeue(item)) {
ASSERT_OR_FAIL(item == i);
++i;
}
ASSERT_OR_FAIL(i == 4);
ASSERT_OR_FAIL(Traits::malloc_count() == 3);
std::vector<SimpleThread> threads(MAX_THREADS);
for (int rep = 0; rep != 2; ++rep) {
for (std::size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](std::size_t tid) {
for (volatile int i = 0; i != 4096; ++i) {
continue;
}
q.enqueue((int)tid);
for (volatile int i = 0; i != 4096; ++i) {
continue;
}
}, tid);
}
for (std::size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid].join();
}
std::vector<bool> seenIds(threads.size());
for (std::size_t i = 0; i != threads.size(); ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(!seenIds[item]);
seenIds[item] = true;
}
for (std::size_t i = 0; i != seenIds.size(); ++i) {
ASSERT_OR_FAIL(seenIds[i]);
}
ASSERT_OR_FAIL(Traits::malloc_count() <= 2 * MAX_THREADS + 1);
}
}
ASSERT_OR_FAIL(Traits::free_count() == Traits::malloc_count());
Traits::reset();
{
// Test many threads and implicit queues being created and destroyed concurrently
std::vector<SimpleThread> threads(32);
std::vector<bool> success(threads.size(), true);
for (std::size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](std::size_t tid) {
for (int i = 0; i != 5; ++i) {
ConcurrentQueue<int, MallocTrackingTraits> q(1);
q.enqueue(i);
}
ConcurrentQueue<int, MallocTrackingTraits> q(15);
for (int i = 0; i != 100; ++i) {
q.enqueue(i);
}
int item;
for (int i = 0; i != 100; ++i) {
if (!q.try_dequeue(item) || item != i) {
success[tid] = false;
}
}
if (q.size_approx() != 0) {
success[tid] = false;
}
}, tid);
}
for (std::size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid].join();
ASSERT_OR_FAIL(success[tid]);
}
}
ASSERT_OR_FAIL(Traits::free_count() == Traits::malloc_count());
#endif
return true;
}
bool block_reuse()
{
int item;
typedef TestTraits<4> SmallBlocks;
SmallBlocks::reset();
{
ConcurrentQueue<int, SmallBlocks> q(8); // 2 blocks
ProducerToken t(q);
for (int j = 0; j != 3; ++j) {
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.enqueue(t, i));
}
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(item == i);
}
for (int i = 0; i != 8; ++i) {
ASSERT_OR_FAIL(q.enqueue(t, i));
}
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(item == i);
}
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.enqueue(t, i));
}
for (int i = 0; i != 8; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(item == ((i + 4) & 7));
}
ASSERT_OR_FAIL(!q.try_dequeue_from_producer(t, item));
}
}
ASSERT_OR_FAIL(SmallBlocks::malloc_count() == 3);
ASSERT_OR_FAIL(SmallBlocks::free_count() == SmallBlocks::malloc_count());
typedef TestTraits<8192> HugeBlocks;
HugeBlocks::reset();
{
ConcurrentQueue<int, HugeBlocks> q(8192 * 2); // 2 blocks
ProducerToken t(q);
for (int j = 0; j != 3; ++j) {
for (int i = 0; i != 8192; ++i) {
ASSERT_OR_FAIL(q.enqueue(t, i));
}
for (int i = 0; i != 8192; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(item == i);
}
for (int i = 0; i != 8192 * 2; ++i) {
ASSERT_OR_FAIL(q.enqueue(t, i));
}
for (int i = 0; i != 8192; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(item == i);
}
for (int i = 0; i != 8192; ++i) {
ASSERT_OR_FAIL(q.enqueue(t, i));
}
for (int i = 0; i != 8192 * 2; ++i) {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(item == ((i + 8192) & (8192 * 2 - 1)));
}
ASSERT_OR_FAIL(!q.try_dequeue_from_producer(t, item));
}
}
ASSERT_OR_FAIL(HugeBlocks::malloc_count() == 3);
ASSERT_OR_FAIL(HugeBlocks::free_count() == HugeBlocks::malloc_count());
// Implicit
SmallBlocks::reset();
{
ConcurrentQueue<int, SmallBlocks> q(8); // 2 blocks
for (int j = 0; j != 3; ++j) {
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
for (int i = 0; i != 8; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
for (int i = 0; i != 8; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == ((i + 4) & 7));
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
}
ASSERT_OR_FAIL(SmallBlocks::malloc_count() == 3);
ASSERT_OR_FAIL(SmallBlocks::free_count() == SmallBlocks::malloc_count());
HugeBlocks::reset();
{
ConcurrentQueue<int, HugeBlocks> q(8192 * 2); // 2 blocks
for (int j = 0; j != 3; ++j) {
for (int i = 0; i != 8192; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
for (int i = 0; i != 8192; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
for (int i = 0; i != 8192 * 2; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
for (int i = 0; i != 8192; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
for (int i = 0; i != 8192; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
for (int i = 0; i != 8192 * 2; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == ((i + 8192) & (8192 * 2 - 1)));
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
}
ASSERT_OR_FAIL(HugeBlocks::malloc_count() == 3);
ASSERT_OR_FAIL(HugeBlocks::free_count() == HugeBlocks::malloc_count());
return true;
}
bool block_recycling()
{
typedef TestTraits<4> SmallBlocks;
SmallBlocks::reset();
ConcurrentQueue<int, SmallBlocks> q(24); // 6 blocks
SimpleThread threads[4];
std::atomic<bool> success(true);
for (int i = 0; i != 4; ++i) {
threads[i] = SimpleThread([&](int i) {
int item;
int next = 0;
int prevItems[4] = { -1, -1, -1, -1 };
for (int successfulEnqueues = 0; successfulEnqueues < 10000;) {
for (int j = 0; j != 12; ++j) {
if (q.try_enqueue((i << 28) | next++)) {
++successfulEnqueues;
}
}
for (int j = 0; j != 12; ++j) {
if (q.try_dequeue(item)) {
if ((item & 0x0FFFFFFF) <= prevItems[item >> 28]) {
success.store(false, std::memory_order_relaxed);
}
prevItems[item >> 28] = item & 0x0FFFFFFF;
}
}
}
}, i);
}
for (int i = 0; i != 4; ++i) {
threads[i].join();
}
int item;
int prevItems[4] = { -1, -1, -1, -1 };
while (q.try_dequeue(item)) {
ASSERT_OR_FAIL((item & 0x0FFFFFFF) > prevItems[item >> 28]);
prevItems[item >> 28] = item & 0x0FFFFFFF;
}
ASSERT_OR_FAIL(success.load(std::memory_order_relaxed));
return true;
}
bool leftovers_destroyed()
{
typedef TestTraits<4> Traits;
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(4); // One block
ProducerToken t(q);
Foo item;
q.enqueue(t, Foo());
q.enqueue(t, Foo());
q.enqueue(t, Foo());
q.try_dequeue_from_producer(t, item);
}
ASSERT_OR_FAIL(Foo::createCount() == 4);
ASSERT_OR_FAIL(Foo::destroyCount() == 7);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(4); // One block
ProducerToken t(q);
q.enqueue(t, Foo());
q.enqueue(t, Foo());
q.enqueue(t, Foo());
q.enqueue(t, Foo());
}
ASSERT_OR_FAIL(Foo::createCount() == 4);
ASSERT_OR_FAIL(Foo::destroyCount() == 8);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(8); // Two blocks
ProducerToken t(q);
for (int i = 0; i != 8; ++i) {
q.enqueue(t, Foo());
}
}
ASSERT_OR_FAIL(Foo::createCount() == 8);
ASSERT_OR_FAIL(Foo::destroyCount() == 16);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(12); // Three blocks
ProducerToken t(q);
// Last block only partially full
for (int i = 0; i != 10; ++i) {
q.enqueue(t, Foo());
}
// First block only partially full
Foo item;
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
}
ASSERT_OR_FAIL(Foo::createCount() == 11);
ASSERT_OR_FAIL(Foo::destroyCount() == 21);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
// Implicit
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(4); // One block
Foo item;
q.enqueue(Foo());
q.enqueue(Foo());
q.enqueue(Foo());
q.try_dequeue(item);
}
ASSERT_OR_FAIL(Foo::createCount() == 4);
ASSERT_OR_FAIL(Foo::destroyCount() == 7);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(4); // One block
q.enqueue(Foo());
q.enqueue(Foo());
q.enqueue(Foo());
q.enqueue(Foo());
}
ASSERT_OR_FAIL(Foo::createCount() == 4);
ASSERT_OR_FAIL(Foo::destroyCount() == 8);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(8); // Two blocks
for (int i = 0; i != 8; ++i) {
q.enqueue(Foo());
}
}
ASSERT_OR_FAIL(Foo::createCount() == 8);
ASSERT_OR_FAIL(Foo::destroyCount() == 16);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(12); // Three blocks
// Last block only partially full
for (int i = 0; i != 10; ++i) {
q.enqueue(Foo());
}
// First block only partially full
Foo item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(q.try_dequeue(item));
}
ASSERT_OR_FAIL(Foo::createCount() == 11);
ASSERT_OR_FAIL(Foo::destroyCount() == 21);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
return true;
}
bool block_index_resized()
{
typedef TestTraits<4, 2> Traits;
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(8); // 2 blocks, matches initial index size
ProducerToken t(q);
for (int i = 0; i != 1024; ++i) {
q.enqueue(t, Foo());
}
for (int i = 0; i != 1024; ++i) {
Foo item;
q.try_dequeue_from_producer(t, item);
}
}
ASSERT_OR_FAIL(Traits::malloc_count() == 1 + 2 + 254 + 7);
ASSERT_OR_FAIL(Traits::free_count() == Traits::malloc_count());
ASSERT_OR_FAIL(Foo::createCount() == 2048);
ASSERT_OR_FAIL(Foo::destroyCount() == 3072);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
// Implicit
Traits::reset();
Foo::reset();
{
ConcurrentQueue<Foo, Traits> q(8); // 2 blocks
for (int i = 0; i != 1024; ++i) {
q.enqueue(Foo());
}
for (int i = 0; i != 1024; ++i) {
Foo item;
q.try_dequeue(item);
}
}
ASSERT_OR_FAIL(Traits::malloc_count() == 1 + 2 + 254 + 6);
ASSERT_OR_FAIL(Traits::free_count() == Traits::malloc_count());
ASSERT_OR_FAIL(Foo::createCount() == 2048);
ASSERT_OR_FAIL(Foo::destroyCount() == 3072);
ASSERT_OR_FAIL(Foo::destroyedInOrder());
return true;
}
bool try_dequeue()
{
ConcurrentQueue<int, MallocTrackingTraits> q;
int item;
// Producer token
{
for (int i = 0; i != 50; ++i) {
ProducerToken t(q);
for (int j = 0; j != 100; ++j) {
ASSERT_OR_FAIL(q.enqueue(t, i * 100 + j));
}
}
for (int i = 0; i != 50; ++i) {
for (int j = 0; j != 100; ++j) {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i * 100 + j);
}
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// Mixed producer types
{
for (int i = 0; i != 25; ++i) {
for (int j = 0; j != 100; ++j) {
ASSERT_OR_FAIL(q.enqueue(i * 100 + j));
}
}
for (int i = 25; i != 50; ++i) {
ProducerToken t(q);
for (int j = 0; j != 100; ++j) {
ASSERT_OR_FAIL(q.enqueue(t, i * 100 + j));
}
}
bool success[5000];
std::memset(success, 0, sizeof(success));
for (int i = 0; i != 50; ++i) {
for (int j = 0; j != 100; ++j) {
ASSERT_OR_FAIL(q.try_dequeue(item));
success[item] = true;
}
}
for (int i = 0; i != 5000; ++i) {
ASSERT_OR_FAIL(success[i]);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// Mixed producer types with consumer token
{
for (int i = 0; i != 25; ++i) {
for (int j = 0; j != 100; ++j) {
ASSERT_OR_FAIL(q.enqueue(i * 100 + j));
}
}
for (int i = 25; i != 50; ++i) {
ProducerToken t(q);
for (int j = 0; j != 100; ++j) {
ASSERT_OR_FAIL(q.enqueue(t, i * 100 + j));
}
}
bool success[5000];
std::memset(success, 0, sizeof(success));
for (int i = 0; i != 50; ++i) {
ConsumerToken t(q);
for (int j = 0; j != 100; ++j) {
ASSERT_OR_FAIL(q.try_dequeue(t, item));
success[item] = true;
}
}
for (int i = 0; i != 5000; ++i) {
ASSERT_OR_FAIL(success[i]);
}
ConsumerToken t(q);
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(!q.try_dequeue(t, item));
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(!q.try_dequeue(t, item));
}
return true;
}
bool try_dequeue_threaded()
{
int item;
ConcurrentQueue<int, MallocTrackingTraits> q;
// Threaded consumption with tokens
{
SimpleThread threads[20];
for (int i = 0; i != 10; ++i) {
threads[i] = SimpleThread([&](int i) {
ProducerToken t(q);
for (int j = 0; j != 100; ++j) {
q.enqueue(t, i * 10 + j);
}
}, i);
}
std::atomic<int> dequeueCount(0);
for (int i = 10; i != 20; ++i) {
threads[i] = SimpleThread([&]() {
int item;
ConsumerToken t(q);
while (dequeueCount.load(std::memory_order_relaxed) != 1000) {
if (q.try_dequeue(t, item)) {
dequeueCount.fetch_add(1, std::memory_order_relaxed);
}
}
});
}
for (int i = 0; i != 20; ++i) {
threads[i].join();
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// Threaded consumption
{
SimpleThread threads[20];
for (int i = 0; i != 10; ++i) {
threads[i] = SimpleThread([&](int i) {
for (int j = 0; j != 100; ++j) {
q.enqueue(i * 10 + j);
}
}, i);
}
std::atomic<int> dequeueCount(0);
for (int i = 10; i != 20; ++i) {
threads[i] = SimpleThread([&]() {
int item;
while (dequeueCount.load(std::memory_order_relaxed) != 1000) {
if (q.try_dequeue(item)) {
dequeueCount.fetch_add(1, std::memory_order_relaxed);
}
}
});
}
for (int i = 0; i != 20; ++i) {
threads[i].join();
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
return true;
}
bool try_dequeue_bulk()
{
typedef TestTraits<4> Traits;
int items[5];
// Explicit producer
{
Traits::reset();
ConcurrentQueue<int, Traits> q;
ProducerToken tok(q);
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 0);
q.enqueue(tok, 17);
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 1);
ASSERT_OR_FAIL(items[0] == 17);
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
for (int i = 0; i != 4; ++i) {
q.enqueue(tok, i + 1);
}
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 4);
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(items[i] == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
for (int i = 0; i != 5; ++i) {
q.enqueue(tok, i + 1);
}
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 5);
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(items[i] == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
for (int i = 0; i != 6; ++i) {
q.enqueue(tok, i + 1);
}
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 5);
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(items[i] == i + 1);
}
ASSERT_OR_FAIL(q.try_dequeue(items[0]));
ASSERT_OR_FAIL(items[0] == 6);
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
for (int i = 0; i != 10; ++i) {
q.enqueue(tok, i + 1);
}
for (int k = 0; k != 2; ++k) {
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 5);
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(items[i] == k * 5 + i + 1);
}
}
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
}
// Implicit producer
{
Traits::reset();
ConcurrentQueue<int, Traits> q;
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 0);
q.enqueue(17);
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 1);
ASSERT_OR_FAIL(items[0] == 17);
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
for (int i = 0; i != 4; ++i) {
q.enqueue(i + 1);
}
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 4);
for (int i = 0; i != 4; ++i) {
ASSERT_OR_FAIL(items[i] == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
for (int i = 0; i != 5; ++i) {
q.enqueue(i + 1);
}
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 5);
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(items[i] == i + 1);
}
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
for (int i = 0; i != 6; ++i) {
q.enqueue(i + 1);
}
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 5);
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(items[i] == i + 1);
}
ASSERT_OR_FAIL(q.try_dequeue(items[0]));
ASSERT_OR_FAIL(items[0] == 6);
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
for (int i = 0; i != 10; ++i) {
q.enqueue(i + 1);
}
for (int k = 0; k != 2; ++k) {
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 5) == 5);
for (int i = 0; i != 5; ++i) {
ASSERT_OR_FAIL(items[i] == k * 5 + i + 1);
}
}
ASSERT_OR_FAIL(!q.try_dequeue(items[0]));
}
return true;
}
bool try_dequeue_bulk_threaded()
{
typedef TestTraits<2> Traits;
int dummy;
// Explicit producer
{
Traits::reset();
ConcurrentQueue<int, Traits> q;
SimpleThread threads[2];
bool success[2] = { true, true };
for (int i = 0; i != 2; ++i) {
if (i == 0) {
threads[i] = SimpleThread([&](int) {
// Producer
ProducerToken tok(q);
for (int i = 0; i != 32*1024; ++i) {
q.enqueue(tok, i);
}
}, i);
}
else {
threads[i] = SimpleThread([&](int) {
// Consumer
int items[5];
int prevItem = -1;
for (int i = 0; i != 32*1024;) {
auto dequeued = q.try_dequeue_bulk(items, 5);
if (dequeued > 0) {
if (dequeued > 5) {
success[i] = false;
break;
}
for (std::size_t j = 0; j != dequeued; ++j) {
if (items[j] != prevItem + 1) {
success[i] = false;
}
prevItem = items[j];
}
i += (int)dequeued;
}
}
}, i);
}
}
for (int i = 0; i != 2; ++i) {
threads[i].join();
}
ASSERT_OR_FAIL(success[0]);
ASSERT_OR_FAIL(success[1]);
ASSERT_OR_FAIL(!q.try_dequeue(dummy));
}
// Implicit producer
{
Traits::reset();
ConcurrentQueue<int, Traits> q;
SimpleThread threads[2];
bool success[2] = { true, true };
for (int i = 0; i != 2; ++i) {
if (i == 0) {
threads[i] = SimpleThread([&](int) {
// Producer
for (int i = 0; i != 32*1024; ++i) {
q.enqueue(i);
}
}, i);
}
else {
threads[i] = SimpleThread([&](int) {
// Consumer
int items[5];
int prevItem = -1;
for (int i = 0; i != 32*1024;) {
auto dequeued = q.try_dequeue_bulk(items, 5);
if (dequeued > 0) {
if (dequeued > 5) {
success[i] = false;
break;
}
for (std::size_t j = 0; j != dequeued; ++j) {
if (items[j] != prevItem + 1) {
success[i] = false;
}
prevItem = items[j];
}
i += (int)dequeued;
}
}
}, i);
}
}
for (int i = 0; i != 2; ++i) {
threads[i].join();
}
ASSERT_OR_FAIL(success[0]);
ASSERT_OR_FAIL(success[1]);
ASSERT_OR_FAIL(!q.try_dequeue(dummy));
}
// Multithreaded consumption
{
Traits::reset();
ConcurrentQueue<int, Traits> q;
bool success[20];
SimpleThread threads[20];
for (int i = 0; i != 10; ++i) {
success[i] = true;
threads[i] = SimpleThread([&](int i) {
ProducerToken t(q);
if ((i & 1) == 1) {
for (int j = 0; j != 100; ++j) {
q.enqueue(t, i * 128 + j);
}
}
else {
for (int j = 0; j != 100; ++j) {
q.enqueue(i * 128 + j);
}
}
}, i);
}
std::atomic<size_t> dequeueCount(0);
for (int i = 10; i != 20; ++i) {
success[i] = true;
threads[i] = SimpleThread([&](int i) {
int prevItems[10];
for (int j = 0; j != 10; ++j) {
prevItems[j] = -1;
}
int items[15];
ConsumerToken t(q);
while (dequeueCount.load(std::memory_order_relaxed) != 1000) {
size_t count;
if ((i & 1) == 1) {
count = q.try_dequeue_bulk(items, 15);
}
else {
count = q.try_dequeue_bulk(t, items, 15);
}
if (count > 15) {
success[i] = false;
}
for (size_t k = 0; k != count; ++k) {
if (prevItems[items[k] / 128] >= (items[k] & 127)) {
success[i] = false;
}
prevItems[items[k] / 128] = items[k] & 127;
}
dequeueCount.fetch_add(count, std::memory_order_relaxed);
}
}, i);
}
for (int i = 0; i != 20; ++i) {
threads[i].join();
}
int item;
ASSERT_OR_FAIL(!q.try_dequeue(item));
for (int i = 0; i != 20; ++i) {
ASSERT_OR_FAIL(success[i]);
}
}
return true;
}
bool implicit_producer_hash()
{
for (int j = 0; j != 5; ++j) {
ConcurrentQueue<int, MallocTrackingTraits> q;
std::vector<SimpleThread> threads;
for (int i = 0; i != 20; ++i) {
threads.push_back(SimpleThread([&]() {
q.enqueue(7);
}));
}
for (auto it = threads.begin(); it != threads.end(); ++it) {
it->join();
}
int item;
ConsumerToken t(q);
for (auto i = 0; i != 20; ++i) {
if ((j & 1) == 0) {
ASSERT_OR_FAIL(q.try_dequeue(item));
}
else {
ASSERT_OR_FAIL(q.try_dequeue(t, item));
}
ASSERT_OR_FAIL(item == 7);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
return true;
}
bool index_wrapping()
{
{
// Implicit
ConcurrentQueue<int, SmallIndexTraits> q(16);
int item;
for (int i = 0; i != (1 << 18); ++i) {
if ((i & 16) == 0) {
ASSERT_OR_FAIL(q.try_enqueue(i));
}
else {
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == (i - 16));
}
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
// Explicit
ConcurrentQueue<int, SmallIndexTraits> q(16);
ProducerToken tok(q);
int item;
for (int i = 0; i != (1 << 18); ++i) {
if ((i & 16) == 0) {
ASSERT_OR_FAIL(q.try_enqueue(tok, i));
}
else {
ASSERT_OR_FAIL(q.try_dequeue_from_producer(tok, item));
ASSERT_OR_FAIL(item == (i - 16));
}
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
// Implicit extra small
ConcurrentQueue<int, ExtraSmallIndexTraits> q(1);
int item;
for (int i = 0; i != 4097; ++i) {
q.enqueue(i);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
// Explicit extra small
ConcurrentQueue<int, ExtraSmallIndexTraits> q(1);
ProducerToken tok(q);
int item;
for (int i = 0; i != 4097; ++i) {
q.enqueue(tok, i);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
return true;
}
struct SizeLimitTraits : public MallocTrackingTraits
{
static const size_t BLOCK_SIZE = 2;
static const size_t MAX_SUBQUEUE_SIZE = 5; // Will round up to 6 because of block size
};
bool subqueue_size_limit()
{
{
// Explicit
ConcurrentQueue<int, SizeLimitTraits> q;
ProducerToken t(q);
int item;
ASSERT_OR_FAIL(q.enqueue(t, 1));
ASSERT_OR_FAIL(q.enqueue(t, 2));
ASSERT_OR_FAIL(q.enqueue(t, 3));
ASSERT_OR_FAIL(q.enqueue(t, 4));
ASSERT_OR_FAIL(q.enqueue(t, 5));
ASSERT_OR_FAIL(q.enqueue(t, 6));
ASSERT_OR_FAIL(!q.enqueue(t, 7));
ASSERT_OR_FAIL(!q.enqueue(t, 8));
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 1);
ASSERT_OR_FAIL(!q.enqueue(t, 7)); // Can't reuse block until it's completely empty
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 2);
ASSERT_OR_FAIL(q.enqueue(t, 7));
ASSERT_OR_FAIL(q.enqueue(t, 8));
ASSERT_OR_FAIL(!q.enqueue(t, 9));
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 3);
ASSERT_OR_FAIL(!q.enqueue(t, 9));
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 4);
ASSERT_OR_FAIL(q.enqueue(t, 9));
for (int i = 5; i <= 9; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item) && item == i);
}
ASSERT_OR_FAIL(q.enqueue(t, 10));
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 10);
ASSERT_OR_FAIL(!q.try_dequeue(item));
for (int i = 0; i != 6; ++i) {
ASSERT_OR_FAIL(q.try_enqueue(t, i));
}
ASSERT_OR_FAIL(!q.try_enqueue(t, 7));
ASSERT_OR_FAIL(!q.enqueue(t, 7));
// Bulk
int items[6];
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 6) == 6);
ASSERT_OR_FAIL(!q.try_enqueue_bulk(t, items, 7));
ASSERT_OR_FAIL(!q.enqueue_bulk(t, items, 7));
ASSERT_OR_FAIL(q.enqueue_bulk(t, items, 6));
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 6) == 6);
ASSERT_OR_FAIL(q.enqueue_bulk(t, items, 3));
ASSERT_OR_FAIL(!q.enqueue_bulk(t, items, 4));
ASSERT_OR_FAIL(q.enqueue_bulk(t, items, 3));
ASSERT_OR_FAIL(!q.enqueue_bulk(t, items, 1));
ASSERT_OR_FAIL(!q.enqueue(t, 100));
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 1) == 1);
ASSERT_OR_FAIL(!q.enqueue(t, 100));
}
{
// Implicit
ConcurrentQueue<int, SizeLimitTraits> q;
int item;
ASSERT_OR_FAIL(q.enqueue(1));
ASSERT_OR_FAIL(q.enqueue(2));
ASSERT_OR_FAIL(q.enqueue(3));
ASSERT_OR_FAIL(q.enqueue(4));
ASSERT_OR_FAIL(q.enqueue(5));
ASSERT_OR_FAIL(q.enqueue(6));
ASSERT_OR_FAIL(!q.enqueue(7));
ASSERT_OR_FAIL(!q.enqueue(8));
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 1);
ASSERT_OR_FAIL(!q.enqueue(7)); // Can't reuse block until it's completely empty
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 2);
ASSERT_OR_FAIL(q.enqueue(7));
ASSERT_OR_FAIL(q.enqueue(8));
ASSERT_OR_FAIL(!q.enqueue(9));
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 3);
ASSERT_OR_FAIL(!q.enqueue(9));
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 4);
ASSERT_OR_FAIL(q.enqueue(9));
for (int i = 5; i <= 9; ++i) {
ASSERT_OR_FAIL(q.try_dequeue(item) && item == i);
}
ASSERT_OR_FAIL(q.enqueue(10));
ASSERT_OR_FAIL(q.try_dequeue(item) && item == 10);
ASSERT_OR_FAIL(!q.try_dequeue(item));
for (int i = 0; i != 6; ++i) {
ASSERT_OR_FAIL(q.try_enqueue(i));
}
ASSERT_OR_FAIL(!q.try_enqueue(7));
ASSERT_OR_FAIL(!q.enqueue(7));
// Bulk
int items[6];
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 6) == 6);
ASSERT_OR_FAIL(!q.try_enqueue_bulk(items, 7));
ASSERT_OR_FAIL(!q.enqueue_bulk(items, 7));
ASSERT_OR_FAIL(q.enqueue_bulk(items, 6));
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 6) == 6);
ASSERT_OR_FAIL(q.enqueue_bulk(items, 3));
ASSERT_OR_FAIL(!q.enqueue_bulk(items, 4));
ASSERT_OR_FAIL(q.enqueue_bulk(items, 3));
ASSERT_OR_FAIL(!q.enqueue_bulk(items, 1));
ASSERT_OR_FAIL(!q.enqueue(100));
ASSERT_OR_FAIL(q.try_dequeue_bulk(items, 1) == 1);
ASSERT_OR_FAIL(!q.enqueue(100));
}
return true;
}
bool exceptions()
{
typedef TestTraits<4, 2> Traits;
{
// Explicit, basic
// enqueue
ConcurrentQueue<ThrowingMovable, Traits> q;
ProducerToken tok(q);
ThrowingMovable::reset();
bool threw = false;
try {
q.enqueue(tok, ThrowingMovable(1, true));
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 1);
ASSERT_OR_FAIL(m->moved);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(q.enqueue(tok, ThrowingMovable(2)));
ThrowingMovable result(-1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 2);
ASSERT_OR_FAIL(result.moved);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 3);
// dequeue
ThrowingMovable::reset();
q.enqueue(tok, ThrowingMovable(10));
q.enqueue(tok, ThrowingMovable(11, false, true));
q.enqueue(tok, ThrowingMovable(12));
ASSERT_OR_FAIL(q.size_approx() == 3);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 10);
threw = false;
try {
q.try_dequeue(result);
}
catch (ThrowingMovable* m) {
ASSERT_OR_FAIL(m->id == 11);
threw = true;
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 12);
ASSERT_OR_FAIL(result.moved);
ASSERT_OR_FAIL(!q.try_dequeue(result));
q.enqueue(tok, ThrowingMovable(13));
ASSERT_OR_FAIL(q.size_approx() == 1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 13);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 8);
}
{
// Explicit, on and off block boundaries
// enqueue
ConcurrentQueue<ThrowingMovable, Traits> q;
ProducerToken tok(q);
ThrowingMovable::reset();
for (int i = 0; i != 3; ++i) {
q.enqueue(tok, ThrowingMovable(i));
}
bool threw = false;
try {
q.enqueue(tok, ThrowingMovable(3, true));
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 3);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 3);
q.enqueue(tok, ThrowingMovable(4));
threw = false;
try {
q.enqueue(tok, ThrowingMovable(5, true));
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 5);
ASSERT_OR_FAIL(m->moved);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 4);
q.enqueue(tok, ThrowingMovable(6));
ThrowingMovable result(-1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 0);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 2);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 4);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 6);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 12);
// dequeue
ThrowingMovable::reset();
q.enqueue(tok, ThrowingMovable(10, false, true));
q.enqueue(tok, ThrowingMovable(11));
q.enqueue(tok, ThrowingMovable(12));
q.enqueue(tok, ThrowingMovable(13, false, true));
q.enqueue(tok, ThrowingMovable(14, false, true));
q.enqueue(tok, ThrowingMovable(15, false, true));
q.enqueue(tok, ThrowingMovable(16));
ASSERT_OR_FAIL(q.size_approx() == 7);
for (int i = 10; i != 17; ++i) {
if (i == 10 || (i >= 13 && i <= 15)) {
threw = false;
try {
q.try_dequeue(result);
}
catch (ThrowingMovable* m) {
ASSERT_OR_FAIL(m->id == i);
ASSERT_OR_FAIL(m->moved);
threw = true;
}
ASSERT_OR_FAIL(threw);
}
else {
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == i);
ASSERT_OR_FAIL(result.moved);
}
ASSERT_OR_FAIL(q.size_approx() == (std::uint32_t)(16 - i));
}
ASSERT_OR_FAIL(!q.try_dequeue(result));
q.enqueue(tok, ThrowingMovable(20));
ASSERT_OR_FAIL(q.size_approx() == 1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 20);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 16);
}
{
// Explicit bulk
// enqueue
ConcurrentQueue<ThrowingMovable, Traits> q;
ProducerToken tok(q);
ThrowingMovable::reset();
std::vector<ThrowingMovable> items;
items.reserve(5);
items.push_back(ThrowingMovable(1));
items.push_back(ThrowingMovable(2));
items.push_back(ThrowingMovable(3));
items.push_back(ThrowingMovable(4));
items.push_back(ThrowingMovable(5));
items.back().throwOnCctor = true;
bool threw = false;
try {
q.enqueue_bulk(tok, std::make_move_iterator(items.begin()), 5);
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 5);
ASSERT_OR_FAIL(m->copied);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 0);
q.enqueue(tok, ThrowingMovable(6));
threw = false;
try {
q.enqueue_bulk(tok, std::make_move_iterator(items.begin()), 5);
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 5);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 1);
ThrowingMovable result(-1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 6);
ASSERT_OR_FAIL(result.moved);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 15);
// dequeue
ThrowingMovable::reset();
q.enqueue(tok, ThrowingMovable(10));
q.enqueue(tok, ThrowingMovable(11));
q.enqueue(tok, ThrowingMovable(12));
q.enqueue(tok, ThrowingMovable(13));
q.enqueue(tok, ThrowingMovable(14, false, true, true)); // std::back_inserter turns an assignment into a ctor call
q.enqueue(tok, ThrowingMovable(15));
ASSERT_OR_FAIL(q.size_approx() == 6);
std::vector<ThrowingMovable> results;
results.reserve(5);
ASSERT_OR_FAIL(q.try_dequeue_bulk(std::back_inserter(results), 2));
ASSERT_OR_FAIL(results.size() == 2);
ASSERT_OR_FAIL(results[0].id == 10);
ASSERT_OR_FAIL(results[1].id == 11);
ASSERT_OR_FAIL(results[0].moved);
ASSERT_OR_FAIL(results[1].moved);
ASSERT_OR_FAIL(q.size_approx() == 4);
threw = false;
try {
q.try_dequeue_bulk(std::back_inserter(results), 4);
}
catch (ThrowingMovable*) {
// Note: Can't inspect thrown value since it points to an object whose construction was attempted on the vector and
// no longer exists
threw = true;
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(q.try_dequeue_bulk(std::back_inserter(results), 1) == 0);
ASSERT_OR_FAIL(results.size() == 4);
ASSERT_OR_FAIL(results[2].id == 12);
ASSERT_OR_FAIL(results[3].id == 13);
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 12);
}
{
// Implicit, basic
// enqueue
ConcurrentQueue<ThrowingMovable, Traits> q;
ThrowingMovable::reset();
bool threw = false;
try {
q.enqueue(ThrowingMovable(1, true));
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 1);
ASSERT_OR_FAIL(m->moved);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(q.enqueue(ThrowingMovable(2)));
ThrowingMovable result(-1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 2);
ASSERT_OR_FAIL(result.moved);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 3);
// dequeue
ThrowingMovable::reset();
q.enqueue(ThrowingMovable(10));
q.enqueue(ThrowingMovable(11, false, true));
q.enqueue(ThrowingMovable(12));
ASSERT_OR_FAIL(q.size_approx() == 3);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 10);
threw = false;
try {
q.try_dequeue(result);
}
catch (ThrowingMovable* m) {
ASSERT_OR_FAIL(m->id == 11);
threw = true;
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 12);
ASSERT_OR_FAIL(result.moved);
ASSERT_OR_FAIL(!q.try_dequeue(result));
q.enqueue(ThrowingMovable(13));
ASSERT_OR_FAIL(q.size_approx() == 1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 13);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 8);
}
{
// Implicit, on and off block boundaries
// enqueue
ConcurrentQueue<ThrowingMovable, Traits> q;
ThrowingMovable::reset();
for (int i = 0; i != 3; ++i) {
q.enqueue(ThrowingMovable(i));
}
bool threw = false;
try {
q.enqueue(ThrowingMovable(3, true));
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 3);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 3);
q.enqueue(ThrowingMovable(4));
threw = false;
try {
q.enqueue(ThrowingMovable(5, true));
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 5);
ASSERT_OR_FAIL(m->moved);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 4);
q.enqueue(ThrowingMovable(6));
ThrowingMovable result(-1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 0);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 2);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 4);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 6);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 12);
// dequeue
ThrowingMovable::reset();
q.enqueue(ThrowingMovable(10, false, true));
q.enqueue(ThrowingMovable(11));
q.enqueue(ThrowingMovable(12));
q.enqueue(ThrowingMovable(13, false, true));
q.enqueue(ThrowingMovable(14, false, true));
q.enqueue(ThrowingMovable(15, false, true));
q.enqueue(ThrowingMovable(16));
ASSERT_OR_FAIL(q.size_approx() == 7);
for (int i = 10; i != 17; ++i) {
if (i == 10 || (i >= 13 && i <= 15)) {
threw = false;
try {
q.try_dequeue(result);
}
catch (ThrowingMovable* m) {
ASSERT_OR_FAIL(m->id == i);
ASSERT_OR_FAIL(m->moved);
threw = true;
}
ASSERT_OR_FAIL(threw);
}
else {
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == i);
ASSERT_OR_FAIL(result.moved);
}
ASSERT_OR_FAIL(q.size_approx() == (std::uint32_t)(16 - i));
}
ASSERT_OR_FAIL(!q.try_dequeue(result));
q.enqueue(ThrowingMovable(20));
ASSERT_OR_FAIL(q.size_approx() == 1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 20);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 16);
}
{
// Impplicit bulk
// enqueue
ConcurrentQueue<ThrowingMovable, Traits> q;
ThrowingMovable::reset();
std::vector<ThrowingMovable> items;
items.reserve(5);
items.push_back(ThrowingMovable(1));
items.push_back(ThrowingMovable(2));
items.push_back(ThrowingMovable(3));
items.push_back(ThrowingMovable(4));
items.push_back(ThrowingMovable(5));
items.back().throwOnCctor = true;
bool threw = false;
try {
q.enqueue_bulk(std::make_move_iterator(items.begin()), 5);
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 5);
ASSERT_OR_FAIL(m->copied);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 0);
q.enqueue(ThrowingMovable(6));
threw = false;
try {
q.enqueue_bulk(std::make_move_iterator(items.begin()), 5);
}
catch (ThrowingMovable* m) {
threw = true;
ASSERT_OR_FAIL(m->id == 5);
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 1);
ThrowingMovable result(-1);
ASSERT_OR_FAIL(q.try_dequeue(result));
ASSERT_OR_FAIL(result.id == 6);
ASSERT_OR_FAIL(result.moved);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 15);
// dequeue
ThrowingMovable::reset();
q.enqueue(ThrowingMovable(10));
q.enqueue(ThrowingMovable(11));
q.enqueue(ThrowingMovable(12));
q.enqueue(ThrowingMovable(13));
q.enqueue(ThrowingMovable(14, false, true, true)); // std::back_inserter turns an assignment into a ctor call
q.enqueue(ThrowingMovable(15));
ASSERT_OR_FAIL(q.size_approx() == 6);
std::vector<ThrowingMovable> results;
results.reserve(5);
ASSERT_OR_FAIL(q.try_dequeue_bulk(std::back_inserter(results), 2));
ASSERT_OR_FAIL(results.size() == 2);
ASSERT_OR_FAIL(results[0].id == 10);
ASSERT_OR_FAIL(results[1].id == 11);
ASSERT_OR_FAIL(results[0].moved);
ASSERT_OR_FAIL(results[1].moved);
ASSERT_OR_FAIL(q.size_approx() == 4);
threw = false;
try {
q.try_dequeue_bulk(std::back_inserter(results), 4);
}
catch (ThrowingMovable*) {
threw = true;
}
ASSERT_OR_FAIL(threw);
ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(!q.try_dequeue(result));
ASSERT_OR_FAIL(q.try_dequeue_bulk(std::back_inserter(results), 1) == 0);
ASSERT_OR_FAIL(results.size() == 4);
ASSERT_OR_FAIL(results[2].id == 12);
ASSERT_OR_FAIL(results[3].id == 13);
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() == 12);
}
{
// Threaded
ConcurrentQueue<ThrowingMovable, Traits> q;
ThrowingMovable::reset();
std::vector<SimpleThread> threads(6);
for (std::size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](std::size_t tid) {
std::vector<ThrowingMovable> inVec;
inVec.push_back(ThrowingMovable(1));
inVec.push_back(ThrowingMovable(2));
inVec.push_back(ThrowingMovable(3));
std::vector<ThrowingMovable> outVec;
outVec.push_back(ThrowingMovable(-1));
outVec.push_back(ThrowingMovable(-1));
outVec.push_back(ThrowingMovable(-1));
ProducerToken tok(q);
ThrowingMovable result(-1);
for (std::size_t i = 0; i != 8192; ++i) {
auto magic = (tid + 1) * i + tid * 17 + i;
auto op = magic & 7;
auto ctorThrow = (magic & 0x10) != 0;
auto assignThrow = (magic & 0x20) != 0;
auto throwOnNextCctor = (magic & 0x40) != 0;
try {
switch (op) {
case 0:
q.enqueue(tok, ThrowingMovable((int)i, ctorThrow, assignThrow, throwOnNextCctor));
break;
case 1:
inVec[i & 3].throwOnCctor = ctorThrow;
inVec[i & 3].throwOnAssignment = assignThrow;
inVec[i & 3].throwOnSecondCctor = throwOnNextCctor;
q.enqueue_bulk(tok, inVec.begin(), 3);
break;
case 2:
q.enqueue(ThrowingMovable((int)i, ctorThrow, assignThrow, throwOnNextCctor));
break;
case 3:
inVec[i & 3].throwOnCctor = ctorThrow;
inVec[i & 3].throwOnAssignment = assignThrow;
inVec[i & 3].throwOnSecondCctor = throwOnNextCctor;
q.enqueue_bulk(inVec.begin(), 3);
break;
case 4:
case 5:
q.try_dequeue(result);
break;
case 6:
case 7:
q.try_dequeue_bulk(outVec.data(), 3);
break;
}
}
catch (ThrowingMovable*) {
}
}
}, tid);
}
for (std::size_t i = 0; i != threads.size(); ++i) {
threads[i].join();
}
ThrowingMovable result(-1);
while (true) {
try {
if (!q.try_dequeue(result)) {
break;
}
}
catch (ThrowingMovable*) {
}
}
ASSERT_OR_FAIL(ThrowingMovable::destroyCount() + 1 == ThrowingMovable::ctorCount());
}
return true;
}
bool implicit_producer_churn()
{
typedef TestTraits<4> Traits;
for (int i = 0; i != 256; ++i) {
std::vector<SimpleThread> threads(32);
ConcurrentQueue<int, Traits> q;
for (auto& thread : threads) {
thread = SimpleThread([&] {
int x;
for (int j = 0; j != 16; ++j) {
q.enqueue(0);
q.try_dequeue(x);
}
});
}
for (auto& thread : threads) {
thread.join();
}
}
return true;
}
bool test_threaded()
{
typedef TestTraits<4> Traits;
Traits::reset();
bool inOrder = true;
{
// Single producer, single consumer
ConcurrentQueue<int, Traits> q;
ProducerToken t(q);
SimpleThread a([&]() {
for (int i = 0; i != 123456; ++i) {
q.enqueue(t, i);
}
});
SimpleThread b([&]() {
int item;
int prevItem = -1;
while (true) {
if (q.try_dequeue_from_producer(t, item)) {
if (item == 123455) {
break;
}
inOrder = item == prevItem + 1 && inOrder;
prevItem = item;
}
}
});
a.join();
b.join();
}
ASSERT_OR_FAIL(inOrder);
{
// Single producer, multi consumer
ConcurrentQueue<int, Traits> q;
ProducerToken t(q);
SimpleThread a([&]() {
for (int i = 0; i != 123456; ++i) {
q.enqueue(t, i);
}
});
SimpleThread b([&]() {
int item, prevItem = -1;
for (int i = 0; i != 123456; ++i) {
if (q.try_dequeue_from_producer(t, item)) {
inOrder = item > prevItem && inOrder;
prevItem = item;
}
}
});
SimpleThread c([&]() {
int item;
for (int i = 0; i != 123456; ++i) q.try_dequeue_from_producer(t, item);
});
SimpleThread d([&]() {
int item;
for (int i = 0; i != 123456; ++i) q.try_dequeue_from_producer(t, item);
});
a.join();
b.join();
c.join();
d.join();
}
ASSERT_OR_FAIL(inOrder);
ASSERT_OR_FAIL(Traits::malloc_count() == Traits::free_count());
return true;
}
bool test_threaded_bulk()
{
typedef TestTraits<2> Traits;
// Enqueue bulk (implicit)
Traits::reset();
{
ConcurrentQueue<int, Traits> q;
SimpleThread threads[2];
bool success[2];
int stuff[] = { 1, 2, 3, 4, 5 };
for (int i = 0; i != 2; ++i) {
success[i] = true;
if (i == 0) {
// Enqueue bulk
threads[i] = SimpleThread([&](int j) {
for (int k = 0; k != 2048; ++k) {
success[j] = q.enqueue_bulk(stuff, 5) && success[j];
}
}, i);
}
else {
// Dequeue
threads[i] = SimpleThread([&](int j) {
int item;
int prevItem = 0;
for (int k = 0; k != 2048 * 5;) {
if (q.try_dequeue(item)) {
if (item != prevItem + 1) {
success[j] = false;
}
prevItem = item;
if (item == 5) {
prevItem = 0;
}
++k;
}
}
}, i);
}
}
for (int i = 0; i != 2; ++i) {
threads[i].join();
}
ASSERT_OR_FAIL(success[0]);
ASSERT_OR_FAIL(success[1]);
}
// Enqueue bulk (while somebody is dequeueing (with tokens))
Traits::reset();
{
ConcurrentQueue<int, Traits> q;
SimpleThread threads[2];
bool success[2];
int stuff[] = { 1, 2, 3, 4, 5 };
for (int i = 0; i != 2; ++i) {
success[i] = true;
if (i == 0) {
// Enqueue bulk
threads[i] = SimpleThread([&](int j) {
ProducerToken tok(q);
for (int k = 0; k != 2048; ++k) {
success[j] = q.enqueue_bulk(tok, stuff, 5) && success[j];
}
}, i);
}
else {
// Dequeue
threads[i] = SimpleThread([&](int j) {
ConsumerToken tok(q);
int item;
int prevItem = 0;
for (int k = 0; k != 2048 * 5;) {
if (q.try_dequeue(tok, item)) {
if (item != prevItem + 1) {
success[j] = false;
}
prevItem = item;
if (item == 5) {
prevItem = 0;
}
++k;
}
}
}, i);
}
}
for (int i = 0; i != 2; ++i) {
threads[i].join();
}
ASSERT_OR_FAIL(success[0]);
ASSERT_OR_FAIL(success[1]);
}
return true;
}
template<typename Traits>
bool full_api()
{
// A simple test that exercises the full public API (just to make sure every function is implemented
// and works on at least the most basic level)
// enqueue(T const&)
{
ConcurrentQueue<Copyable, Traits> q;
Copyable original(12345);
ASSERT_OR_FAIL(q.enqueue(original));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// enqueue(T&&)
{
ConcurrentQueue<Moveable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Moveable(12345)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
Moveable original(12345);
ASSERT_OR_FAIL(q.enqueue(std::move(original)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Copyable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Copyable(12345)));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// enqueue(Token, T const&)
{
ConcurrentQueue<Copyable, Traits> q;
ProducerToken t(q);
Copyable original(12345);
ASSERT_OR_FAIL(q.enqueue(t, original));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// enqueue(Token, T&&)
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.enqueue(t, Moveable(12345)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
Moveable original(12345);
ASSERT_OR_FAIL(q.enqueue(t, std::move(original)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Copyable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.enqueue(t, Copyable(12345)));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_enqueue(T const&)
{
ConcurrentQueue<Copyable, Traits> q;
Copyable original(12345);
ASSERT_OR_FAIL(q.try_enqueue(original));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_enqueue(T&&)
{
ConcurrentQueue<Moveable, Traits> q;
ASSERT_OR_FAIL(q.try_enqueue(Moveable(12345)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
Moveable original(12345);
ASSERT_OR_FAIL(q.try_enqueue(std::move(original)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Copyable, Traits> q;
ASSERT_OR_FAIL(q.try_enqueue(Copyable(12345)));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_enqueue(Token, T const&)
{
ConcurrentQueue<Copyable, Traits> q;
ProducerToken t(q);
Copyable original(12345);
ASSERT_OR_FAIL(q.try_enqueue(t, original));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_enqueue(Token, T&&)
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.try_enqueue(t, Moveable(12345)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
Moveable original(12345);
ASSERT_OR_FAIL(q.try_enqueue(t, std::move(original)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Copyable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.try_enqueue(t, Copyable(12345)));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// enqueue_bulk(It itemFirst, size_t count)
{
ConcurrentQueue<Copyable, Traits> q;
Copyable original(12345);
ASSERT_OR_FAIL(q.enqueue_bulk(&original, 1));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
Moveable original(12345);
ASSERT_OR_FAIL(q.enqueue_bulk(std::make_move_iterator(&original), 1));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// enqueue_bulk(Token, It itemFirst, size_t count)
{
ConcurrentQueue<Copyable, Traits> q;
ProducerToken t(q);
Copyable original(12345);
ASSERT_OR_FAIL(q.enqueue_bulk(t, &original, 1));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
Moveable original(12345);
ASSERT_OR_FAIL(q.enqueue_bulk(t, std::make_move_iterator(&original), 1));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_enqueue_bulk(It itemFirst, size_t count)
{
ConcurrentQueue<Copyable, Traits> q;
Copyable original(12345);
ASSERT_OR_FAIL(q.try_enqueue_bulk(&original, 1));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
Moveable original(12345);
ASSERT_OR_FAIL(q.try_enqueue_bulk(std::make_move_iterator(&original), 1));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_enqueue_bulk(Token, It itemFirst, size_t count)
{
ConcurrentQueue<Copyable, Traits> q;
ProducerToken t(q);
Copyable original(12345);
ASSERT_OR_FAIL(q.try_enqueue_bulk(t, &original, 1));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
Moveable original(12345);
ASSERT_OR_FAIL(q.try_enqueue_bulk(t, std::make_move_iterator(&original), 1));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_dequeue(T&)
{
ConcurrentQueue<Copyable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Copyable(12345)));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Moveable(12345)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_dequeue(Token, T&)
{
ConcurrentQueue<Copyable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Copyable(12345)));
Copyable item(0);
ConsumerToken t(q);
ASSERT_OR_FAIL(q.try_dequeue(t, item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(t, item));
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Moveable(12345)));
Moveable item(0);
ConsumerToken t(q);
ASSERT_OR_FAIL(q.try_dequeue(t, item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(t, item));
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_dequeue_from_producer(Token, T&)
{
ConcurrentQueue<Copyable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.enqueue(t, Copyable(12345)));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.enqueue(t, Moveable(12345)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue_from_producer(t, item));
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// try_dequeue_bulk(T&)
{
ConcurrentQueue<Copyable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Copyable(12345)));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue_bulk(&item, 1) == 1);
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue_bulk(&item, 1));
}
{
ConcurrentQueue<Moveable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Moveable(12345)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue_bulk(&item, 1) == 1);
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue_bulk(&item, 1));
}
// try_dequeue_bulk(Token, T&)
{
ConcurrentQueue<Copyable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Copyable(12345)));
Copyable item(0);
ConsumerToken t(q);
ASSERT_OR_FAIL(q.try_dequeue_bulk(t, &item, 1));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue_bulk(t, &item, 1));
ASSERT_OR_FAIL(!q.try_dequeue_bulk(&item, 1));
}
{
ConcurrentQueue<Moveable, Traits> q;
ASSERT_OR_FAIL(q.enqueue(Moveable(12345)));
Moveable item(0);
ConsumerToken t(q);
ASSERT_OR_FAIL(q.try_dequeue_bulk(t, &item, 1));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue_bulk(t, &item, 1));
ASSERT_OR_FAIL(!q.try_dequeue_bulk(&item, 1));
}
// try_dequeue_bulk_from_producer(Token, T&)
{
ConcurrentQueue<Copyable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.enqueue(t, Copyable(12345)));
Copyable item(0);
ASSERT_OR_FAIL(q.try_dequeue_bulk_from_producer(t, &item, 1));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.copied);
ASSERT_OR_FAIL(!q.try_dequeue_bulk_from_producer(t, &item, 1));
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.enqueue(t, Moveable(12345)));
Moveable item(0);
ASSERT_OR_FAIL(q.try_dequeue_bulk_from_producer(t, &item, 1));
ASSERT_OR_FAIL(item.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue_bulk_from_producer(t, &item, 1));
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// size_approx()
{
ConcurrentQueue<Foo, Traits> q;
for (int i = 0; i != 1234; ++i) {
q.enqueue(Foo());
}
ASSERT_OR_FAIL(q.size_approx() == 1234);
}
// is_lock_free()
{
constexpr bool lockFree = ConcurrentQueue<Foo, Traits>::is_lock_free();
#if defined(__amd64__) || defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || defined(__i386__) || defined(_M_PPC) || defined(__powerpc__)
static_assert(lockFree, "is_lock_free should be true");
#else
(void)lockFree;
#endif
}
// moving
{
ConcurrentQueue<int, MallocTrackingTraits> q(4);
ProducerToken t(q);
for (int i = 0; i != 1233; ++i) {
q.enqueue(i);
}
for (int i = 1234; i != 5678; ++i) {
q.enqueue(t, i);
}
ASSERT_OR_FAIL(q.size_approx() == 5677);
ConcurrentQueue<int, MallocTrackingTraits> q2(std::move(q));
ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(q2.size_approx() == 5677);
q2.enqueue(t, 5678);
q2.enqueue(1233);
ASSERT_OR_FAIL(q2.size_approx() == 5679);
for (int i = 1234; i != 0; --i) {
q.enqueue(i);
}
ASSERT_OR_FAIL(q.size_approx() == 1234);
int item;
for (int i = 0; i <= 5678; ++i) {
ASSERT_OR_FAIL(q2.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q2.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(q2.size_approx() == 0);
for (int i = 1234; i != 0; --i) {
ASSERT_OR_FAIL(q.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(q.size_approx() == 0);
}
// swapping
{
ConcurrentQueue<int, MallocTrackingTraits> q1, q2, q3;
ProducerToken t1(q1), t2(q2), t3(q3);
for (int i = 1234; i != 5678; ++i) {
q1.enqueue(t1, i);
}
for (int i = 21234; i != 25678; ++i) {
q2.enqueue(t2, i);
}
for (int i = 31234; i != 35678; ++i) {
q3.enqueue(t3, i);
}
for (int i = 0; i != 1234; ++i) {
q1.enqueue(i);
}
for (int i = 20000; i != 21234; ++i) {
q2.enqueue(i);
}
for (int i = 30000; i != 31234; ++i) {
q3.enqueue(i);
}
{
ConcurrentQueue<int, MallocTrackingTraits> temp;
temp = std::move(q1);
q1 = std::move(q2);
q2 = std::move(temp);
}
// q1 in q2, q2 in q1
swap(q2, q3); // q1 in q3, q3 in q2
q1.swap(q2); // q2 in q2, q3 in q1
q1.swap(q2); // q3 in q2, q2 in q1
q1.swap(q2); // q2 in q2, q3 in q1
q2.swap(q3); // q1 in q2, q2 in q3
// So now q1 is in q2, q2 is in q3, and q3 is in q1
int item;
for (int i = 30000; i != 35678; ++i) {
ASSERT_OR_FAIL(q1.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q1.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(q1.size_approx() == 0);
for (int i = 0; i != 5678; ++i) {
ASSERT_OR_FAIL(q2.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q2.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(q2.size_approx() == 0);
for (int i = 20000; i != 25678; ++i) {
ASSERT_OR_FAIL(q3.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(item == i);
}
ASSERT_OR_FAIL(!q3.try_dequeue_non_interleaved(item));
ASSERT_OR_FAIL(q3.size_approx() == 0);
}
return true;
}
bool blocking_wrappers()
{
typedef BlockingConcurrentQueue<int, MallocTrackingTraits> Q;
ASSERT_OR_FAIL((Q::is_lock_free() == ConcurrentQueue<int, MallocTrackingTraits>::is_lock_free()));
// Moving
{
Q a, b, c;
a = std::move(b);
b = std::move(c);
a = std::move(a);
c = std::move(b);
b = Q(std::move(b));
using std::swap;
swap(a, b);
a.swap(c);
c.swap(c);
}
// Implicit
{
Q q;
ASSERT_OR_FAIL(q.enqueue(1));
ASSERT_OR_FAIL(q.size_approx() == 1);
int item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == 1);
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(q.enqueue(2));
ASSERT_OR_FAIL(q.enqueue(3));
ASSERT_OR_FAIL(q.size_approx() == 2);
q.wait_dequeue(item);
ASSERT_OR_FAIL(item == 2);
ASSERT_OR_FAIL(q.size_approx() == 1);
q.wait_dequeue(item);
ASSERT_OR_FAIL(item == 3);
ASSERT_OR_FAIL(!q.try_dequeue(item));
ASSERT_OR_FAIL(q.size_approx() == 0);
}
// Implicit threaded
{
Q q;
const int THREADS = 8;
SimpleThread threads[THREADS];
bool success[THREADS];
for (int i = 0; i != THREADS; ++i) {
success[i] = true;
if (i % 2 == 0) {
// Enqueue
if (i % 4 == 0) {
threads[i] = SimpleThread([&](int j) {
int stuff[5];
for (int k = 0; k != 2048; ++k) {
for (int x = 0; x != 5; ++x) {
stuff[x] = (j << 16) | (k * 5 + x);
}
success[j] = q.enqueue_bulk(stuff, 5) && success[j];
}
}, i);
}
else {
threads[i] = SimpleThread([&](int j) {
for (int k = 0; k != 4096; ++k) {
success[j] = q.enqueue((j << 16) | k) && success[j];
}
}, i);
}
}
else {
// Dequeue
threads[i] = SimpleThread([&](int j) {
int item;
std::vector<int> prevItems(THREADS, -1);
if (j % 4 == 1) {
for (int k = 0; k != 2048 * 5; ++k) {
if (q.try_dequeue(item)) {
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
}
else {
int items[6];
for (int k = 0; k < 4096; ++k) {
if (std::size_t dequeued = q.try_dequeue_bulk(items, 6)) {
for (std::size_t x = 0; x != dequeued; ++x) {
item = items[x];
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
}
}
}, i);
}
}
for (int i = 0; i != THREADS; ++i) {
threads[i].join();
}
for (int i = 0; i != THREADS; ++i) {
ASSERT_OR_FAIL(success[i]);
}
}
// Implicit threaded, blocking
{
Q q;
const int THREADS = 8;
SimpleThread threads[THREADS];
bool success[THREADS];
for (int i = 0; i != THREADS; ++i) {
success[i] = true;
if (i % 2 == 0) {
// Enqueue
if (i % 4 == 0) {
threads[i] = SimpleThread([&](int j) {
int stuff[5];
for (int k = 0; k != 2048; ++k) {
for (int x = 0; x != 5; ++x) {
stuff[x] = (j << 16) | (k * 5 + x);
}
success[j] = q.enqueue_bulk(stuff, 5) && success[j];
}
}, i);
}
else {
threads[i] = SimpleThread([&](int j) {
for (int k = 0; k != 4096; ++k) {
success[j] = q.enqueue((j << 16) | k) && success[j];
}
}, i);
}
}
else {
// Dequeue
threads[i] = SimpleThread([&](int j) {
int item;
std::vector<int> prevItems(THREADS, -1);
if (j % 4 == 1) {
for (int k = 0; k != 2048 * 5; ++k) {
q.wait_dequeue(item);
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
else {
int items[6];
int k;
for (k = 0; k < 4090; ) {
if (std::size_t dequeued = q.wait_dequeue_bulk(items, 6)) {
for (std::size_t x = 0; x != dequeued; ++x) {
item = items[x];
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
k += (int)dequeued;
}
else {
success[j] = false;
}
}
for (; k != 4096; ++k) {
q.wait_dequeue(item);
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
}, i);
}
}
for (int i = 0; i != THREADS; ++i) {
threads[i].join();
}
for (int i = 0; i != THREADS; ++i) {
ASSERT_OR_FAIL(success[i]);
}
ASSERT_OR_FAIL(q.size_approx() == 0);
}
// Explicit
{
Q q;
ProducerToken pt(q);
ASSERT_OR_FAIL(q.enqueue(pt, 1));
ASSERT_OR_FAIL(q.size_approx() == 1);
int item;
ConsumerToken ct(q);
ASSERT_OR_FAIL(q.try_dequeue(ct, item));
ASSERT_OR_FAIL(item == 1);
ASSERT_OR_FAIL(!q.try_dequeue(ct, item));
ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(q.enqueue(pt, 2));
ASSERT_OR_FAIL(q.enqueue(pt, 3));
ASSERT_OR_FAIL(q.size_approx() == 2);
q.wait_dequeue(ct, item);
ASSERT_OR_FAIL(item == 2);
ASSERT_OR_FAIL(q.size_approx() == 1);
q.wait_dequeue(ct, item);
ASSERT_OR_FAIL(item == 3);
ASSERT_OR_FAIL(!q.try_dequeue(ct, item));
ASSERT_OR_FAIL(q.size_approx() == 0);
}
// Explicit threaded
{
Q q;
const int THREADS = 8;
SimpleThread threads[THREADS];
bool success[THREADS];
for (int i = 0; i != THREADS; ++i) {
success[i] = true;
if (i % 2 == 0) {
// Enqueue
if (i % 4 == 0) {
threads[i] = SimpleThread([&](int j) {
ProducerToken t(q);
int stuff[5];
for (int k = 0; k != 2048; ++k) {
for (int x = 0; x != 5; ++x) {
stuff[x] = (j << 16) | (k * 5 + x);
}
success[j] = q.enqueue_bulk(t, stuff, 5) && success[j];
}
}, i);
}
else {
threads[i] = SimpleThread([&](int j) {
ProducerToken t(q);
for (int k = 0; k != 4096; ++k) {
success[j] = q.enqueue(t, (j << 16) | k) && success[j];
}
}, i);
}
}
else {
// Dequeue
threads[i] = SimpleThread([&](int j) {
ConsumerToken t(q);
int item;
std::vector<int> prevItems(THREADS, -1);
if (j % 4 == 1) {
for (int k = 0; k != 2048 * 5; ++k) {
if (q.try_dequeue(t, item)) {
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
}
else {
int items[6];
for (int k = 0; k < 4096; ++k) {
if (std::size_t dequeued = q.try_dequeue_bulk(t, items, 6)) {
for (std::size_t x = 0; x != dequeued; ++x) {
item = items[x];
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
}
}
}, i);
}
}
for (int i = 0; i != THREADS; ++i) {
threads[i].join();
}
for (int i = 0; i != THREADS; ++i) {
ASSERT_OR_FAIL(success[i]);
}
}
// Explicit threaded, blocking
{
Q q;
const int THREADS = 8;
SimpleThread threads[THREADS];
bool success[THREADS];
for (int i = 0; i != THREADS; ++i) {
success[i] = true;
if (i % 2 == 0) {
// Enqueue
if (i % 4 == 0) {
threads[i] = SimpleThread([&](int j) {
ProducerToken t(q);
int stuff[5];
for (int k = 0; k != 2048; ++k) {
for (int x = 0; x != 5; ++x) {
stuff[x] = (j << 16) | (k * 5 + x);
}
success[j] = q.enqueue_bulk(t, stuff, 5) && success[j];
}
}, i);
}
else {
threads[i] = SimpleThread([&](int j) {
ProducerToken t(q);
for (int k = 0; k != 4096; ++k) {
success[j] = q.enqueue(t, (j << 16) | k) && success[j];
}
}, i);
}
}
else {
// Dequeue
threads[i] = SimpleThread([&](int j) {
ConsumerToken t(q);
int item;
std::vector<int> prevItems(THREADS, -1);
if (j % 4 == 1) {
for (int k = 0; k != 2048 * 5; ++k) {
q.wait_dequeue(t, item);
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
else {
int items[6];
int k;
for (k = 0; k < 4090; ) {
if (std::size_t dequeued = q.wait_dequeue_bulk(t, items, 6)) {
for (std::size_t x = 0; x != dequeued; ++x) {
item = items[x];
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
k += (int)dequeued;
}
else {
success[j] = false;
}
}
for (; k != 4096; ++k) {
q.wait_dequeue(t, item);
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
}, i);
}
}
for (int i = 0; i != THREADS; ++i) {
threads[i].join();
}
for (int i = 0; i != THREADS; ++i) {
ASSERT_OR_FAIL(success[i]);
}
ASSERT_OR_FAIL(q.size_approx() == 0);
}
return true;
}
bool timed_blocking_wrappers()
{
typedef BlockingConcurrentQueue<int, MallocTrackingTraits> Q;
// Implicit
{
Q q;
int item;
ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, 0));
ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, 1));
ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, 100));
ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, std::chrono::milliseconds(1)));
q.enqueue(123);
ASSERT_OR_FAIL(q.wait_dequeue_timed(item, 0));
ASSERT_OR_FAIL(item == 123);
}
// Implicit, threaded
{
Q q;
const int THREADS = 8;
SimpleThread threads[THREADS];
bool success[THREADS];
for (int i = 0; i != THREADS; ++i) {
success[i] = true;
if (i % 2 == 0) {
// Enqueue
if (i % 4 == 0) {
threads[i] = SimpleThread([&](int j) {
int stuff[5];
for (int k = 0; k != 2048; ++k) {
for (int x = 0; x != 5; ++x) {
stuff[x] = (j << 16) | (k * 5 + x);
}
success[j] = q.enqueue_bulk(stuff, 5) && success[j];
}
}, i);
}
else {
threads[i] = SimpleThread([&](int j) {
for (int k = 0; k != 4096; ++k) {
success[j] = q.enqueue((j << 16) | k) && success[j];
}
}, i);
}
}
else {
// Dequeue
threads[i] = SimpleThread([&](int j) {
int item;
std::vector<int> prevItems(THREADS, -1);
if (j % 4 == 1) {
for (int k = 0; k != 2048 * 5; ++k) {
if (!q.wait_dequeue_timed(item, 1000)) {
--k;
continue;
}
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
else {
int items[6];
int k;
for (k = 0; k < 4090; ) {
if (std::size_t dequeued = q.wait_dequeue_bulk_timed(items, 6, 1000)) {
for (std::size_t x = 0; x != dequeued; ++x) {
item = items[x];
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
k += (int)dequeued;
}
}
for (; k != 4096; ++k) {
if (!q.wait_dequeue_timed(item, std::chrono::hours(1))) {
success[j] = false;
}
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
}, i);
}
}
for (int i = 0; i != THREADS; ++i) {
threads[i].join();
}
for (int i = 0; i != THREADS; ++i) {
ASSERT_OR_FAIL(success[i]);
}
ASSERT_OR_FAIL(q.size_approx() == 0);
int item;
ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, 0));
}
// Explicit
{
Q q;
ProducerToken ptok(q);
ConsumerToken ctok(q);
int item;
ASSERT_OR_FAIL(!q.wait_dequeue_timed(ctok, item, 0));
ASSERT_OR_FAIL(!q.wait_dequeue_timed(ctok, item, 1));
ASSERT_OR_FAIL(!q.wait_dequeue_timed(ctok, item, 100));
ASSERT_OR_FAIL(!q.wait_dequeue_timed(ctok, item, std::chrono::milliseconds(1)));
q.enqueue(ptok, 123);
ASSERT_OR_FAIL(q.wait_dequeue_timed(ctok, item, 0));
ASSERT_OR_FAIL(item == 123);
}
// Explicit, threaded
{
Q q;
const int THREADS = 8;
SimpleThread threads[THREADS];
bool success[THREADS];
for (int i = 0; i != THREADS; ++i) {
success[i] = true;
if (i % 2 == 0) {
// Enqueue
if (i % 4 == 0) {
threads[i] = SimpleThread([&](int j) {
ProducerToken tok(q);
int stuff[5];
for (int k = 0; k != 2048; ++k) {
for (int x = 0; x != 5; ++x) {
stuff[x] = (j << 16) | (k * 5 + x);
}
success[j] = q.enqueue_bulk(tok, stuff, 5) && success[j];
}
}, i);
}
else {
threads[i] = SimpleThread([&](int j) {
ProducerToken tok(q);
for (int k = 0; k != 4096; ++k) {
success[j] = q.enqueue(tok, (j << 16) | k) && success[j];
}
}, i);
}
}
else {
// Dequeue
threads[i] = SimpleThread([&](int j) {
int item;
std::vector<int> prevItems(THREADS, -1);
ConsumerToken tok(q);
if (j % 4 == 1) {
for (int k = 0; k != 2048 * 5; ++k) {
if (!q.wait_dequeue_timed(tok, item, 1000)) {
--k;
continue;
}
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
else {
int items[6];
int k;
for (k = 0; k < 4090; ) {
if (std::size_t dequeued = q.wait_dequeue_bulk_timed(tok, items, 6, 1000)) {
for (std::size_t x = 0; x != dequeued; ++x) {
item = items[x];
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
k += (int)dequeued;
}
}
for (; k != 4096; ++k) {
if (!q.wait_dequeue_timed(tok, item, std::chrono::hours(1))) {
success[j] = false;
}
int thread = item >> 16;
item &= 0xffff;
if (item <= prevItems[thread]) {
success[j] = false;
}
prevItems[thread] = item;
}
}
}, i);
}
}
for (int i = 0; i != THREADS; ++i) {
threads[i].join();
}
for (int i = 0; i != THREADS; ++i) {
ASSERT_OR_FAIL(success[i]);
}
ASSERT_OR_FAIL(q.size_approx() == 0);
int item;
ConsumerToken tok(q);
ASSERT_OR_FAIL(!q.wait_dequeue_timed(tok, item, 0));
}
return true;
}
bool c_api_create()
{
MoodycamelCQHandle handle;
int rc = moodycamel_cq_create(&handle);
ASSERT_OR_FAIL(rc == 1);
ASSERT_OR_FAIL(handle != nullptr);
moodycamel_cq_destroy(handle);
return true;
}
bool c_api_enqueue()
{
MoodycamelCQHandle handle;
int rc = moodycamel_cq_create(&handle);
int i = 10;
rc = moodycamel_cq_enqueue(handle, &i);
ASSERT_OR_FAIL(rc == 1);
moodycamel_cq_destroy(handle);
return true;
}
bool c_api_try_dequeue()
{
MoodycamelCQHandle handle;
int rc = moodycamel_cq_create(&handle);
{
MoodycamelValue n;
rc = moodycamel_cq_try_dequeue(handle, &n);
ASSERT_OR_FAIL(rc == 0);
}
int i = 10;
rc = moodycamel_cq_enqueue(handle, &i);
{
MoodycamelValue value;
rc = moodycamel_cq_try_dequeue(handle, &value);
int n = *reinterpret_cast<int*>(value);
ASSERT_OR_FAIL(rc == 1);
ASSERT_OR_FAIL(n == 10);
}
moodycamel_cq_destroy(handle);
return true;
}
bool c_api_destroy()
{
MoodycamelCQHandle handle;
moodycamel_cq_create(&handle);
moodycamel_cq_destroy(handle);
return true;
}
bool acquire_and_signal()
{
const unsigned TIMEOUT_US = 10 * 1000 * 1000; // 10s
// Test resource acquisition from one other thread
{
LightweightSemaphore s;
s.signal(); // Single resource available
auto fnTestSingleAcquire = [&]() {
for (std::size_t k = 0; k < 200000; ++k) {
s.wait(TIMEOUT_US);
s.signal();
}
};
SimpleThread t1(fnTestSingleAcquire);
SimpleThread t2(fnTestSingleAcquire);
t1.join();
t2.join();
ASSERT_OR_FAIL(s.availableApprox() == 1);
}
// Test resource acquisition from multiple threads
{
const int THREADS = 4;
const std::size_t ITERATIONS = 200000;
SimpleThread threads[THREADS];
const std::size_t arrayItemsToWait[THREADS] = { 1, 2, 3, 7 };
LightweightSemaphore s;
for (int i = 0; i != THREADS; ++i)
s.signal(ITERATIONS * arrayItemsToWait[i]);
for (int i = 0; i != THREADS; ++i) {
threads[i] = SimpleThread([&](int tid) {
for (std::size_t k = 0; k < ITERATIONS; ++k)
s.waitMany(arrayItemsToWait[tid], TIMEOUT_US);
}, i);
}
for (int i = 0; i != THREADS; ++i)
threads[i].join();
ASSERT_OR_FAIL(s.availableApprox() == 0);
}
{
const int THREADS = 5;
const std::size_t ITERATIONS = 100000;
SimpleThread threads[THREADS];
const std::size_t arrayItemsToWait[THREADS] = { 0, 1, 2, 3, 7 };
LightweightSemaphore s;
for (int i = 0; i != THREADS; ++i)
s.signal(ITERATIONS * arrayItemsToWait[i]);
for (int i = 0; i != THREADS; ++i) {
threads[i] = SimpleThread([&](int tid) {
if (tid == 0) {
for (std::size_t k = 0; k < ITERATIONS * (THREADS - 1); ++k)
s.wait(TIMEOUT_US);
}
else {
for (std::size_t k = 0; k < ITERATIONS; ++k) {
s.signal();
s.waitMany(arrayItemsToWait[tid], TIMEOUT_US);
}
}
}, i);
}
for (int i = 0; i != THREADS; ++i)
threads[i].join();
ASSERT_OR_FAIL(s.availableApprox() == 0);
}
LightweightSemaphore s;
ASSERT_OR_FAIL(s.availableApprox() == 0);
s.signal();
ASSERT_OR_FAIL(s.availableApprox() == 1);
s.signal();
ASSERT_OR_FAIL(s.availableApprox() == 2);
s.signal(10);
ASSERT_OR_FAIL(s.availableApprox() == 12);
s.signal(10);
ASSERT_OR_FAIL(s.availableApprox() == 22);
ASSERT_OR_FAIL(s.wait());
ASSERT_OR_FAIL(s.availableApprox() == 21);
ASSERT_OR_FAIL(s.wait());
ASSERT_OR_FAIL(s.availableApprox() == 20);
ASSERT_OR_FAIL(s.waitMany(10) == 10);
ASSERT_OR_FAIL(s.availableApprox() == 10);
ASSERT_OR_FAIL(s.waitMany(11) == 10);
ASSERT_OR_FAIL(s.availableApprox() == 0);
return true;
}
bool try_acquire_and_signal()
{
LightweightSemaphore s;
ASSERT_OR_FAIL(s.availableApprox() == 0);
s.signal();
ASSERT_OR_FAIL(s.availableApprox() == 1);
ASSERT_OR_FAIL(s.tryWaitMany(2) == 1);
ASSERT_OR_FAIL(s.availableApprox() == 0);
s.signal();
ASSERT_OR_FAIL(s.availableApprox() == 1);
ASSERT_OR_FAIL(s.tryWaitMany(3) == 1);
ASSERT_OR_FAIL(s.availableApprox() == 0);
s.signal(10);
ASSERT_OR_FAIL(s.availableApprox() == 10);
ASSERT_OR_FAIL(s.tryWaitMany(100) == 10);
ASSERT_OR_FAIL(s.availableApprox() == 0);
s.signal(10);
ASSERT_OR_FAIL(s.availableApprox() == 10);
ASSERT_OR_FAIL(s.tryWaitMany(5) == 5);
ASSERT_OR_FAIL(s.availableApprox() == 5);
ASSERT_OR_FAIL(s.tryWait());
ASSERT_OR_FAIL(s.availableApprox() == 4);
ASSERT_OR_FAIL(s.tryWait());
ASSERT_OR_FAIL(s.availableApprox() == 3);
return true;
}
struct TestListItem : corealgos::ListItem
{
int value;
TestListItem()
: value(0)
{
ctorCount().fetch_add(1, std::memory_order_relaxed);
}
explicit TestListItem(int value)
: value(value)
{
ctorCount().fetch_add(1, std::memory_order_relaxed);
}
~TestListItem()
{
dtorCount().fetch_add(1, std::memory_order_relaxed);
}
inline TestListItem* prev(std::memory_order order = std::memory_order_relaxed) const
{
return static_cast<TestListItem*>(concurrentListPrev.load(order));
}
inline static void reset()
{
ctorCount().store(0, std::memory_order_relaxed);
dtorCount().store(0, std::memory_order_relaxed);
}
inline static size_t constructed() { return ctorCount().load(std::memory_order_relaxed); }
inline static size_t destructed() { return dtorCount().load(std::memory_order_relaxed); }
private:
inline static std::atomic<size_t>& ctorCount() { static std::atomic<size_t> count(0); return count; }
inline static std::atomic<size_t>& dtorCount() { static std::atomic<size_t> count(0); return count; }
};
bool core_add_only_list()
{
auto destroyList = [](corealgos::ConcurrentAddOnlyList<TestListItem>& list) {
size_t count = 0;
auto tail = list.tail();
while (tail != nullptr) {
auto next = tail->prev();
delete tail;
++count;
tail = next;
}
return count;
};
{
corealgos::ConcurrentAddOnlyList<TestListItem> list;
ASSERT_OR_FAIL(list.tail() == nullptr);
ASSERT_OR_FAIL(destroyList(list) == 0);
}
{
corealgos::ConcurrentAddOnlyList<TestListItem> list;
for (int i = 0; i != 1000; ++i) {
list.add(new TestListItem(i));
}
int i = 999;
for (auto tail = list.tail(); tail != nullptr; tail = tail->prev()) {
ASSERT_OR_FAIL(i == tail->value);
--i;
}
ASSERT_OR_FAIL(i == -1);
ASSERT_OR_FAIL(destroyList(list) == 1000);
}
for (int repeats = 0; repeats != 10; ++repeats) {
corealgos::ConcurrentAddOnlyList<TestListItem> list;
std::vector<SimpleThread> threads(8);
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](size_t tid) {
for (int i = 0; i != 1000; ++i) {
list.add(new TestListItem((int)((tid << 16) | i)));
}
}, tid);
}
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid].join();
}
std::vector<int> prevItems(threads.size());
for (size_t i = 0; i != prevItems.size(); ++i) {
prevItems[i] = 1000;
}
for (auto tail = list.tail(); tail != nullptr; tail = tail->prev()) {
auto tid = tail->value >> 16;
auto i = tail->value & ((1 << 16) - 1);
ASSERT_OR_FAIL(prevItems[tid] == i + 1);
prevItems[tid] = i;
}
ASSERT_OR_FAIL(destroyList(list) == 1000 * threads.size());
}
return true;
}
bool core_thread_local()
{
TestListItem::reset();
{
corealgos::ThreadLocal<TestListItem> local(4);
}
ASSERT_OR_FAIL(TestListItem::constructed() == 0);
ASSERT_OR_FAIL(TestListItem::destructed() == 0);
TestListItem::reset();
{
corealgos::ThreadLocal<TestListItem> local(4);
local.get_or_create();
}
ASSERT_OR_FAIL(TestListItem::constructed() == 1);
ASSERT_OR_FAIL(TestListItem::destructed() == 1);
TestListItem::reset();
{
corealgos::ThreadLocal<TestListItem> local(4);
auto item = local.get_or_create();
item->value = 7;
item = local.get_or_create();
ASSERT_OR_FAIL(item->value == 7);
}
ASSERT_OR_FAIL(TestListItem::constructed() == 1);
ASSERT_OR_FAIL(TestListItem::destructed() == 1);
for (size_t initialSize = 1; initialSize <= 4; initialSize <<= 1) {
for (int reps = 0; reps != 20; ++reps) {
TestListItem::reset();
{
corealgos::ThreadLocal<TestListItem> local(initialSize);
std::vector<SimpleThread> threads(5 * initialSize);
std::vector<bool> failed(threads.size());
std::atomic<std::size_t> done(0);
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](size_t tid) {
failed[tid] = false;
auto item = local.get_or_create();
item->value = (int)tid;
for (int i = 0; i != 1024; ++i) {
item = local.get_or_create();
if (item->value != (int)tid) {
failed[tid] = true;
}
}
done.fetch_add(1, std::memory_order_seq_cst);
while (done.load(std::memory_order_relaxed) != threads.size()) {
moodycamel::sleep(1);
}
}, tid);
}
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid].join();
ASSERT_OR_FAIL(!failed[tid]);
}
ASSERT_OR_FAIL(TestListItem::constructed() == 5 * initialSize);
}
ASSERT_OR_FAIL(TestListItem::destructed() == 5 * initialSize);
}
}
return true;
}
struct TestNode : corealgos::FreeListNode<TestNode>
{
int value;
TestNode() { }
explicit TestNode(int value) : value(value) { }
};
bool core_free_list()
{
{
// Basic
corealgos::FreeList<TestNode> freeList;
ASSERT_OR_FAIL(freeList.try_get() == nullptr);
freeList.add(new TestNode(7));
TestNode* node = freeList.try_get();
ASSERT_OR_FAIL(node != nullptr);
ASSERT_OR_FAIL(node->value == 7);
ASSERT_OR_FAIL(freeList.try_get() == nullptr);
freeList.add(node);
node = freeList.try_get();
ASSERT_OR_FAIL(node != nullptr);
ASSERT_OR_FAIL(node->value == 7);
ASSERT_OR_FAIL(freeList.try_get() == nullptr);
delete node;
}
{
// Multi-threaded. Tests ABA too.
for (int rep = 0; rep != 10; ++rep) {
corealgos::FreeList<TestNode> freeList;
std::vector<SimpleThread> threads(rep < 8 ? 4 : 16);
std::vector<bool> failed(threads.size());
std::vector<TestNode> initialNodes(threads.size());
const int OP_COUNT = 2048;
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](size_t tid) {
std::vector<bool> seenValues(threads.size() * OP_COUNT, false);
failed[tid] = false;
TestNode* node = &initialNodes[tid];
node->value = ((int)tid << 20) | 1;
freeList.add(node);
for (int i = 1; i != OP_COUNT - 1; ++i) {
node = freeList.try_get();
if (node != nullptr) {
auto seen = seenValues.begin() + ((node->value >> 20) * OP_COUNT + (node->value & 0xFFFFF));
if (*seen) {
failed[tid] = true;
}
*seen = true;
node->value = ((int)tid << 20) | (i + 1);
freeList.add(node);
}
}
}, tid);
}
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid].join();
ASSERT_OR_FAIL(!failed[tid]);
}
for (size_t tid = 0; tid != threads.size(); ++tid) {
auto node = freeList.try_get();
ASSERT_OR_FAIL(node != nullptr);
ASSERT_OR_FAIL(node->value != -1);
node->value = -1;
}
auto node = freeList.try_get();
ASSERT_OR_FAIL(node == nullptr);
}
}
return true;
}
bool core_spmc_hash()
{
{
for (int rep = 0; rep != 20; ++rep) {
corealgos::SPMCSequentialHashMap<int> hash(rep < 10 ? 2 : 4);
std::vector<SimpleThread> threads(rep < 12 ? 4 : 16);
std::vector<bool> failed(threads.size());
const int MAX_ENTRIES = 4096;
std::vector<int> values(MAX_ENTRIES);
std::array<std::atomic<int>, MAX_ENTRIES> useCounts;
std::array<std::atomic<bool>, MAX_ENTRIES> removed;
for (std::size_t i = 0; i != useCounts.size(); ++i) {
useCounts[i].store(0, std::memory_order_relaxed);
removed[i].store(false, std::memory_order_relaxed);
}
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](size_t tid) {
failed[tid] = false;
if (tid == 0) {
// Producer thread
for (int i = 0; i != MAX_ENTRIES; ++i) {
values[i] = i;
hash.insert(i, &values[i]);
useCounts[i].store((int)threads.size() / 2, std::memory_order_release);
}
}
else {
// One of the consumer threads
for (int i = MAX_ENTRIES * 2; i != 0; --i) { // Purposefully off-by-lots
int useCount = -1;
if (i < MAX_ENTRIES) {
useCount = useCounts[i].fetch_add(-1, std::memory_order_acquire);
}
int* val;
if (useCount > 0) {
val = hash.find(i);
bool isRemoved = removed[i].load(std::memory_order_relaxed);
assert(val == nullptr || *val == *val); // Find segfaults
// We read the use count again; if it's still > 0, the item must have been in
// the hash during the entire call to find(), so we can check its value
auto currentUseCount = useCounts[i].fetch_add(0, std::memory_order_release);
if ((currentUseCount > 0 || (currentUseCount == 0 && useCount == 1)) && (val == nullptr || *val != i || isRemoved)) {
failed[tid] = true;
}
}
if (useCount == 1) {
val = hash.remove(i);
if (val == nullptr || *val != i || removed[i].load(std::memory_order_relaxed)) {
failed[tid] = true;
}
removed[i].store(true, std::memory_order_release);
}
}
}
}, tid);
}
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid].join();
ASSERT_OR_FAIL(!failed[tid]);
}
for (int i = 0; i != MAX_ENTRIES; ++i) {
auto val = hash.find(i);
if (val != nullptr) {
ASSERT_OR_FAIL(&values[i] == val && *val == i && !removed[i].load(std::memory_order_relaxed));
}
else {
ASSERT_OR_FAIL(removed[i].load(std::memory_order_relaxed));
}
auto removedVal = hash.remove(i);
ASSERT_OR_FAIL(removedVal == val);
}
for (int i = 0; i != MAX_ENTRIES; ++i) {
ASSERT_OR_FAIL(hash.find(i) == nullptr);
ASSERT_OR_FAIL(hash.remove(i) == nullptr);
}
ASSERT_OR_FAIL(hash.find(MAX_ENTRIES) == nullptr);
ASSERT_OR_FAIL(hash.remove(MAX_ENTRIES) == nullptr);
}
}
return true;
}
bool explicit_strings_threaded()
{
std::vector<SimpleThread> threads(8);
ConcurrentQueue<std::string, MallocTrackingTraits> q(1024 * 1024);
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](size_t tid) {
const size_t ITERATIONS = 100 * 1024;
if (tid % 2 == 0) {
// Produce
ProducerToken t(q);
for (size_t i = 0; i != ITERATIONS; ++i) {
q.enqueue(t, std::string("banana", i % 6));
}
}
else {
// Consume
std::string item;
for (size_t i = 0; i != ITERATIONS / 2; ++i) {
q.try_dequeue(item);
}
}
}, tid);
}
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid].join();
}
return true;
}
bool large_traits()
{
union Elem { uint32_t x; char dummy[156]; };
ConcurrentQueue<Elem, LargeTraits> q(10000, 0, 48);
std::vector<SimpleThread> threads(48);
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid] = SimpleThread([&](size_t tid) {
const size_t ELEMENTS = 5000;
if (tid != 0) {
// Produce
for (uint32_t i = 0; i != ELEMENTS; ++i)
q.try_enqueue(Elem { ((uint32_t)tid << 16) | i });
}
else {
// Consume
Elem item[256];
for (size_t i = 0; i != ELEMENTS * 200; ++i)
q.try_dequeue_bulk(item, sizeof(item) / sizeof(item[0]));
}
}, tid);
}
for (size_t tid = 0; tid != threads.size(); ++tid) {
threads[tid].join();
}
return true;
}
};
}
void printTests(ConcurrentQueueTests const& tests)
{
std::printf(" Supported tests are:\n");
std::vector<std::string> names;
tests.getAllTestNames(names);
for (auto it = names.cbegin(); it != names.cend(); ++it) {
std::printf(" %s\n", it->c_str());
}
}
// Basic test harness
#if !defined(TARGET_OS_IPHONE)
int main(int argc, char** argv)
{
bool disablePrompt = false;
unsigned int iterations = 8;
std::vector<std::string> selectedTests;
// Disable buffering (so that when run in, e.g., Sublime Text, the output appears as it is written)
std::setvbuf(stdout, nullptr, _IONBF, 0);
// Isolate the executable name
std::string progName = argv[0];
auto slash = progName.find_last_of("/\\");
if (slash != std::string::npos) {
progName = progName.substr(slash + 1);
}
ConcurrentQueueTests tests;
// Parse command line options
if (argc > 1) {
bool printHelp = false;
bool printedTests = false;
bool error = false;
for (int i = 1; i < argc; ++i) {
if (std::strcmp(argv[i], "--help") == 0) {
printHelp = true;
}
else if (std::strcmp(argv[i], "--disable-prompt") == 0) {
disablePrompt = true;
}
else if (std::strcmp(argv[i], "--run") == 0) {
if (i + 1 == argc || argv[i + 1][0] == '-') {
std::printf("Expected test name argument for --run option.\n");
if (!printedTests) {
printTests(tests);
printedTests = true;
}
error = true;
continue;
}
if (!tests.validateTestName(argv[++i])) {
std::printf("Unrecognized test '%s'.\n", argv[i]);
if (!printedTests) {
printTests(tests);
printedTests = true;
}
error = true;
continue;
}
selectedTests.push_back(argv[i]);
}
else if (std::strcmp(argv[i], "--iterations") == 0) {
if (i + 1 == argc || argv[i + 1][0] == '-') {
std::printf("Expected iteration count argument for --iterations option.\n");
error = true;
continue;
}
iterations = static_cast<unsigned int>(std::atoi(argv[++i]));
}
else {
std::printf("Unrecognized option '%s'.\n", argv[i]);
error = true;
}
}
if (error || printHelp) {
if (error) {
std::printf("\n");
}
std::printf("%s\n Description: Runs unit tests for moodycamel::ConcurrentQueue\n", progName.c_str());
std::printf(" --help Prints this help blurb\n");
std::printf(" --run test Runs only the specified test(s)\n");
std::printf(" --iterations N Do N iterations of each test\n");
std::printf(" --disable-prompt Disables prompt before exit when the tests finish\n");
return error ? -1 : 0;
}
}
int exitCode = 0;
bool result;
if (selectedTests.size() > 0) {
std::printf("Running %d iteration%s of selected unit test%s for moodycamel::ConcurrentQueue.\n\n", iterations, iterations == 1 ? "" : "s", selectedTests.size() == 1 ? "" : "s");
result = tests.run(selectedTests, iterations);
}
else {
std::printf("Running %d iteration%s of all unit tests for moodycamel::ConcurrentQueue.\n(Run %s --help for other options.)\n\n", iterations, iterations == 1 ? "" : "s", progName.c_str());
result = tests.run(iterations);
}
if (result) {
std::printf("All %stests passed.\n", (selectedTests.size() > 0 ? "selected " : ""));
}
else {
std::printf("Test(s) failed!\n");
exitCode = 2;
}
if (!disablePrompt) {
std::printf("Press ENTER to exit.\n");
getchar();
}
return exitCode;
}
#else
// Provide entry function that can be invoked
// by a test host (iOS app / test runner)
bool runAllTests() {
unsigned int iterations = 8;
ConcurrentQueueTests tests;
return tests.run(iterations);
}
#endif // !defined(TARGET_OS_IPHONE)