// concurrentqueue/tests/corealgos.h
// ©2014 Cameron Desrochers
// moodycamel::ConcurrentQueue contains many inner data structures which
// are difficult to test in isolation. So, this file contains copies of
// them, extracted and isolated so as to be independently testable.
#pragma once
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <algorithm>
#include <utility>
#include <limits>
#include <cassert>
#include <new>           // for placement new
#include <type_traits>   // for std::alignment_of
#include <vector>        // for the initial hash table storage below
#include <thread>        // for std::this_thread::get_id() on the ARM/iOS path below
#include <functional>    // for std::hash on the ARM/iOS path below
// Define corealgos_allocator before including this header in order to override the
// default malloc/free functions
#ifndef corealgos_allocator
struct corealgos_allocator
{
	static inline void* malloc(std::size_t size) { return std::malloc(size); }
	static inline void free(void* ptr) { std::free(ptr); }
};
#endif
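
// Illustrative usage sketch (not part of the original header). The override works by
// defining the corealgos_allocator *macro* to name a custom allocator type before the
// first include; the counting allocator below is hypothetical (e.g. for leak checks):
//
//     struct counting_allocator
//     {
//         static inline void* malloc(std::size_t size) { count().fetch_add(1, std::memory_order_relaxed); return std::malloc(size); }
//         static inline void free(void* ptr) { if (ptr != nullptr) count().fetch_sub(1, std::memory_order_relaxed); std::free(ptr); }
//         static std::atomic<long>& count() { static std::atomic<long> c(0); return c; }   // live allocation count
//     };
//     #define corealgos_allocator counting_allocator
//     #include "corealgos.h"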
////////////////////////////////////////////////////////////////////////////////
// Lock-free add-only list (e.g. used to track producers)
////////////////////////////////////////////////////////////////////////////////
namespace moodycamel { namespace corealgos {
struct ListItem
{
	ListItem()
		: concurrentListPrev(nullptr)
	{
	}

public:
	std::atomic<ListItem*> concurrentListPrev;
};

template<typename T>		// T should inherit ListItem or implement the same interface
struct ConcurrentAddOnlyList
{
	ConcurrentAddOnlyList()
		: tail_(nullptr)
	{
	}

	inline T* tail() { return tail_.load(std::memory_order_acquire); }

	void add(T* element)
	{
		assert(element != nullptr);

		// Add it to the lock-free list
		auto prevTail = tail_.load(std::memory_order_relaxed);
		do {
			element->concurrentListPrev = prevTail;
		} while (!tail_.compare_exchange_weak(prevTail, element, std::memory_order_release, std::memory_order_relaxed));
	}

private:
	std::atomic<T*> tail_;
};
} }
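
// Illustrative usage sketch (not part of the original header; Node and its payload are
// hypothetical). Any number of threads may call add() concurrently; traversal walks
// backwards from the most recently added element via concurrentListPrev:
//
//     struct Node : moodycamel::corealgos::ListItem { int payload; };
//
//     moodycamel::corealgos::ConcurrentAddOnlyList<Node> list;
//     list.add(new Node());		// safe from multiple threads at once
//
//     for (auto node = list.tail(); node != nullptr;
//          node = static_cast<Node*>(node->concurrentListPrev.load(std::memory_order_relaxed))) {
//         // visit node (the list never frees nodes itself)
//     }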
////////////////////////////////////////////////////////////////////////////////
// Thread local hash map
////////////////////////////////////////////////////////////////////////////////
#if defined(__APPLE__)
#include "TargetConditionals.h" // Needed for TARGET_OS_IPHONE
#endif
// Platform-specific definitions of a numeric thread ID type and an invalid value
#if defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
// No sense pulling in windows.h in a header; we'll manually declare the function
// we use and rely on backwards compatibility for this not to break
extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
namespace moodycamel { namespace corealgos { namespace details {
static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
typedef std::uint32_t thread_id_t;
static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
} } }
#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
namespace moodycamel { namespace corealgos { namespace details {
typedef std::uintptr_t thread_id_t;
static const thread_id_t invalid_thread_id = 0;
static inline thread_id_t thread_id() { return std::hash<std::thread::id>()(std::this_thread::get_id()); }
} } }
#else
// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
// static variable's address as a thread identifier :-)
#if defined(__GNUC__) || defined(__INTEL_COMPILER)
#define MOODYCAMEL_COREALGO_THREADLOCAL __thread
#elif defined(_MSC_VER)
#define MOODYCAMEL_COREALGO_THREADLOCAL __declspec(thread)
#else
// Assume C++11 compliant compiler
#define MOODYCAMEL_COREALGO_THREADLOCAL thread_local
#endif
namespace moodycamel { namespace corealgos { namespace details {
typedef std::uintptr_t thread_id_t;
static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
static inline thread_id_t thread_id() { static MOODYCAMEL_COREALGO_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
} } }
#endif
namespace moodycamel { namespace corealgos {
namespace details
{
template<bool use32> struct _hash_32_or_64 {
	static inline std::size_t hash(std::uint32_t h)
	{
		// MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
		// Since the thread ID is already unique, all we really want to do is propagate that
		// uniqueness evenly across all the bits, so that we can use a subset of the bits while
		// reducing collisions significantly
		h ^= h >> 16;
		h *= 0x85ebca6b;
		h ^= h >> 13;
		h *= 0xc2b2ae35;
		return static_cast<std::size_t>(h ^ (h >> 16));
	}
};
template<> struct _hash_32_or_64<1> {
	static inline std::size_t hash(std::uint64_t h)
	{
		h ^= h >> 33;
		h *= 0xff51afd7ed558ccd;
		h ^= h >> 33;
		h *= 0xc4ceb9fe1a85ec53;
		return static_cast<std::size_t>(h ^ (h >> 33));
	}
};
template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };

static inline std::size_t hash_thread_id(thread_id_t id)
{
	static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
	return hash_32_or_64<sizeof(thread_id_t)>::hash(id);
}

template<typename U>
static inline char* align_for(char* ptr)
{
	const std::size_t alignment = std::alignment_of<U>::value;
	return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
}
}
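
// Illustrative sketch (not part of the original header): how the helpers above combine to
// pick a bucket in a power-of-two table (the capacity of 16 is arbitrary):
//
//     auto id = moodycamel::corealgos::details::thread_id();
//     auto bucket = moodycamel::corealgos::details::hash_thread_id(id) & (16 - 1);
//
// align_for<U>() rounds a raw char pointer up to the next multiple of alignof(U); the hash
// tables and index below use it to carve correctly aligned entry arrays out of a single
// malloc'd block.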
template<typename T>		// T should inherit ListItem or implement the same interface
struct ThreadLocal
{
	explicit ThreadLocal(std::size_t initialHashSize)
		: initialHashEntries(initialHashSize)
	{
		assert(initialHashSize > 0 && (initialHashSize & (initialHashSize - 1)) == 0);
		resizeInProgress.clear();
		currentHashCount.store(0, std::memory_order_relaxed);
		auto hash = &initialHash;
		hash->capacity = initialHashSize;
		hash->entries = &initialHashEntries[0];
		for (std::size_t i = 0; i != initialHashSize; ++i) {
			initialHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
		}
		hash->prev = nullptr;
		currentHash.store(hash, std::memory_order_relaxed);
	}
	~ThreadLocal()
	{
		// Destroy items
		auto ptr = items.tail();
		while (ptr != nullptr) {
			auto prev = static_cast<T*>(ptr->concurrentListPrev.load(std::memory_order_relaxed));
			ptr->~T();
			corealgos_allocator::free(ptr);
			ptr = prev;
		}

		// Destroy hash tables
		auto hash = currentHash.load(std::memory_order_relaxed);
		while (hash != nullptr) {
			auto prev = hash->prev;
			if (prev != nullptr) {		// The last hash is part of this object and was not allocated dynamically
				for (std::size_t i = 0; i != hash->capacity; ++i) {
					hash->entries[i].~KeyValuePair();
				}
				hash->~InnerHash();
				corealgos_allocator::free(hash);
			}
			hash = prev;
		}
	}
	// Only fails (returns nullptr) if memory allocation fails
	T* get_or_create()
	{
		// Note that since the data is essentially thread-local (key is thread ID),
		// there's a reduced need for fences (memory ordering is already consistent
		// for any individual thread), except for the current table itself.

		// Start by looking for the thread ID in the current and all previous hash tables.
		// If it's not found, it must not be in there yet, since this same thread would
		// have added it previously to one of the tables that we traversed.

		// Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
		auto id = details::thread_id();
		auto hashedId = details::hash_thread_id(id);

		auto mainHash = currentHash.load(std::memory_order_acquire);
		for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
			// Look for the id in this hash
			auto index = hashedId;
			while (true) {		// Not an infinite loop because at least one slot is free in the hash table
				index &= hash->capacity - 1;

				auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
				if (probedKey == id) {
					// Found it! If we had to search several hashes deep, though, we should lazily add it
					// to the current main hash table to avoid the extended search next time.
					// Note there's guaranteed to be room in the current hash table since every subsequent
					// table implicitly reserves space for all previous tables (there's only one
					// currentHashCount).
					auto value = hash->entries[index].value;
					if (hash != mainHash) {
						index = hashedId;
						while (true) {
							index &= mainHash->capacity - 1;
							probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
							auto expected = details::invalid_thread_id;
							if (probedKey == expected && mainHash->entries[index].key.compare_exchange_strong(expected, id, std::memory_order_relaxed)) {
								mainHash->entries[index].value = value;
								break;
							}
							++index;
						}
					}

					return value;
				}
				if (probedKey == details::invalid_thread_id) {
					break;		// Not in this hash table
				}
				++index;
			}
		}

		// Insert!
		auto newCount = 1 + currentHashCount.fetch_add(1, std::memory_order_relaxed);
		while (true) {
			if (newCount >= (mainHash->capacity >> 1) && !resizeInProgress.test_and_set(std::memory_order_acquire)) {
				// We've acquired the resize lock, try to allocate a bigger hash table.
				// Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
				// we reload currentHash it must be the most recent version (it only gets changed within this
				// locked block).
				mainHash = currentHash.load(std::memory_order_acquire);
				auto newCapacity = mainHash->capacity << 1;
				while (newCount >= (newCapacity >> 1)) {
					newCapacity <<= 1;
				}
				auto raw = static_cast<char*>(corealgos_allocator::malloc(sizeof(InnerHash) + std::alignment_of<KeyValuePair>::value - 1 + sizeof(KeyValuePair) * newCapacity));
				if (raw == nullptr) {
					// Allocation failed
					currentHashCount.fetch_sub(1, std::memory_order_relaxed);
					resizeInProgress.clear(std::memory_order_relaxed);
					return nullptr;
				}

				auto newHash = new (raw) InnerHash;
				newHash->capacity = newCapacity;
				newHash->entries = reinterpret_cast<KeyValuePair*>(details::align_for<KeyValuePair>(raw + sizeof(InnerHash)));
				for (std::size_t i = 0; i != newCapacity; ++i) {
					new (newHash->entries + i) KeyValuePair;
					newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
				}
				newHash->prev = mainHash;
				currentHash.store(newHash, std::memory_order_release);
				resizeInProgress.clear(std::memory_order_release);
				mainHash = newHash;
			}

			// If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
			// to finish being allocated by another thread (and if we just finished allocating above, the condition will
			// always be true)
			if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
				auto element = (T*)corealgos_allocator::malloc(sizeof(T));
				if (element == nullptr) {
					return nullptr;
				}
				new (element) T();
				items.add(element);		// Track items so they can be destructed later

				auto index = hashedId;
				while (true) {
					index &= mainHash->capacity - 1;
					auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
					auto expected = details::invalid_thread_id;
					if (probedKey == expected && mainHash->entries[index].key.compare_exchange_strong(expected, id, std::memory_order_relaxed)) {
						mainHash->entries[index].value = element;
						break;
					}
					++index;
				}

				return element;
			}

			// Hmm, the old hash is quite full and somebody else is busy allocating a new one.
			// We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
			// we try to allocate ourselves).
			mainHash = currentHash.load(std::memory_order_acquire);
		}
	}
private:
	struct KeyValuePair
	{
		std::atomic<details::thread_id_t> key;
		T* value;		// No need for atomicity since it's only read by the thread that sets it in the first place

		KeyValuePair()
		{ }

		KeyValuePair(KeyValuePair const& other)
			: key(other.key.load()), value(other.value)
		{ }

		KeyValuePair& operator=(KeyValuePair const& other)
		{
			key.store(other.key.load());
			value = other.value;
			return *this;
		}
	};

	struct InnerHash
	{
		std::size_t capacity;
		KeyValuePair* entries;
		InnerHash* prev;
	};

	std::atomic_flag resizeInProgress;
	std::atomic<InnerHash*> currentHash;
	std::atomic<std::size_t> currentHashCount;		// Number of slots logically used
	InnerHash initialHash;
	std::vector<KeyValuePair> initialHashEntries;
	ConcurrentAddOnlyList<T> items;
};
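
// Illustrative usage sketch (not part of the original header; PerThreadState is
// hypothetical). Each thread that calls get_or_create() gets its own lazily created,
// heap-allocated object, looked up by a hash of its thread ID; the ThreadLocal destructor
// frees everything that was created:
//
//     struct PerThreadState : moodycamel::corealgos::ListItem { int counter = 0; };
//
//     moodycamel::corealgos::ThreadLocal<PerThreadState> tls(8);		// initial hash size must be a power of two
//
//     void worker()
//     {
//         auto state = tls.get_or_create();		// nullptr only if allocation failed
//         if (state != nullptr) {
//             ++state->counter;
//         }
//     }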
////////////////////////////////////////////////////////////////////////////////
// Lock-free free list
////////////////////////////////////////////////////////////////////////////////
template <typename N>
struct FreeListNode
{
	FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }

	std::atomic<std::uint32_t> freeListRefs;
	std::atomic<N*> freeListNext;

	FreeListNode(FreeListNode const& other)
		: freeListRefs(other.freeListRefs.load()), freeListNext(other.freeListNext.load())
	{ }

	FreeListNode& operator=(FreeListNode const& other)
	{
		freeListRefs.store(other.freeListRefs.load());
		freeListNext.store(other.freeListNext.load());
		return *this;
	}
};
// A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention,
// but simple and correct (assuming nodes are never freed until after the free list is destroyed),
// and fairly speedy under low contention.
template<typename N>		// N must inherit FreeListNode or have the same fields (and initialization)
struct FreeList
{
	FreeList() : freeListHead(nullptr) { }

	inline void add(N* node)
	{
		// We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
		// set it using a fetch_add
		if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
			// Oh look! We were the last ones referencing this node, and we know
			// we want to add it to the free list, so let's do it!
			add_knowing_refcount_is_zero(node);
		}
	}

	inline N* try_get()
	{
		auto head = freeListHead.load(std::memory_order_acquire);
		while (head != nullptr) {
			auto prevHead = head;
			auto refs = head->freeListRefs.load(std::memory_order_relaxed);
			if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1,
					std::memory_order_acquire, std::memory_order_relaxed)) {
				head = freeListHead.load(std::memory_order_acquire);
				continue;
			}

			// Good, reference count has been incremented (it wasn't at zero), which means
			// we can read the next and not worry about it changing between now and the time
			// we do the CAS
			auto next = head->freeListNext.load(std::memory_order_relaxed);
			if (freeListHead.compare_exchange_strong(head, next,
					std::memory_order_acquire, std::memory_order_relaxed)) {
				// Yay, got the node. This means it was on the list, which means
				// shouldBeOnFreeList must be false no matter the refcount (because
				// nobody else knows it's been taken off yet, it can't have been put back on).
				assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);

				// Decrease refcount twice, once for our ref, and once for the list's ref
				head->freeListRefs.fetch_add(-2u, std::memory_order_release);

				return head;
			}

			// OK, the head must have changed on us, but we still need to decrease the refcount we
			// increased.
			// Note that we don't need to release any memory effects, but we do need to ensure that the reference
			// count decrement happens-after the CAS on the head.
			refs = prevHead->freeListRefs.fetch_add(-1u, std::memory_order_acq_rel);
			if (refs == SHOULD_BE_ON_FREELIST + 1) {
				add_knowing_refcount_is_zero(prevHead);
			}
		}

		return nullptr;
	}

	// Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
	N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }

private:
	inline void add_knowing_refcount_is_zero(N* node)
	{
		// Since the refcount is zero, and nobody can increase it once it's zero (except us, and we
		// run only one copy of this method per node at a time, i.e. the single thread case), then we
		// know we can safely change the next pointer of the node; however, once the refcount is back
		// above zero, then other threads could increase it (happens under heavy contention, when the
		// refcount goes to zero in between a load and a refcount increment of a node in try_get, then
		// back up to something non-zero, then the refcount increment is done by the other thread) --
		// so, if the CAS to add the node to the actual list fails, decrease the refcount and leave
		// the add operation to the next thread who puts the refcount back at zero (which could be us,
		// hence the loop).
		auto head = freeListHead.load(std::memory_order_relaxed);
		while (true) {
			node->freeListNext.store(head, std::memory_order_relaxed);
			node->freeListRefs.store(1, std::memory_order_release);
			if (!freeListHead.compare_exchange_strong(head, node,
					std::memory_order_release, std::memory_order_relaxed)) {
				// Hmm, the add failed, but we can only try again when the refcount goes back to zero
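				// (That is: fetch_add(SHOULD_BE_ON_FREELIST - 1) drops the single reference we stored
				// above and re-sets the should-be-on-freelist flag in one atomic step; a return value
				// of 1 means no try_get() was holding a reference, so the retry falls to us -- hence
				// the continue below.)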
				if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
					continue;
				}
			}
			return;
		}
	}

private:
	static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
	static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;

	// Implemented like a stack, but where node order doesn't matter (nodes are
	// inserted out of order under contention)
	std::atomic<N*> freeListHead;
};
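
// Illustrative usage sketch (not part of the original header; MyNode is hypothetical).
// Nodes are supplied by the caller and are only recycled, never freed, by the free list:
//
//     struct MyNode : moodycamel::corealgos::FreeListNode<MyNode> { int data; };
//
//     moodycamel::corealgos::FreeList<MyNode> freeList;
//     freeList.add(new MyNode());		// make a node available for reuse
//
//     if (auto node = freeList.try_get()) {		// nullptr if the list was (momentarily) empty
//         // ... reuse node, then eventually freeList.add(node) again ...
//     }
//
//     // At shutdown, with no other threads running, drain and delete via head_unsafe():
//     for (auto node = freeList.head_unsafe(); node != nullptr; ) {
//         auto next = node->freeListNext.load(std::memory_order_relaxed);
//         delete node;
//         node = next;
//     }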
////////////////////////////////////////////////////////////////////////////////
// Lock-free (single-producer, multi-consumer) numeric-key hash map of sorts;
// there are many conditions that must be met, i.e. items have to be inserted
// in increasing order by key (wrap-around is OK), and items cannot be searched
// for or removed unless they are known to be in the map in the first place.
////////////////////////////////////////////////////////////////////////////////
template<typename TValue>
struct SPMCSequentialHashMap
{
	explicit SPMCSequentialHashMap(std::size_t initialSize)
		: nextCapacity(initialSize), index(nullptr)
	{
		new_index();
	}

	~SPMCSequentialHashMap()
	{
		auto ptr = index.load(std::memory_order_relaxed);
		if (ptr != nullptr) {
			for (std::size_t i = 0; i != ptr->capacity; ++i) {
				ptr->index[i]->~IndexEntry();
			}
			do {
				auto prev = ptr->prev;
				ptr->~IndexHeader();
				corealgos_allocator::free(ptr);
				ptr = prev;
			} while (ptr != nullptr);
		}
	}
	// Not thread safe. Only call from the single producer thread.
	// Note: key must *not* be in the hash already, and must be exactly
	// one larger than the previously inserted key value.
	void insert(std::uint64_t key, TValue* value)
	{
		IndexEntry* idxEntry;
		insert_index_entry(idxEntry, key);
		idxEntry->value.store(value, std::memory_order_release);
	}

	// Thread-safe, but if somebody can remove the key while find() is
	// in progress, then any returned value is not guaranteed to correspond
	// to that key. This also applies if the key was not already present but
	// once was. Elements can be found in any order.
	TValue* find(std::uint64_t key)
	{
		auto idxEntry = get_entry_for_key(key);
		if (idxEntry == nullptr)
			return nullptr;
		return idxEntry->value.load(std::memory_order_acquire);
	}
	// Thread-safe, but if somebody else can remove the same key while remove()
	// is in progress, then any removed value is not guaranteed to correspond
	// to that key. This also applies if the key was not already present but
	// once was. Elements can be removed in any order.
	TValue* remove(std::uint64_t key)
	{
		auto idxEntry = get_entry_for_key(key);
		if (idxEntry == nullptr)
			return nullptr;
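
		// (Note: the loop below is effectively an atomic exchange with nullptr built from
		// compare_exchange_weak -- each failed attempt reloads the current value into val,
		// so on success val holds whatever value was removed, possibly nullptr.)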
		TValue* val = nullptr;
		while (!idxEntry->value.compare_exchange_weak(val, nullptr, std::memory_order_acquire, std::memory_order_relaxed))
			continue;
		return val;
	}
private:
	struct IndexEntry
	{
		std::atomic<std::uint64_t> key;
		std::atomic<TValue*> value;
	};

	struct IndexHeader
	{
		std::size_t capacity;
		std::atomic<std::size_t> tail;
		IndexEntry* entries;
		IndexEntry** index;
		IndexHeader* prev;
	};

	inline void insert_index_entry(IndexEntry*& idxEntry, std::uint64_t key)
	{
		auto localIndex = index.load(std::memory_order_relaxed);		// We're the only writer thread, relaxed is OK
		auto newTail = (localIndex->tail.load(std::memory_order_relaxed) + 1) & (localIndex->capacity - 1);
		idxEntry = localIndex->index[newTail];
		if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_KEY ||
				idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
			idxEntry->key.store(key, std::memory_order_relaxed);
			localIndex->tail.store(newTail, std::memory_order_release);
			return;
		}

		// No room in the old index, try to allocate another one!
		new_index();
		localIndex = index.load(std::memory_order_relaxed);
		newTail = (localIndex->tail.load(std::memory_order_relaxed) + 1) & (localIndex->capacity - 1);
		idxEntry = localIndex->index[newTail];
		assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_KEY);
		idxEntry->key.store(key, std::memory_order_relaxed);
		localIndex->tail.store(newTail, std::memory_order_release);
	}
	inline IndexEntry* get_entry_for_key(std::uint64_t key) const
	{
		auto localIndex = index.load(std::memory_order_acquire);
		auto tail = localIndex->tail.load(std::memory_order_acquire);
		auto tailBase = localIndex->index[tail]->key.load(std::memory_order_relaxed);
		if (tailBase == INVALID_KEY) {
			return nullptr;
		}
		auto offset = static_cast<std::size_t>(key - tailBase);
		std::size_t idx = (tail + offset) & (localIndex->capacity - 1);
		auto entry = localIndex->index[idx];
		return entry->key.load(std::memory_order_relaxed) == key ? entry : nullptr;
	}

	bool new_index()
	{
		auto prev = index.load(std::memory_order_relaxed);
		std::size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
		auto entryCount = prev == nullptr ? nextCapacity : prevCapacity;
		auto raw = static_cast<char*>(corealgos_allocator::malloc(
			sizeof(IndexHeader) +
			std::alignment_of<IndexEntry>::value - 1 + sizeof(IndexEntry) * entryCount +
			std::alignment_of<IndexEntry*>::value - 1 + sizeof(IndexEntry*) * nextCapacity));
		if (raw == nullptr) {
			return false;
		}

		auto header = new (raw) IndexHeader;
		auto entries = reinterpret_cast<IndexEntry*>(details::align_for<IndexEntry>(raw + sizeof(IndexHeader)));
		auto idx = reinterpret_cast<IndexEntry**>(details::align_for<IndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(IndexEntry) * entryCount));
		if (prev != nullptr) {
			auto prevTail = prev->tail.load(std::memory_order_relaxed);
			auto prevPos = prevTail;
			std::size_t i = 0;
			do {
				prevPos = (prevPos + 1) & (prev->capacity - 1);
				idx[i++] = prev->index[prevPos];
			} while (prevPos != prevTail);
			assert(i == prevCapacity);
		}
		for (std::size_t i = 0; i != entryCount; ++i) {
			new (entries + i) IndexEntry;
			entries[i].key.store(INVALID_KEY, std::memory_order_relaxed);
			entries[i].value.store(nullptr, std::memory_order_relaxed);
			idx[prevCapacity + i] = entries + i;
		}
		header->prev = prev;
		header->entries = entries;
		header->index = idx;
		header->capacity = nextCapacity;
		header->tail.store((prevCapacity - 1) & (nextCapacity - 1), std::memory_order_relaxed);
		index.store(header, std::memory_order_release);

		nextCapacity <<= 1;
		return true;
	}
private:
	std::size_t nextCapacity;
	std::atomic<IndexHeader*> index;

	static const std::uint64_t INVALID_KEY = ~(std::uint64_t)0;
};
} }
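
// Illustrative usage sketch (not part of the original header; Block is hypothetical).
// The producer inserts consecutive keys (0, 1, 2, ...); consumers may find/remove keys
// they know were inserted. The masking with (capacity - 1) means the initial size should
// be a power of two:
//
//     struct Block { /* ... */ };
//
//     moodycamel::corealgos::SPMCSequentialHashMap<Block> map(16);
//
//     // Producer thread only:
//     Block blocks[3];
//     map.insert(0, &blocks[0]);
//     map.insert(1, &blocks[1]);		// keys must increase by exactly one
//
//     // Any thread, for a key known to have been inserted:
//     Block* b = map.find(1);
//     Block* removed = map.remove(0);		// returns the removed value (or nullptr)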