schedulers¶
The contents of this module can be included with the header
hpx/modules/schedulers.hpp
. These headers may be used by user-code but are not
guaranteed stable (neither header location nor contents). You are using these at
your own risk. If you wish to use non-public functionality from a module we
strongly suggest only including the module header hpx/modules/schedulers.hpp
, not
the particular header in which the functionality you would like to use is
defined. See Public API for a list of names that are part of the public
HPX API.
-
namespace
hpx
-
namespace
threads
-
namespace
policies
¶ Typedefs
-
using
default_local_priority_queue_scheduler_terminated_queue
= lockfree_fifo¶
-
template<typename
Mutex
= std::mutex, typenamePendingQueuing
= lockfree_fifo, typenameStagedQueuing
= lockfree_fifo, typenameTerminatedQueuing
= default_local_priority_queue_scheduler_terminated_queue>
classlocal_priority_queue_scheduler
: public scheduler_base¶ - #include <local_priority_queue_scheduler.hpp>
The local_priority_queue_scheduler maintains exactly one queue of work items (threads) per OS thread, where this OS thread pulls its next work from. Additionally it maintains separate queues: several for high priority threads and one for low priority threads. High priority threads are executed by the first N OS threads before any other work is executed. Low priority threads are executed by the last OS thread whenever no other work is available.
Public Types
-
typedef thread_queue<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>
thread_queue_type
¶
-
typedef init_parameter
init_parameter_type
¶
Public Functions
-
local_priority_queue_scheduler
(init_parameter_type const &init, bool deferred_initialization = true)¶
-
~local_priority_queue_scheduler
()¶
-
void
abort_all_suspended_threads
()¶
-
bool
cleanup_terminated
(bool delete_all)¶
-
void
create_thread
(thread_init_data &data, thread_id_type *id, error_code &ec)¶
-
bool
get_next_thread
(std::size_t num_thread, bool running, threads::thread_data *&thrd, bool enable_stealing)¶ Return the next thread to be executed, return false if none is available
-
void
schedule_thread
(threads::thread_data *thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback = false, thread_priority priority = thread_priority::normal)¶ Schedule the passed thread.
-
void
schedule_thread_last
(threads::thread_data *thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback = false, thread_priority priority = thread_priority::normal)¶
-
void
destroy_thread
(threads::thread_data *thrd)¶ Destroy the passed thread as it has been terminated.
-
std::int64_t
get_thread_count
(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_, std::size_t num_thread = std::size_t(-1), bool = false) const¶
-
bool
enumerate_threads
(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const¶
-
bool
wait_or_add_new
(std::size_t num_thread, bool running, std::int64_t &idle_loop_count, bool enable_stealing, std::size_t &added)¶ This is a function which gets called periodically by the thread manager to allow for maintenance tasks to be executed in the scheduler. Returns true if the OS thread calling this function has to be terminated (i.e. no more work has to be done).
-
void
reset_thread_distribution
()¶
Protected Attributes
-
detail::affinity_data const &
affinity_data_
¶
-
thread_queue_type
low_priority_queue_
¶
-
std::vector<util::cache_line_data<thread_queue_type*>>
queues_
¶
-
std::vector<util::cache_line_data<thread_queue_type*>>
high_priority_queues_
¶
-
struct
init_parameter
¶ Public Functions
-
template<>
init_parameter
(std::size_t num_queues, detail::affinity_data const &affinity_data, std::size_t num_high_priority_queues = std::size_t(-1), thread_queue_init_parameters thread_queue_init = {}, char const *description = "local_priority_queue_scheduler")¶
-
template<>
-
typedef thread_queue<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>
-
using
-
namespace
-
namespace
-
namespace
hpx
-
namespace
threads
-
namespace
policies
Typedefs
-
using
default_local_queue_scheduler_terminated_queue
= lockfree_fifo¶
-
template<typename
Mutex
= std::mutex, typenamePendingQueuing
= lockfree_fifo, typenameStagedQueuing
= lockfree_fifo, typenameTerminatedQueuing
= default_local_queue_scheduler_terminated_queue>
classlocal_queue_scheduler
: public scheduler_base¶ - #include <local_queue_scheduler.hpp>
The local_queue_scheduler maintains exactly one queue of work items (threads) per OS thread, where this OS thread pulls its next work from.
Public Types
-
typedef thread_queue<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>
thread_queue_type
¶
-
typedef init_parameter
init_parameter_type
¶
Public Functions
-
local_queue_scheduler
(init_parameter_type const &init, bool deferred_initialization = true)¶
-
virtual
~local_queue_scheduler
()¶
-
void
abort_all_suspended_threads
()¶
-
bool
cleanup_terminated
(bool delete_all)¶
-
void
create_thread
(thread_init_data &data, thread_id_type *id, error_code &ec)¶
-
virtual bool
get_next_thread
(std::size_t num_thread, bool running, threads::thread_data *&thrd, bool)¶ Return the next thread to be executed, return false if none is available
-
void
schedule_thread
(threads::thread_data *thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, thread_priority = thread_priority::normal)¶ Schedule the passed thread.
-
void
schedule_thread_last
(threads::thread_data *thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, thread_priority = thread_priority::normal)¶
-
void
destroy_thread
(threads::thread_data *thrd)¶ Destroy the passed thread as it has been terminated.
-
std::int64_t
get_thread_count
(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_, std::size_t num_thread = std::size_t(-1), bool = false) const¶
-
bool
enumerate_threads
(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const¶
-
virtual bool
wait_or_add_new
(std::size_t num_thread, bool running, std::int64_t &idle_loop_count, bool, std::size_t &added)¶ This is a function which gets called periodically by the thread manager to allow for maintenance tasks to be executed in the scheduler. Returns true if the OS thread calling this function has to be terminated (i.e. no more work has to be done).
Protected Attributes
-
std::vector<thread_queue_type*>
queues_
¶
-
detail::affinity_data const &
affinity_data_
¶
-
mask_type
steals_in_numa_domain_
¶
-
mask_type
steals_outside_numa_domain_
¶
-
struct
init_parameter
¶ Public Functions
-
template<>
init_parameter
(std::size_t num_queues, detail::affinity_data const &affinity_data, thread_queue_init_parameters thread_queue_init = {}, char const *description = "local_queue_scheduler")¶
-
template<>
-
typedef thread_queue<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>
-
using
-
namespace
-
namespace
-
namespace
hpx
-
namespace
threads
-
namespace
policies
-
struct
concurrentqueue_fifo
¶
-
struct
lockfree_fifo
¶
-
template<typename
T
>
structlockfree_fifo_backend
¶ Public Types
-
template<>
usingvalue_type
= T¶
-
template<>
usingreference
= T&¶
-
template<>
usingconst_reference
= T const&¶
Public Functions
-
lockfree_fifo_backend
(size_type initial_size = 0, size_type = size_type(-1))¶
-
bool
push
(const_reference val, bool = false)¶
-
bool
pop
(reference val, bool = true)¶
-
bool
empty
()¶
Private Members
-
container_type
queue_
¶
-
template<>
-
template<typename
T
>
structmoodycamel_fifo_backend
¶ Public Types
-
template<>
usingcontainer_type
= hpx::concurrency::ConcurrentQueue<T>¶
-
template<>
usingvalue_type
= T¶
-
template<>
usingreference
= T&¶
-
template<>
usingconst_reference
= T const&¶
-
template<>
usingrval_reference
= T&&¶
Public Functions
-
moodycamel_fifo_backend
(size_type initial_size = 0, size_type = size_type(-1))¶
-
bool
push
(rval_reference val, bool = false)¶
-
bool
push
(const_reference val, bool = false)¶
-
bool
pop
(reference val, bool = true)¶
-
bool
empty
()¶
Private Members
-
container_type
queue_
¶
-
template<>
-
struct
-
namespace
-
namespace
Defines
-
QUEUE_HOLDER_NUMA_DEBUG
¶
-
namespace
hpx
Functions
-
static hpx::debug::enable_print<QUEUE_HOLDER_NUMA_DEBUG> hpx::nq_deb("QH_NUMA")
-
namespace
threads
-
namespace
policies
-
template<typename
QueueType
>
structqueue_holder_numa
¶ Public Types
-
template<>
usingThreadQueue
= queue_holder_thread<QueueType>¶
-
template<>
usingmutex_type
= typename QueueType::mutex_type¶
Public Functions
-
queue_holder_numa
()¶
-
~queue_holder_numa
()¶
-
bool
get_next_thread_HP
(std::size_t qidx, threads::thread_data *&thrd, bool stealing, bool core_stealing)¶
-
bool
get_next_thread
(std::size_t qidx, threads::thread_data *&thrd, bool stealing, bool core_stealing)¶
-
bool
add_new_HP
(ThreadQueue *receiver, std::size_t qidx, std::size_t &added, bool stealing, bool allow_stealing)¶
-
bool
add_new
(ThreadQueue *receiver, std::size_t qidx, std::size_t &added, bool stealing, bool allow_stealing)¶
-
std::int64_t
get_thread_count
(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_) const¶
-
void
abort_all_suspended_threads
()¶
-
bool
enumerate_threads
(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state, ) const¶
-
void
debug_info
()¶
-
template<>
-
template<typename
-
namespace
-
Defines
-
QUEUE_HOLDER_THREAD_DEBUG
¶
-
namespace
hpx
Functions
-
static hpx::debug::enable_print<QUEUE_HOLDER_THREAD_DEBUG> hpx::tq_deb("QH_THRD")
-
namespace
threads
-
namespace
policies
Enums
-
template<typename
QueueType
>
structqueue_holder_thread
¶ Public Types
-
template<>
usingthread_holder_type
= queue_holder_thread<QueueType>¶
-
template<>
usingthread_heap_type
= std::list<thread_id_type, util::internal_allocator<thread_id_type>>¶
-
template<>
usingtask_description
= thread_init_data¶
-
template<>
usingthread_map_type
= std::unordered_set<thread_id_type, std::hash<thread_id_type>, std::equal_to<thread_id_type>, util::internal_allocator<thread_id_type>>¶
-
template<>
usingterminated_items_type
= lockfree_fifo::apply<thread_data*>::type¶
Public Functions
-
queue_holder_thread
(QueueType *bp_queue, QueueType *hp_queue, QueueType *np_queue, QueueType *lp_queue, std::size_t domain, std::size_t queue, std::size_t thread_num, std::size_t owner, const thread_queue_init_parameters &init)¶
-
~queue_holder_thread
()¶
-
bool
owns_bp_queue
() const¶
-
bool
owns_hp_queue
() const¶
-
bool
owns_np_queue
() const¶
-
bool
owns_lp_queue
() const¶
-
void
schedule_thread
(threads::thread_data *thrd, thread_priority priority, bool other_end = false)¶
-
void
create_thread
(thread_init_data &data, thread_id_type *tid, std::size_t thread_num, error_code &ec)¶
-
void
create_thread_object
(threads::thread_id_type &tid, threads::thread_init_data &data)¶
-
void
recycle_thread
(thread_id_type tid)¶
-
void
add_to_thread_map
(threads::thread_id_type tid)¶
-
void
remove_from_thread_map
(threads::thread_id_type tid, bool dealloc)¶
-
bool
get_next_thread_HP
(threads::thread_data *&thrd, bool stealing, bool check_new)¶
-
bool
get_next_thread
(threads::thread_data *&thrd, bool stealing)¶
-
std::size_t
get_thread_count_staged
(thread_priority priority) const¶
-
std::size_t
get_thread_count_pending
(thread_priority priority) const¶
-
std::size_t
get_thread_count
(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_) const¶
-
void
destroy_thread
(threads::thread_data *thrd, std::size_t thread_num, bool xthread)¶ Destroy the passed thread as it has been terminated.
-
void
abort_all_suspended_threads
()¶
-
bool
enumerate_threads
(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const¶
-
void
debug_info
()¶
-
void
debug_queues
(const char *prefix)¶
Public Members
-
QueueType *const
bp_queue_
¶
-
QueueType *const
hp_queue_
¶
-
QueueType *const
np_queue_
¶
-
QueueType *const
lp_queue_
¶
-
util::cache_line_data<mutex_type>
thread_map_mtx_
¶
-
thread_heap_type
thread_heap_small_
¶
-
thread_heap_type
thread_heap_medium_
¶
-
thread_heap_type
thread_heap_large_
¶
-
thread_heap_type
thread_heap_huge_
¶
-
thread_heap_type
thread_heap_nostack_
¶
-
thread_map_type
thread_map_
¶
-
util::cache_line_data<std::atomic<std::int32_t>>
thread_map_count_
¶
-
terminated_items_type
terminated_items_
¶
-
util::cache_line_data<std::atomic<std::int32_t>>
terminated_items_count_
¶
-
thread_queue_init_parameters
parameters_
¶
Public Static Functions
-
static void
deallocate
(threads::thread_data *p)¶
Public Static Attributes
-
util::internal_allocator<threads::thread_data>
thread_alloc_
¶
-
template<>
-
template<typename
-
namespace
-
-
namespace
hpx
Typedefs
Functions
-
static print_onoff hpx::spq_deb("SPQUEUE")
-
static print_on hpx::spq_arr("SPQUEUE")
-
namespace
threads
-
namespace
policies
Typedefs
-
struct
core_ratios
¶ Public Functions
- #include <shared_priority_queue_scheduler.hpp>
The shared_priority_queue_scheduler maintains a set of high, normal, and low priority queues. For each priority level there is a core/queue ratio which determines how many cores share a single queue. If the high priority core/queue ratio is 4 the first 4 cores will share a single high priority queue, the next 4 will share another one and so on. In addition, the shared_priority_queue_scheduler is NUMA-aware and takes NUMA scheduling hints into account when creating and scheduling work.
Warning: PendingQueuing lifo causes lockup on termination
Public Types
Public Functions
Return the next thread to be executed, return false if none available.
Return the next thread to be executed, return false if none available.
Schedule the passed thread.
Put task on the back of the queue : not yet implemented just put it on the normal queue for now
Public Static Functions
Protected Types
Protected Attributes
Public Functions
Public Members
-
struct
-
namespace
-
-
namespace
hpx
-
namespace
threads
-
namespace
policies
Typedefs
-
using
default_static_priority_queue_scheduler_terminated_queue
= lockfree_fifo¶
-
template<typename
Mutex
= std::mutex, typenamePendingQueuing
= lockfree_fifo, typenameStagedQueuing
= lockfree_fifo, typenameTerminatedQueuing
= default_static_priority_queue_scheduler_terminated_queue>
classstatic_priority_queue_scheduler
: public hpx::threads::policies::local_priority_queue_scheduler<std::mutex, lockfree_fifo, lockfree_fifo, default_static_priority_queue_scheduler_terminated_queue>¶ - #include <static_priority_queue_scheduler.hpp>
The static_priority_queue_scheduler maintains exactly one queue of work items (threads) per OS thread, where this OS thread pulls its next work from. Additionally it maintains separate queues: several for high priority threads and one for low priority threads. High priority threads are executed by the first N OS threads before any other work is executed. Low priority threads are executed by the last OS thread whenever no other work is available. This scheduler does not do any work stealing.
Public Types
-
template<>
usingbase_type
= local_priority_queue_scheduler<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>¶
Public Functions
-
static_priority_queue_scheduler
(init_parameter_type const &init, bool deferred_initialization = true)¶
-
void
set_scheduler_mode
(scheduler_mode mode)¶
-
template<>
-
using
-
namespace
-
namespace
-
namespace
hpx
-
namespace
threads
-
namespace
policies
Typedefs
-
using
default_static_queue_scheduler_terminated_queue
= lockfree_fifo¶
-
template<typename
Mutex
= std::mutex, typenamePendingQueuing
= lockfree_fifo, typenameStagedQueuing
= lockfree_fifo, typenameTerminatedQueuing
= default_static_queue_scheduler_terminated_queue>
classstatic_queue_scheduler
: public hpx::threads::policies::local_queue_scheduler<std::mutex, lockfree_fifo, lockfree_fifo, default_static_queue_scheduler_terminated_queue>¶ - #include <static_queue_scheduler.hpp>
The local_queue_scheduler maintains exactly one queue of work items (threads) per OS thread, where this OS thread pulls its next work from.
Public Types
-
typedef local_queue_scheduler<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>
base_type
¶
Public Functions
-
static_queue_scheduler
(typename base_type::init_parameter_type const &init, bool deferred_initialization = true)¶
-
void
set_scheduler_mode
(scheduler_mode mode)¶
-
bool
get_next_thread
(std::size_t num_thread, bool, threads::thread_data *&thrd, bool)¶ Return the next thread to be executed, return false if none is available
-
bool
wait_or_add_new
(std::size_t num_thread, bool running, std::int64_t &idle_loop_count, bool, std::size_t &added)¶ This is a function which gets called periodically by the thread manager to allow for maintenance tasks to be executed in the scheduler. Returns true if the OS thread calling this function has to be terminated (i.e. no more work has to be done).
-
typedef local_queue_scheduler<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>
-
using
-
namespace
-
namespace
-
namespace
hpx
-
namespace
threads
-
namespace
policies
-
template<typename
Mutex
, typenamePendingQueuing
, typenameStagedQueuing
, typenameTerminatedQueuing
>
classthread_queue
¶ Public Functions
-
bool
cleanup_terminated_locked
(bool delete_all = false)¶ This function makes sure all threads which are marked for deletion (state is terminated) are properly destroyed.
This returns ‘true’ if there are no more terminated threads waiting to be deleted.
-
bool
cleanup_terminated
(bool delete_all = false)¶
-
thread_queue
(std::size_t queue_num = std::size_t(-1), thread_queue_init_parameters parameters = {})¶
-
~thread_queue
()¶
-
void
create_thread
(thread_init_data &data, thread_id_type *id, error_code &ec)¶
-
bool
get_next_thread
(threads::thread_data *&thrd, bool allow_stealing = false, bool steal = false)¶ Return the next thread to be executed, return false if none is available
-
void
schedule_thread
(threads::thread_data *thrd, bool other_end = false)¶ Schedule the passed thread.
-
void
destroy_thread
(threads::thread_data *thrd)¶ Destroy the passed thread as it has been terminated.
-
std::int64_t
get_thread_count
(thread_schedule_state state = thread_schedule_state::unknown) const¶ Return the number of existing threads with the given state.
-
void
abort_all_suspended_threads
()¶
-
bool
enumerate_threads
(util::function_nonser<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown, ) const¶
Public Static Functions
-
static void
deallocate
(threads::thread_data *p)¶
Protected Functions
-
template<typename
Lock
>
voidcreate_thread_object
(threads::thread_id_type &thrd, threads::thread_init_data &data, Lock &lk)¶
-
std::size_t
add_new
(std::int64_t add_count, thread_queue *addfrom, std::unique_lock<mutex_type> &lk, bool steal = false)¶
-
bool
add_new_always
(std::size_t &added, thread_queue *addfrom, std::unique_lock<mutex_type> &lk, bool steal = false)¶
-
void
recycle_thread
(thread_id_type thrd)¶
Protected Static Attributes
-
util::internal_allocator<typename thread_queue<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>::task_description>
task_description_alloc_
¶
Private Types
-
template<>
usingmutex_type
= Mutex¶
-
template<>
usingthread_map_type
= std::unordered_set<thread_id_type, std::hash<thread_id_type>, std::equal_to<thread_id_type>, util::internal_allocator<thread_id_type>>¶
-
template<>
usingthread_heap_type
= std::list<thread_id_type, util::internal_allocator<thread_id_type>>¶
-
template<>
usingthread_description
= thread_data¶
-
template<>
usingwork_items_type
= typename PendingQueuing::template apply<thread_description*>::type¶
-
template<>
usingtask_items_type
= typename StagedQueuing::template apply<task_description*>::type¶
-
template<>
usingterminated_items_type
= typename TerminatedQueuing::template apply<thread_data*>::type¶
Private Members
-
thread_queue_init_parameters
parameters_
¶
-
mutex_type
mtx_
¶
-
thread_map_type
thread_map_
¶
-
work_items_type
work_items_
¶
-
terminated_items_type
terminated_items_
¶
-
task_items_type
new_tasks_
¶
-
thread_heap_type
thread_heap_small_
¶
-
thread_heap_type
thread_heap_medium_
¶
-
thread_heap_type
thread_heap_large_
¶
-
thread_heap_type
thread_heap_huge_
¶
-
thread_heap_type
thread_heap_nostack_
¶
-
util::cache_line_data<std::atomic<std::int64_t>>
new_tasks_count_
¶
-
util::cache_line_data<std::atomic<std::int64_t>>
work_items_count_
¶
-
struct
task_description
¶ Public Members
-
template<>
thread_init_datadata
¶
-
template<>
-
bool
-
template<typename
-
namespace
-
namespace
Defines
-
THREAD_QUEUE_MC_DEBUG
¶
-
namespace
hpx
Functions
-
static hpx::debug::enable_print<THREAD_QUEUE_MC_DEBUG> hpx::tqmc_deb("_TQ_MC_")
-
namespace
threads
-
namespace
policies
-
template<typename
Mutex
, typenamePendingQueuing
, typenameStagedQueuing
, typenameTerminatedQueuing
>
classthread_queue_mc
¶ Public Types
-
typedef Mutex
mutex_type
¶
-
template<>
usingthread_queue_type
= thread_queue_mc<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>¶
-
template<>
usingthread_heap_type
= std::list<thread_id_type, util::internal_allocator<thread_id_type>>¶
-
template<>
usingtask_description
= thread_init_data¶
-
template<>
usingthread_description
= thread_data¶
-
typedef PendingQueuing::template apply<thread_description*>::type
work_items_type
¶
-
typedef concurrentqueue_fifo::apply<task_description>::type
task_items_type
¶
Public Functions
-
thread_queue_mc
(const thread_queue_init_parameters ¶meters, std::size_t queue_num = std::size_t(-1))¶
-
void
set_holder
(queue_holder_thread<thread_queue_type> *holder)¶
-
~thread_queue_mc
()¶
-
void
create_thread
(thread_init_data &data, thread_id_type *id, error_code &ec)¶
-
bool
get_next_thread
(threads::thread_data *&thrd, bool other_end, bool check_new = false)¶ Return the next thread to be executed, return false if none is available
-
void
schedule_work
(threads::thread_data *thrd, bool other_end)¶ Schedule the passed thread (put it on the ready work queue)
Public Members
-
thread_queue_init_parameters
parameters_
¶
-
const int
queue_index_
¶
-
queue_holder_thread<thread_queue_type> *
holder_
¶
-
task_items_type
new_task_items_
¶
-
work_items_type
work_items_
¶
-
util::cache_line_data<std::atomic<std::int32_t>>
new_tasks_count_
¶
-
util::cache_line_data<std::atomic<std::int32_t>>
work_items_count_
¶
-
typedef Mutex
-
template<typename
-
namespace
-