schedulers

The contents of this module can be included with the header hpx/modules/schedulers.hpp. These headers may be used by user-code but are not guaranteed stable (neither header location nor contents). You are using these at your own risk. If you wish to use non-public functionality from a module we strongly suggest only including the module header hpx/modules/schedulers.hpp, not the particular header in which the functionality you would like to use is defined. See Public API for a list of names that are part of the public HPX API.

namespace hpx
namespace threads
namespace policies

Typedefs

using default_local_priority_queue_scheduler_terminated_queue = lockfree_fifo
template<typename Mutex = std::mutex, typename PendingQueuing = lockfree_fifo, typename StagedQueuing = lockfree_fifo, typename TerminatedQueuing = default_local_priority_queue_scheduler_terminated_queue>
class local_priority_queue_scheduler : public scheduler_base
#include <local_priority_queue_scheduler.hpp>

The local_priority_queue_scheduler maintains exactly one queue of work items (threads) per OS thread, where this OS thread pulls its next work from. Additionally it maintains separate queues: several for high priority threads and one for low priority threads. High priority threads are executed by the first N OS threads before any other work is executed. Low priority threads are executed by the last OS thread whenever no other work is available.

Public Types

typedef std::false_type has_periodic_maintenance
typedef thread_queue<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing> thread_queue_type
typedef init_parameter init_parameter_type

Public Functions

local_priority_queue_scheduler(init_parameter_type const &init, bool deferred_initialization = true)
~local_priority_queue_scheduler()
void abort_all_suspended_threads()
bool cleanup_terminated(bool delete_all)
bool cleanup_terminated(std::size_t num_thread, bool delete_all)
void create_thread(thread_init_data &data, thread_id_ref_type *id, error_code &ec)
bool get_next_thread(std::size_t num_thread, bool running, threads::thread_id_ref_type &thrd, bool enable_stealing)

Return the next thread to be executed, return false if none is available

void schedule_thread(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback = false, thread_priority priority = thread_priority::normal)

Schedule the passed thread.

void schedule_thread_last(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback = false, thread_priority priority = thread_priority::normal)
void destroy_thread(threads::thread_data *thrd)

Destroy the passed thread as it has been terminated.

std::int64_t get_queue_length(std::size_t num_thread = std::size_t(-1)) const
std::int64_t get_thread_count(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_, std::size_t num_thread = std::size_t(-1), bool = false) const
bool is_core_idle(std::size_t num_thread) const
bool enumerate_threads(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const
bool wait_or_add_new(std::size_t num_thread, bool running, std::int64_t &idle_loop_count, bool enable_stealing, std::size_t &added)

This is a function which gets called periodically by the thread manager to allow for maintenance tasks to be executed in the scheduler. Returns true if the OS thread calling this function has to be terminated (i.e. no more work has to be done).

void on_start_thread(std::size_t num_thread)
void on_stop_thread(std::size_t num_thread)
void on_error(std::size_t num_thread, std::exception_ptr const &e)
void reset_thread_distribution()

Public Static Functions

static std::string get_scheduler_name()

Protected Attributes

std::atomic<std::size_t> curr_queue_
detail::affinity_data const &affinity_data_
const std::size_t num_queues_
const std::size_t num_high_priority_queues_
thread_queue_type low_priority_queue_
std::vector<util::cache_line_data<thread_queue_type*>> queues_
std::vector<util::cache_line_data<thread_queue_type*>> high_priority_queues_
std::vector<util::cache_line_data<std::vector<std::size_t>>> victim_threads_
struct init_parameter

Public Functions

template<>
init_parameter(std::size_t num_queues, detail::affinity_data const &affinity_data, std::size_t num_high_priority_queues = std::size_t(-1), thread_queue_init_parameters thread_queue_init = {}, char const *description = "local_priority_queue_scheduler")
template<>
init_parameter(std::size_t num_queues, detail::affinity_data const &affinity_data, char const *description)

Public Members

template<>
std::size_t num_queues_
template<>
std::size_t num_high_priority_queues_
template<>
thread_queue_init_parameters thread_queue_init_
template<>
detail::affinity_data const &affinity_data_
template<>
char const *description_
namespace hpx
namespace threads
namespace policies

Typedefs

using default_local_queue_scheduler_terminated_queue = lockfree_fifo
template<typename Mutex = std::mutex, typename PendingQueuing = lockfree_fifo, typename StagedQueuing = lockfree_fifo, typename TerminatedQueuing = default_local_queue_scheduler_terminated_queue>
class local_queue_scheduler : public scheduler_base
#include <local_queue_scheduler.hpp>

The local_queue_scheduler maintains exactly one queue of work items (threads) per OS thread, where this OS thread pulls its next work from.

Public Types

typedef std::false_type has_periodic_maintenance
typedef thread_queue<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing> thread_queue_type
typedef init_parameter init_parameter_type

Public Functions

local_queue_scheduler(init_parameter_type const &init, bool deferred_initialization = true)
virtual ~local_queue_scheduler()
void abort_all_suspended_threads()
bool cleanup_terminated(bool delete_all)
bool cleanup_terminated(std::size_t num_thread, bool delete_all)
void create_thread(thread_init_data &data, thread_id_ref_type *id, error_code &ec)
virtual bool get_next_thread(std::size_t num_thread, bool running, threads::thread_id_ref_type &thrd, bool)

Return the next thread to be executed, return false if none is available

void schedule_thread(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, thread_priority = thread_priority::normal)

Schedule the passed thread.

void schedule_thread_last(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, thread_priority = thread_priority::normal)
void destroy_thread(threads::thread_data *thrd)

Destroy the passed thread as it has been terminated.

std::int64_t get_queue_length(std::size_t num_thread = std::size_t(-1)) const
std::int64_t get_thread_count(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_, std::size_t num_thread = std::size_t(-1), bool = false) const
bool is_core_idle(std::size_t num_thread) const
bool enumerate_threads(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const
virtual bool wait_or_add_new(std::size_t num_thread, bool running, std::int64_t &idle_loop_count, bool, std::size_t &added)

This is a function which gets called periodically by the thread manager to allow for maintenance tasks to be executed in the scheduler. Returns true if the OS thread calling this function has to be terminated (i.e. no more work has to be done).

void on_start_thread(std::size_t num_thread)
void on_stop_thread(std::size_t num_thread)
void on_error(std::size_t num_thread, std::exception_ptr const &e)

Public Static Functions

static std::string get_scheduler_name()

Protected Attributes

std::vector<thread_queue_type*> queues_
std::atomic<std::size_t> curr_queue_
detail::affinity_data const &affinity_data_
mask_type steals_in_numa_domain_
mask_type steals_outside_numa_domain_
std::vector<mask_type> numa_domain_masks_
std::vector<mask_type> outside_numa_domain_masks_
struct init_parameter

Public Functions

template<>
init_parameter(std::size_t num_queues, detail::affinity_data const &affinity_data, thread_queue_init_parameters thread_queue_init = {}, char const *description = "local_queue_scheduler")
template<>
init_parameter(std::size_t num_queues, detail::affinity_data const &affinity_data, char const *description)

Public Members

template<>
std::size_t num_queues_
template<>
thread_queue_init_parameters thread_queue_init_
template<>
detail::affinity_data const &affinity_data_
template<>
char const *description_
namespace hpx
namespace threads
namespace policies
struct concurrentqueue_fifo
template<typename T>
struct apply

Public Types

template<>
using type = moodycamel_fifo_backend<T>
struct lockfree_fifo
template<typename T>
struct apply

Public Types

template<>
using type = lockfree_fifo_backend<T>
template<typename T>
struct lockfree_fifo_backend

Public Types

template<>
using container_type = boost::lockfree::queue<T, hpx::util::aligned_allocator<T>>
template<>
using value_type = T
template<>
using reference = T&
template<>
using const_reference = T const&
template<>
using rvalue_reference = T&&
template<>
using size_type = std::uint64_t

Public Functions

lockfree_fifo_backend(size_type initial_size = 0, size_type = size_type(-1))
bool push(const_reference val, bool = false)
bool push(rvalue_reference val, bool = false)
bool pop(reference val, bool = true)
bool empty()

Private Members

container_type queue_
template<typename T>
struct moodycamel_fifo_backend

Public Types

template<>
using container_type = hpx::concurrency::ConcurrentQueue<T>
template<>
using value_type = T
template<>
using reference = T&
template<>
using const_reference = T const&
template<>
using rvalue_reference = T&&
template<>
using size_type = std::uint64_t

Public Functions

moodycamel_fifo_backend(size_type initial_size = 0, size_type = size_type(-1))
bool push(const_reference val, bool = false)
bool push(rvalue_reference val, bool = false)
bool pop(reference val, bool = true)
bool empty()

Private Members

container_type queue_

Defines

QUEUE_HOLDER_NUMA_DEBUG
namespace hpx

Functions

static hpx::debug::enable_print<QUEUE_HOLDER_NUMA_DEBUG> hpx::nq_deb("QH_NUMA")
namespace threads
namespace policies
template<typename QueueType>
struct queue_holder_numa

Public Types

template<>
using ThreadQueue = queue_holder_thread<QueueType>
template<>
using mutex_type = typename QueueType::mutex_type

Public Functions

queue_holder_numa()
~queue_holder_numa()
void init(std::size_t domain, std::size_t queues)
std::size_t size() const
ThreadQueue *thread_queue(std::size_t id) const
bool get_next_thread_HP(std::size_t qidx, threads::thread_id_ref_type &thrd, bool stealing, bool core_stealing)
bool get_next_thread(std::size_t qidx, threads::thread_id_ref_type &thrd, bool stealing, bool core_stealing)
bool add_new_HP(ThreadQueue *receiver, std::size_t qidx, std::size_t &added, bool stealing, bool allow_stealing)
bool add_new(ThreadQueue *receiver, std::size_t qidx, std::size_t &added, bool stealing, bool allow_stealing)
std::size_t get_new_tasks_queue_length() const
std::int64_t get_thread_count(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_) const
void abort_all_suspended_threads()
bool enumerate_threads(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state, ) const
void increment_num_pending_misses(std::size_t = 1)
void increment_num_pending_accesses(std::size_t = 1)
void increment_num_stolen_from_pending(std::size_t = 1)
void increment_num_stolen_from_staged(std::size_t = 1)
void increment_num_stolen_to_pending(std::size_t = 1)
void increment_num_stolen_to_staged(std::size_t = 1)
bool dump_suspended_threads(std::size_t, std::int64_t&, bool)
void debug_info()
void on_start_thread(std::size_t)
void on_stop_thread(std::size_t)
void on_error(std::size_t, std::exception_ptr const&)

Public Members

std::size_t num_queues_
std::size_t domain_
std::vector<ThreadQueue*> queues_

Defines

QUEUE_HOLDER_THREAD_DEBUG
namespace hpx

Functions

static hpx::debug::enable_print<QUEUE_HOLDER_THREAD_DEBUG> hpx::tq_deb("QH_THRD")
namespace threads
namespace policies

Enums

enum [anonymous]

Values:

max_thread_count = 1000
enum [anonymous]

Values:

round_robin_rollover = 1

Functions

std::size_t fast_mod(std::size_t const input, std::size_t const ceil)
template<typename QueueType>
struct queue_holder_thread

Public Types

template<>
using thread_holder_type = queue_holder_thread<QueueType>
template<>
using mutex_type = std::mutex
typedef std::unique_lock<mutex_type> scoped_lock
template<>
using thread_heap_type = std::list<thread_id_type, util::internal_allocator<thread_id_type>>
template<>
using task_description = thread_init_data
template<>
using thread_map_type = std::unordered_set<thread_id_type, std::hash<thread_id_type>, std::equal_to<thread_id_type>, util::internal_allocator<thread_id_type>>
template<>
using terminated_items_type = lockfree_fifo::apply<thread_data*>::type

Public Functions

queue_holder_thread(QueueType *bp_queue, QueueType *hp_queue, QueueType *np_queue, QueueType *lp_queue, std::size_t domain, std::size_t queue, std::size_t thread_num, std::size_t owner, const thread_queue_init_parameters &init)
~queue_holder_thread()
bool owns_bp_queue() const
bool owns_hp_queue() const
bool owns_np_queue() const
bool owns_lp_queue() const
std::size_t worker_next(std::size_t const workers) const
void schedule_thread(threads::thread_id_ref_type thrd, thread_priority priority, bool other_end = false)
bool cleanup_terminated(std::size_t thread_num, bool delete_all)
void create_thread(thread_init_data &data, thread_id_ref_type *tid, std::size_t thread_num, error_code &ec)
void create_thread_object(threads::thread_id_ref_type &tid, threads::thread_init_data &data)
void recycle_thread(thread_id_type tid)
void add_to_thread_map(threads::thread_id_type tid)
void remove_from_thread_map(threads::thread_id_type tid, bool dealloc)
bool get_next_thread_HP(threads::thread_id_ref_type &thrd, bool stealing, bool check_new)
bool get_next_thread(threads::thread_id_ref_type &thrd, bool stealing)
std::size_t add_new_HP(std::int64_t add_count, thread_holder_type *addfrom, bool stealing)
std::size_t add_new(std::int64_t add_count, thread_holder_type *addfrom, bool stealing)
std::size_t get_queue_length()
std::size_t get_thread_count_staged(thread_priority priority) const
std::size_t get_thread_count_pending(thread_priority priority) const
std::size_t get_thread_count(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_) const
void destroy_thread(threads::thread_data *thrd, std::size_t thread_num, bool xthread)

Destroy the passed thread as it has been terminated.

void abort_all_suspended_threads()
bool enumerate_threads(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const
void debug_info()
void debug_queues(const char *prefix)

Public Members

QueueType *const bp_queue_
QueueType *const hp_queue_
QueueType *const np_queue_
QueueType *const lp_queue_
const std::size_t domain_index_
const std::size_t queue_index_
const std::size_t thread_num_
const std::size_t owner_mask_
util::cache_line_data<mutex_type> thread_map_mtx_
thread_heap_type thread_heap_small_
thread_heap_type thread_heap_medium_
thread_heap_type thread_heap_large_
thread_heap_type thread_heap_huge_
thread_heap_type thread_heap_nostack_
util::cache_line_data<std::tuple<std::size_t, std::size_t>> rollover_counters_
thread_map_type thread_map_
util::cache_line_data<std::atomic<std::int32_t>> thread_map_count_
terminated_items_type terminated_items_
util::cache_line_data<std::atomic<std::int32_t>> terminated_items_count_
thread_queue_init_parameters parameters_

Public Static Functions

static void deallocate(threads::thread_data *p)

Public Static Attributes

util::internal_allocator<threads::thread_data> thread_alloc_
struct queue_data_print

Public Functions

template<>
queue_data_print(const queue_holder_thread *q)

Public Members

template<>
const queue_holder_thread *q_

Friends

std::ostream &operator<<(std::ostream &os, const queue_data_print &d)
struct queue_mc_print

Public Functions

template<>
queue_mc_print(const QueueType *const q)

Public Members

template<>
const QueueType *const q_

Friends

std::ostream &operator<<(std::ostream &os, const queue_mc_print &d)

Defines

SHARED_PRIORITY_SCHEDULER_DEBUG
SHARED_PRIORITY_QUEUE_SCHEDULER_API
namespace hpx

Typedefs

using print_onoff = hpx::debug::enable_print<SHARED_PRIORITY_SCHEDULER_DEBUG>
using print_on = hpx::debug::enable_print<false>

Functions

static print_onoff hpx::spq_deb("SPQUEUE")
static print_on hpx::spq_arr("SPQUEUE")
namespace threads
namespace policies

Typedefs

using default_shared_priority_queue_scheduler_terminated_queue = lockfree_fifo
struct core_ratios

Public Functions

core_ratios(std::size_t high_priority, std::size_t normal_priority, std::size_t low_priority)

Public Members

std::size_t high_priority
std::size_t normal_priority
std::size_t low_priority
template<typename Mutex = std::mutex, typename PendingQueuing = concurrentqueue_fifo, typename TerminatedQueuing = default_shared_priority_queue_scheduler_terminated_queue>
class shared_priority_queue_scheduler : public scheduler_base
#include <shared_priority_queue_scheduler.hpp>

The shared_priority_queue_scheduler maintains a set of high, normal, and low priority queues. For each priority level there is a core/queue ratio which determines how many cores share a single queue. If the high priority core/queue ratio is 4 the first 4 cores will share a single high priority queue, the next 4 will share another one and so on. In addition, the shared_priority_queue_scheduler is NUMA-aware and takes NUMA scheduling hints into account when creating and scheduling work.

Warning: PendingQueuing lifo causes lockup on termination

Public Types

template<>
using has_periodic_maintenance = std::false_type
template<>
using thread_queue_type = thread_queue_mc<Mutex, PendingQueuing, PendingQueuing, TerminatedQueuing>
template<>
using thread_holder_type = queue_holder_thread<thread_queue_type>
typedef init_parameter init_parameter_type

Public Functions

shared_priority_queue_scheduler(init_parameter const &init)
virtual ~shared_priority_queue_scheduler()
void set_scheduler_mode(scheduler_mode mode)
void abort_all_suspended_threads()
std::size_t local_thread_number()
bool cleanup_terminated(bool delete_all)
bool cleanup_terminated(std::size_t, bool delete_all)
void create_thread(thread_init_data &data, thread_id_ref_type *thrd, error_code &ec)
template<typename T>
bool steal_by_function(std::size_t domain, std::size_t q_index, bool steal_numa, bool steal_core, thread_holder_type *origin, T &var, const char *prefix, util::function_nonser<bool(std::size_t, std::size_t, thread_holder_type*, T&, bool, bool)> operation_HP, util::function_nonser<bool(std::size_t, std::size_t, thread_holder_type*, T&, bool, bool)> operation)
virtual bool get_next_thread(std::size_t thread_num, bool running, threads::thread_id_ref_type &thrd, bool enable_stealing)

Return the next thread to be executed, return false if none available.

virtual bool wait_or_add_new(std::size_t, bool, std::int64_t&, bool, std::size_t &added)

Return the next thread to be executed, return false if none available.

void schedule_thread(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, thread_priority priority = thread_priority::normal)

Schedule the passed thread.

void schedule_thread_last(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, thread_priority priority = thread_priority::normal)

Put task on the back of the queue : not yet implemented just put it on the normal queue for now

void destroy_thread(threads::thread_data *thrd)
std::int64_t get_queue_length(std::size_t thread_num = std::size_t(-1)) const
std::int64_t get_thread_count(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_, std::size_t thread_num = std::size_t(-1), bool = false) const
bool is_core_idle(std::size_t num_thread) const
bool enumerate_threads(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const
void on_start_thread(std::size_t local_thread)
void on_stop_thread(std::size_t thread_num)
void on_error(std::size_t thread_num, std::exception_ptr const&)

Public Static Functions

static std::string get_scheduler_name()

Protected Types

typedef queue_holder_numa<thread_queue_type> numa_queues

Protected Attributes

std::array<std::size_t, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> q_counts_
std::array<std::size_t, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> q_offset_
std::array<numa_queues, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> numa_holder_
std::vector<std::size_t> d_lookup_
std::vector<std::size_t> q_lookup_
core_ratios cores_per_queue_
bool round_robin_
bool steal_hp_first_
bool numa_stealing_
bool core_stealing_
std::size_t num_workers_
std::size_t num_domains_
detail::affinity_data const &affinity_data_
const thread_queue_init_parameters queue_parameters_
std::mutex init_mutex
bool initialized_
bool debug_init_
std::atomic<std::size_t> thread_init_counter_
std::size_t pool_index_
struct init_parameter

Public Functions

template<>
init_parameter(std::size_t num_worker_threads, const core_ratios &cores_per_queue, detail::affinity_data const &affinity_data, const thread_queue_init_parameters &thread_queue_init, char const *description = "shared_priority_queue_scheduler")
template<>
init_parameter(std::size_t num_worker_threads, const core_ratios &cores_per_queue, detail::affinity_data const &affinity_data, char const *description)

Public Members

template<>
std::size_t num_worker_threads_
template<>
core_ratios cores_per_queue_
template<>
thread_queue_init_parameters thread_queue_init_
template<>
detail::affinity_data const &affinity_data_
template<>
char const *description_
namespace hpx
namespace threads
namespace policies

Typedefs

using default_static_priority_queue_scheduler_terminated_queue = lockfree_fifo
template<typename Mutex = std::mutex, typename PendingQueuing = lockfree_fifo, typename StagedQueuing = lockfree_fifo, typename TerminatedQueuing = default_static_priority_queue_scheduler_terminated_queue>
class static_priority_queue_scheduler : public hpx::threads::policies::local_priority_queue_scheduler<std::mutex, lockfree_fifo, lockfree_fifo, default_static_priority_queue_scheduler_terminated_queue>
#include <static_priority_queue_scheduler.hpp>

The static_priority_queue_scheduler maintains exactly one queue of work items (threads) per OS thread, where this OS thread pulls its next work from. Additionally it maintains separate queues: several for high priority threads and one for low priority threads. High priority threads are executed by the first N OS threads before any other work is executed. Low priority threads are executed by the last OS thread whenever no other work is available. This scheduler does not do any work stealing.

Public Types

template<>
using base_type = local_priority_queue_scheduler<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>
template<>
using init_parameter_type = typename base_type::init_parameter_type

Public Functions

static_priority_queue_scheduler(init_parameter_type const &init, bool deferred_initialization = true)
void set_scheduler_mode(scheduler_mode mode)

Public Static Functions

static std::string get_scheduler_name()
namespace hpx
namespace threads
namespace policies

Typedefs

using default_static_queue_scheduler_terminated_queue = lockfree_fifo
template<typename Mutex = std::mutex, typename PendingQueuing = lockfree_fifo, typename StagedQueuing = lockfree_fifo, typename TerminatedQueuing = default_static_queue_scheduler_terminated_queue>
class static_queue_scheduler : public hpx::threads::policies::local_queue_scheduler<std::mutex, lockfree_fifo, lockfree_fifo, default_static_queue_scheduler_terminated_queue>
#include <static_queue_scheduler.hpp>

The local_queue_scheduler maintains exactly one queue of work items (threads) per OS thread, where this OS thread pulls its next work from.

Public Types

typedef local_queue_scheduler<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing> base_type

Public Functions

static_queue_scheduler(typename base_type::init_parameter_type const &init, bool deferred_initialization = true)
void set_scheduler_mode(scheduler_mode mode)
bool get_next_thread(std::size_t num_thread, bool, threads::thread_id_ref_type &thrd, bool)

Return the next thread to be executed, return false if none is available

bool wait_or_add_new(std::size_t num_thread, bool running, std::int64_t &idle_loop_count, bool, std::size_t &added)

This is a function which gets called periodically by the thread manager to allow for maintenance tasks to be executed in the scheduler. Returns true if the OS thread calling this function has to be terminated (i.e. no more work has to be done).

Public Static Functions

static std::string get_scheduler_name()
namespace hpx
namespace threads
namespace policies
template<typename Mutex, typename PendingQueuing, typename StagedQueuing, typename TerminatedQueuing>
class thread_queue

Public Functions

bool cleanup_terminated_locked(bool delete_all = false)

This function makes sure all threads which are marked for deletion (state is terminated) are properly destroyed.

This returns ‘true’ if there are no more terminated threads waiting to be deleted.

bool cleanup_terminated(bool delete_all = false)
thread_queue(std::size_t queue_num = std::size_t(-1), thread_queue_init_parameters parameters = {})
~thread_queue()
std::int64_t get_queue_length(std::memory_order order = std::memory_order_acquire) const
std::int64_t get_pending_queue_length(std::memory_order order = std::memory_order_acquire) const
std::int64_t get_staged_queue_length(std::memory_order order = std::memory_order_acquire) const
constexpr void increment_num_pending_misses(std::size_t = 1)
constexpr void increment_num_pending_accesses(std::size_t = 1)
constexpr void increment_num_stolen_from_pending(std::size_t = 1)
constexpr void increment_num_stolen_from_staged(std::size_t = 1)
constexpr void increment_num_stolen_to_pending(std::size_t = 1)
constexpr void increment_num_stolen_to_staged(std::size_t = 1)
void create_thread(thread_init_data &data, thread_id_ref_type *id, error_code &ec)
void move_work_items_from(thread_queue *src, std::int64_t count)
void move_task_items_from(thread_queue *src, std::int64_t count)
bool get_next_thread(threads::thread_id_ref_type &thrd, bool allow_stealing = false, bool steal = false)

Return the next thread to be executed, return false if none is available

void schedule_thread(threads::thread_id_ref_type thrd, bool other_end = false)

Schedule the passed thread.

void destroy_thread(threads::thread_data *thrd)

Destroy the passed thread as it has been terminated.

std::int64_t get_thread_count(thread_schedule_state state = thread_schedule_state::unknown) const

Return the number of existing threads with the given state.

void abort_all_suspended_threads()
bool enumerate_threads(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const
bool wait_or_add_new(bool, std::size_t &added, bool steal = false)

This is a function which gets called periodically by the thread manager to allow for maintenance tasks to be executed in the scheduler. Returns true if the OS thread calling this function has to be terminated (i.e. no more work has to be done).

bool wait_or_add_new(bool running, std::size_t &added, thread_queue *addfrom, bool steal = false)
bool dump_suspended_threads(std::size_t num_thread, std::int64_t &idle_loop_count, bool running)
void on_start_thread(std::size_t)
void on_stop_thread(std::size_t)
void on_error(std::size_t, std::exception_ptr const&)

Public Static Functions

static void deallocate(threads::thread_data *p)

Protected Functions

template<typename Lock>
void create_thread_object(threads::thread_id_ref_type &thrd, threads::thread_init_data &data, Lock &lk)
std::size_t add_new(std::int64_t add_count, thread_queue *addfrom, std::unique_lock<mutex_type> &lk, bool steal = false)
bool add_new_always(std::size_t &added, thread_queue *addfrom, std::unique_lock<mutex_type> &lk, bool steal = false)
void recycle_thread(thread_id_type thrd)

Protected Static Attributes

util::internal_allocator<typename thread_queue<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>::task_description> task_description_alloc_

Private Types

template<>
using mutex_type = Mutex
template<>
using thread_map_type = std::unordered_set<thread_id_type, std::hash<thread_id_type>, std::equal_to<thread_id_type>, util::internal_allocator<thread_id_type>>
template<>
using thread_heap_type = std::vector<thread_id_type, util::internal_allocator<thread_id_type>>
template<>
using thread_description_ptr = typename thread_id_ref_type::thread_repr*
template<>
using work_items_type = typename PendingQueuing::template apply<thread_description_ptr>::type
template<>
using task_items_type = typename StagedQueuing::template apply<task_description*>::type
template<>
using terminated_items_type = typename TerminatedQueuing::template apply<thread_data*>::type

Private Members

thread_queue_init_parameters parameters_
mutex_type mtx_
thread_map_type thread_map_
std::atomic<std::int64_t> thread_map_count_
work_items_type work_items_
terminated_items_type terminated_items_
std::atomic<std::int64_t> terminated_items_count_
task_items_type new_tasks_
thread_heap_type thread_heap_small_
thread_heap_type thread_heap_medium_
thread_heap_type thread_heap_large_
thread_heap_type thread_heap_huge_
thread_heap_type thread_heap_nostack_
util::cache_line_data<std::atomic<std::int64_t>> new_tasks_count_
util::cache_line_data<std::atomic<std::int64_t>> work_items_count_
struct task_description

Public Members

template<>
thread_init_data data

Defines

THREAD_QUEUE_MC_DEBUG
namespace hpx

Functions

static hpx::debug::enable_print<THREAD_QUEUE_MC_DEBUG> hpx::tqmc_deb("_TQ_MC_")
namespace threads
namespace policies
template<typename Mutex, typename PendingQueuing, typename StagedQueuing, typename TerminatedQueuing>
class thread_queue_mc

Public Types

typedef Mutex mutex_type
template<>
using thread_queue_type = thread_queue_mc<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>
template<>
using thread_heap_type = std::list<thread_id_type, util::internal_allocator<thread_id_type>>
template<>
using task_description = thread_init_data
template<>
using thread_description = thread_data
typedef PendingQueuing::template apply<thread_id_ref_type>::type work_items_type
typedef concurrentqueue_fifo::apply<task_description>::type task_items_type

Public Functions

std::size_t add_new(std::int64_t add_count, thread_queue_type *addfrom, bool stealing)
thread_queue_mc(const thread_queue_init_parameters &parameters, std::size_t queue_num = std::size_t(-1))
void set_holder(queue_holder_thread<thread_queue_type> *holder)
~thread_queue_mc()
std::int64_t get_queue_length() const
std::int64_t get_queue_length_pending() const
std::int64_t get_queue_length_staged(std::memory_order order = std::memory_order_relaxed) const
std::int64_t get_thread_count() const
void create_thread(thread_init_data &data, thread_id_ref_type *id, error_code &ec)
bool get_next_thread(threads::thread_id_ref_type &thrd, bool other_end, bool check_new = false)

Return the next thread to be executed, return false if none is available

void schedule_work(threads::thread_id_ref_type thrd, bool other_end)

Schedule the passed thread (put it on the ready work queue)

void on_start_thread(std::size_t)
void on_stop_thread(std::size_t)
void on_error(std::size_t, std::exception_ptr const&)

Public Members

thread_queue_init_parameters parameters_
const int queue_index_
queue_holder_thread<thread_queue_type> *holder_
task_items_type new_task_items_
work_items_type work_items_
util::cache_line_data<std::atomic<std::int32_t>> new_tasks_count_
util::cache_line_data<std::atomic<std::int32_t>> work_items_count_