hpx/concurrency/concurrentqueue.hpp

See Public API for a list of names and headers that are part of the public HPX API.

Defines

MOODYCAMEL_THREADLOCAL
MOODYCAMEL_EXCEPTIONS_ENABLED
MOODYCAMEL_TRY
MOODYCAMEL_CATCH(...)
MOODYCAMEL_RETHROW
MOODYCAMEL_THROW(expr)
MOODYCAMEL_NOEXCEPT
MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)
MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr)
MOODYCAMEL_DELETE_FUNCTION
namespace hpx
namespace concurrency

Functions

template<typename T, typename Traits>
void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &b)
template<typename T, typename Traits>
void swap(ConcurrentQueue<T, Traits> &a, ConcurrentQueue<T, Traits> &b)
void swap(ProducerToken &a, ProducerToken &b)
void swap(ConsumerToken &a, ConsumerToken &b)
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue

Public Types

typedef ::hpx::concurrency::ProducerToken producer_token_t
typedef ::hpx::concurrency::ConsumerToken consumer_token_t
typedef typename Traits::index_t index_t
typedef typename Traits::size_t size_t

Public Functions

ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
~ConcurrentQueue()
ConcurrentQueue(ConcurrentQueue const&)
ConcurrentQueue &operator=(ConcurrentQueue const&)
ConcurrentQueue(ConcurrentQueue &&other)
ConcurrentQueue &operator=(ConcurrentQueue &&other)
void swap(ConcurrentQueue &other)
bool enqueue(T const &item)
bool enqueue(T &&item)
bool enqueue(producer_token_t const &token, T const &item)
bool enqueue(producer_token_t const &token, T &&item)
template<typename It>
bool enqueue_bulk(It itemFirst, size_t count)
template<typename It>
bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
bool try_enqueue(T const &item)
bool try_enqueue(T &&item)
bool try_enqueue(producer_token_t const &token, T const &item)
bool try_enqueue(producer_token_t const &token, T &&item)
template<typename It>
bool try_enqueue_bulk(It itemFirst, size_t count)
template<typename It>
bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
template<typename U>
bool try_dequeue(U &item)
template<typename U>
bool try_dequeue_non_interleaved(U &item)
template<typename U>
bool try_dequeue(consumer_token_t &token, U &item)
template<typename It>
size_t try_dequeue_bulk(It itemFirst, size_t max)
template<typename It>
size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
template<typename U>
bool try_dequeue_from_producer(producer_token_t const &producer, U &item)
template<typename It>
size_t try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max)
size_t size_approx() const

Public Static Functions

static bool is_lock_free()

Public Static Attributes

const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE)
const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE)
const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE)
const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE)
const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE)
const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE)

Private Types

enum AllocationMode

Values:

CanAlloc
CannotAlloc
enum InnerQueueContext

Values:

implicit_context = 0
explicit_context = 1

Private Functions

ConcurrentQueue &swap_internal(ConcurrentQueue &other)
template<AllocationMode canAlloc, typename U>
bool inner_enqueue(producer_token_t const &token, U &&element)
template<AllocationMode canAlloc, typename U>
bool inner_enqueue(U &&element)
template<AllocationMode canAlloc, typename It>
bool inner_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
template<AllocationMode canAlloc, typename It>
bool inner_enqueue_bulk(It itemFirst, size_t count)
bool update_current_producer_after_rotation(consumer_token_t &token)
void populate_initial_block_list(size_t blockCount)
Block *try_get_block_from_initial_pool()
void add_block_to_free_list(Block *block)
void add_blocks_to_free_list(Block *block)
Block *try_get_block_from_free_list()
template<AllocationMode canAlloc>
Block *requisition_block()
ProducerBase *recycle_or_create_producer(bool isExplicit)
ProducerBase *recycle_or_create_producer(bool isExplicit, bool &recycled)
ProducerBase *add_producer(ProducerBase *producer)
void reown_producers()
void populate_initial_implicit_producer_hash()
void swap_implicit_producer_hashes(ConcurrentQueue &other)
ImplicitProducer *get_or_add_implicit_producer()

Private Members

std::atomic<ProducerBase*> producerListTail
std::atomic<std::uint32_t> producerCount
std::atomic<size_t> initialBlockPoolIndex
Block *initialBlockPool
size_t initialBlockPoolSize
FreeList<Block> freeList
std::atomic<ImplicitProducerHash*> implicitProducerHash
std::atomic<size_t> implicitProducerHashCount
ImplicitProducerHash initialImplicitProducerHash
std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries
std::atomic_flag implicitProducerHashResizeInProgress
std::atomic<std::uint32_t> nextExplicitConsumerId
std::atomic<std::uint32_t> globalExplicitConsumerOffset

Private Static Functions

template<typename U>
static U *create_array(size_t count)
template<typename U>
static void destroy_array(U *p, size_t count)
template<typename U>
static U *create()
template<typename U, typename A1>
static U *create(A1 &&a1)
template<typename U>
static void destroy(U *p)

Friends

friend hpx::concurrency::ProducerToken
friend hpx::concurrency::ConsumerToken
friend hpx::concurrency::ExplicitProducer
friend hpx::concurrency::ImplicitProducer
friend hpx::concurrency::ConcurrentQueueTests
template<typename XT, typename XTraits>
void swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&)
struct Block

Public Functions

template<>
Block()
template<InnerQueueContext context>
bool is_empty() const
template<InnerQueueContext context>
bool set_empty(index_t i)
template<InnerQueueContext context>
bool set_many_empty(index_t i, size_t count)
template<InnerQueueContext context>
void set_all_empty()
template<InnerQueueContext context>
void reset_empty()
template<>
T *operator[](index_t idx)
template<>
T const *operator[](index_t idx) const

Public Members

template<>
char elements[sizeof(T) * BLOCK_SIZE]
template<>
details::max_align_t dummy
template<>
Block *next
template<>
std::atomic<size_t> elementsCompletelyDequeued
std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1]
template<>
std::atomic<std::uint32_t> freeListRefs
template<>
std::atomic<Block*> freeListNext
template<>
std::atomic<bool> shouldBeOnFreeList
template<>
bool dynamicallyAllocated

Private Members

template<>
union [anonymous]
struct ExplicitProducer : public hpx::concurrency::ConcurrentQueue<T, Traits>::ProducerBase

Public Functions

template<>
ExplicitProducer(ConcurrentQueue *parent)
template<>
~ExplicitProducer()
template<AllocationMode allocMode, typename U>
bool enqueue(U &&element)
template<typename U>
bool dequeue(U &element)
template<AllocationMode allocMode, typename It>
bool enqueue_bulk(It itemFirst, size_t count)
template<typename It>
size_t dequeue_bulk(It &itemFirst, size_t max)

Private Functions

template<>
bool new_block_index(size_t numberOfFilledSlotsToExpose)

Private Members

template<>
std::atomic<BlockIndexHeader*> blockIndex
template<>
size_t pr_blockIndexSlotsUsed
template<>
size_t pr_blockIndexSize
template<>
size_t pr_blockIndexFront
template<>
BlockIndexEntry *pr_blockIndexEntries
template<>
void *pr_blockIndexRaw
struct BlockIndexEntry

Public Members

template<>
index_t base
template<>
Block *block
struct BlockIndexHeader

Public Members

template<>
size_t size
template<>
std::atomic<size_t> front
template<>
BlockIndexEntry *entries
template<>
void *prev
template<typename N>
struct FreeList

Public Functions

template<>
FreeList()
template<>
FreeList(FreeList &&other)
template<>
void swap(FreeList &other)
template<>
FreeList(FreeList const&)
template<>
FreeList &operator=(FreeList const&)
template<>
void add(N *node)
template<>
N *try_get()
template<>
N *head_unsafe() const

Private Functions

template<>
void add_knowing_refcount_is_zero(N *node)

Private Members

template<>
std::atomic<N*> freeListHead

Private Static Attributes

template<>
const std::uint32_t REFS_MASK = 0x7FFFFFFF
template<>
const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000
template<typename N>
struct FreeListNode

Public Functions

template<>
FreeListNode()

Public Members

template<>
std::atomic<std::uint32_t> freeListRefs
template<>
std::atomic<N*> freeListNext
struct ImplicitProducer : public hpx::concurrency::ConcurrentQueue<T, Traits>::ProducerBase

Public Functions

template<>
ImplicitProducer(ConcurrentQueue *parent)
template<>
~ImplicitProducer()
template<AllocationMode allocMode, typename U>
bool enqueue(U &&element)
template<typename U>
bool dequeue(U &element)
template<AllocationMode allocMode, typename It>
bool enqueue_bulk(It itemFirst, size_t count)
template<typename It>
size_t dequeue_bulk(It &itemFirst, size_t max)

Private Functions

template<AllocationMode allocMode>
bool insert_block_index_entry(BlockIndexEntry *&idxEntry, index_t blockStartIndex)
template<>
void rewind_block_index_tail()
template<>
BlockIndexEntry *get_block_index_entry_for_index(index_t index) const
template<>
size_t get_block_index_index_for_index(index_t index, BlockIndexHeader *&localBlockIndex) const
template<>
bool new_block_index()

Private Members

template<>
size_t nextBlockIndexCapacity
template<>
std::atomic<BlockIndexHeader*> blockIndex

Private Static Attributes

template<>
const index_t INVALID_BLOCK_BASE = 1
struct BlockIndexEntry

Public Members

template<>
std::atomic<index_t> key
template<>
std::atomic<Block*> value
struct BlockIndexHeader

Public Members

template<>
size_t capacity
template<>
std::atomic<size_t> tail
template<>
BlockIndexEntry *entries
template<>
BlockIndexEntry **index
template<>
BlockIndexHeader *prev
struct ImplicitProducerHash

Public Members

template<>
size_t capacity
template<>
ImplicitProducerKVP *entries
template<>
ImplicitProducerHash *prev
struct ImplicitProducerKVP

Public Functions

template<>
ImplicitProducerKVP()
template<>
ImplicitProducerKVP(ImplicitProducerKVP &&other)
template<>
ImplicitProducerKVP &operator=(ImplicitProducerKVP &&other)
template<>
void swap(ImplicitProducerKVP &other)

Public Members

template<>
std::atomic<details::thread_id_t> key
template<>
ImplicitProducer *value
struct ProducerBase : public hpx::concurrency::details::ConcurrentQueueProducerTypelessBase

Public Functions

template<>
ProducerBase(ConcurrentQueue *parent_, bool isExplicit_)
template<>
virtual ~ProducerBase()
template<typename U>
bool dequeue(U &element)
template<typename It>
size_t dequeue_bulk(It &itemFirst, size_t max)
template<>
ProducerBase *next_prod() const
template<>
size_t size_approx() const
template<>
index_t getTail() const

Public Members

template<>
bool isExplicit
template<>
ConcurrentQueue *parent

Protected Attributes

template<>
std::atomic<index_t> tailIndex
template<>
std::atomic<index_t> headIndex
template<>
std::atomic<index_t> dequeueOptimisticCount
template<>
std::atomic<index_t> dequeueOvercommit
template<>
Block *tailBlock
struct ConcurrentQueueDefaultTraits

Public Types

typedef std::size_t size_t
typedef std::size_t index_t

Public Static Functions

static void *malloc(size_t size)
static void free(void *ptr)

Public Static Attributes

const size_t BLOCK_SIZE = 32
const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32
const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32
const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32
const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32
const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256
const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value
struct ConsumerToken

Public Functions

template<typename T, typename Traits>
ConsumerToken(ConcurrentQueue<T, Traits> &q)
template<typename T, typename Traits>
ConsumerToken(BlockingConcurrentQueue<T, Traits> &q)
ConsumerToken(ConsumerToken &&other)
ConsumerToken &operator=(ConsumerToken &&other)
void swap(ConsumerToken &other)
ConsumerToken(ConsumerToken const&)
ConsumerToken &operator=(ConsumerToken const&)

Private Members

std::uint32_t initialOffset
std::uint32_t lastKnownGlobalOffset
std::uint32_t itemsConsumedFromCurrent
details::ConcurrentQueueProducerTypelessBase *currentProducer
details::ConcurrentQueueProducerTypelessBase *desiredProducer

Friends

friend hpx::concurrency::ConcurrentQueue
friend hpx::concurrency::ConcurrentQueueTests
struct ProducerToken

Public Functions

template<typename T, typename Traits>
ProducerToken(ConcurrentQueue<T, Traits> &queue)
template<typename T, typename Traits>
ProducerToken(BlockingConcurrentQueue<T, Traits> &queue)
ProducerToken(ProducerToken &&other)
ProducerToken &operator=(ProducerToken &&other)
void swap(ProducerToken &other)
bool valid() const
~ProducerToken()
ProducerToken(ProducerToken const&)
ProducerToken &operator=(ProducerToken const&)

Protected Attributes

details::ConcurrentQueueProducerTypelessBase *producer

Friends

friend hpx::concurrency::ConcurrentQueue
friend hpx::concurrency::ConcurrentQueueTests
namespace details

Typedefs

typedef std::uintptr_t thread_id_t
typedef std::max_align_t std_max_align_t

Functions

static thread_id_t thread_id()
static bool likely(bool x)
static bool unlikely(bool x)
static size_t hash_thread_id(thread_id_t id)
template<typename T>
static bool circular_less_than(T a, T b)
template<typename U>
static char *align_for(char *ptr)
template<typename T>
static T ceil_to_pow_2(T x)
template<typename T>
static void swap_relaxed(std::atomic<T> &left, std::atomic<T> &right)
template<typename T>
static T const &nomove(T const &x)
template<typename It>
static auto deref_noexcept(It &it)

Variables

const thread_id_t invalid_thread_id = 0
const thread_id_t invalid_thread_id2 = 1
template<bool use32>
struct _hash_32_or_64

Public Static Functions

static std::uint32_t hash(std::uint32_t h)
template<>
struct _hash_32_or_64<1>

Public Static Functions

static std::uint64_t hash(std::uint64_t h)
struct ConcurrentQueueProducerTypelessBase

Public Functions

ConcurrentQueueProducerTypelessBase()

Public Members

ConcurrentQueueProducerTypelessBase *next
std::atomic<bool> inactive
ProducerToken *token
template<typename T>
struct const_numeric_max

Public Static Attributes

const T value = std::numeric_limits<T>::is_signed ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1) : static_cast<T>(-1)
union max_align_t

Public Members

std_max_align_t x
long long y
void *z
template<bool Enable>
struct nomove_if

Public Static Functions

template<typename T>
static T const &eval(T const &x)
template<>
struct nomove_if<false>

Public Static Functions

template<typename U>
static auto eval(U &&x)
template<>
struct static_is_lock_free<bool>

Public Types

enum [anonymous]

Values:

value = ATOMIC_BOOL_LOCK_FREE
template<typename U>
struct static_is_lock_free<U*>

Public Types

enum [anonymous]

Values:

value = ATOMIC_POINTER_LOCK_FREE
template<typename T>
struct static_is_lock_free_num

Public Types

enum [anonymous]

Values:

value = 0
template<>
struct static_is_lock_free_num<int>

Public Types

enum [anonymous]

Values:

value = ATOMIC_INT_LOCK_FREE
template<>
struct static_is_lock_free_num<long>

Public Types

enum [anonymous]

Values:

value = ATOMIC_LONG_LOCK_FREE
template<>
struct static_is_lock_free_num<long long>

Public Types

enum [anonymous]

Values:

value = ATOMIC_LLONG_LOCK_FREE
template<>
struct static_is_lock_free_num<short>

Public Types

enum [anonymous]

Values:

value = ATOMIC_SHORT_LOCK_FREE
template<>
struct static_is_lock_free_num<signed char>

Public Types

enum [anonymous]

Values:

value = ATOMIC_CHAR_LOCK_FREE
template<typename thread_id_t>
struct thread_id_converter

Public Types

typedef thread_id_t thread_id_numeric_size_t
typedef thread_id_t thread_id_hash_t

Public Static Functions

static thread_id_hash_t prehash(thread_id_t const &x)