hpx/schedulers/shared_priority_queue_scheduler.hpp

See Public API for a list of names and headers that are part of the public HPX API.

Defines

SHARED_PRIORITY_SCHEDULER_DEBUG
SHARED_PRIORITY_QUEUE_SCHEDULER_API
namespace hpx

Typedefs

using print_onoff = hpx::debug::enable_print<SHARED_PRIORITY_SCHEDULER_DEBUG>
using print_on = hpx::debug::enable_print<false>

Functions

static print_onoff hpx::spq_deb("SPQUEUE")
static print_on hpx::spq_arr("SPQUEUE")
namespace threads
namespace policies

Typedefs

using default_shared_priority_queue_scheduler_terminated_queue = lockfree_fifo
struct core_ratios

Public Functions

core_ratios(std::size_t high_priority, std::size_t normal_priority, std::size_t low_priority)

Public Members

std::size_t high_priority
std::size_t normal_priority
std::size_t low_priority
template<typename Mutex = std::mutex, typename PendingQueuing = concurrentqueue_fifo, typename TerminatedQueuing = default_shared_priority_queue_scheduler_terminated_queue>
class shared_priority_queue_scheduler : public scheduler_base
#include <shared_priority_queue_scheduler.hpp>

The shared_priority_queue_scheduler maintains a set of high, normal, and low priority queues. For each priority level there is a core/queue ratio which determines how many cores share a single queue. If the high priority core/queue ratio is 4, the first 4 cores share a single high priority queue, the next 4 share another one, and so on. In addition, the shared_priority_queue_scheduler is NUMA-aware and takes NUMA scheduling hints into account when creating and scheduling work.

Warning: using a LIFO PendingQueuing policy causes a lockup on termination.
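As a concrete illustration of the core/queue ratios described above, the sketch below constructs the scheduler from its init_parameter_type. This is a minimal, hedged example: the constructor arguments follow the declarations documented on this page, while num_worker_threads, affinity_data and thread_queue_init are assumed to be supplied by the enclosing thread pool and resource partitioner setup.

    // Hedged sketch: one high priority queue per 4 cores, one normal priority
    // queue per 2 cores, and one low priority queue per 16 cores.
    using scheduler_type =
        hpx::threads::policies::shared_priority_queue_scheduler<>;

    hpx::threads::policies::core_ratios ratios(4, 2, 16);

    // num_worker_threads, affinity_data and thread_queue_init are assumed to
    // come from the thread pool / resource partitioner that owns the scheduler.
    scheduler_type::init_parameter_type init(num_worker_threads, ratios,
        affinity_data, thread_queue_init, "shared_priority_queue_scheduler");

    scheduler_type scheduler(init);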

Public Types

using has_periodic_maintenance = std::false_type
using thread_queue_type = thread_queue_mc<Mutex, PendingQueuing, PendingQueuing, TerminatedQueuing>
using thread_holder_type = queue_holder_thread<thread_queue_type>
typedef init_parameter init_parameter_type

Public Functions

shared_priority_queue_scheduler(init_parameter const &init)
virtual ~shared_priority_queue_scheduler()
void set_scheduler_mode(scheduler_mode mode)
void abort_all_suspended_threads()
std::size_t local_thread_number()
bool cleanup_terminated(bool delete_all)
bool cleanup_terminated(std::size_t, bool delete_all)
void create_thread(thread_init_data &data, thread_id_ref_type *thrd, error_code &ec)
template<typename T>
bool steal_by_function(std::size_t domain, std::size_t q_index, bool steal_numa, bool steal_core, thread_holder_type *origin, T &var, const char *prefix, hpx::function<bool(std::size_t, std::size_t, thread_holder_type*, T&, bool, bool)> operation_HP, hpx::function<bool(std::size_t, std::size_t, thread_holder_type*, T&, bool, bool)> operation)
virtual bool get_next_thread(std::size_t thread_num, bool running, threads::thread_id_ref_type &thrd, bool enable_stealing)

Return the next thread to be executed; returns false if none is available.

virtual bool wait_or_add_new(std::size_t, bool, std::int64_t&, bool, std::size_t &added)

Return the next thread to be executed; returns false if none is available.

void schedule_thread(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, thread_priority priority = thread_priority::normal)

Schedule the passed thread (a hedged usage sketch follows this member list).

void schedule_thread_last(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, thread_priority priority = thread_priority::normal)

Put the task at the back of the queue. Not yet implemented; for now the task is simply placed on the normal queue.

void destroy_thread(threads::thread_data *thrd)
std::int64_t get_queue_length(std::size_t thread_num = std::size_t(-1)) const
std::int64_t get_thread_count(thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_, std::size_t thread_num = std::size_t(-1), bool = false) const
bool is_core_idle(std::size_t num_thread) const
bool enumerate_threads(hpx::function<bool(thread_id_type)> const &f, thread_schedule_state state = thread_schedule_state::unknown) const
void on_start_thread(std::size_t local_thread)
void on_stop_thread(std::size_t thread_num)
void on_error(std::size_t thread_num, std::exception_ptr const&)
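The NUMA awareness described above is driven by the thread_schedule_hint argument of schedule_thread. The hedged sketch below shows such a call as the thread manager might issue it; the scheduler instance and the thread id (thrd) are assumed to exist, and the thread_schedule_hint constructor form used here is an assumption rather than something declared in this header.

    // Hedged sketch: hint that the task should run on NUMA domain 1.
    hpx::threads::thread_schedule_hint hint(
        hpx::threads::thread_schedule_hint_mode::numa, 1);

    // 'scheduler' and 'thrd' are assumed to be provided by the thread manager.
    scheduler.schedule_thread(thrd,            // task to schedule
        hint,                                  // NUMA placement hint
        true,                                  // allow_fallback to another queue
        hpx::threads::thread_priority::high);  // place on a high priority queue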

Public Static Functions

static std::string get_scheduler_name()

Protected Types

typedef queue_holder_numa<thread_queue_type> numa_queues

Protected Attributes

std::array<std::size_t, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> q_counts_
std::array<std::size_t, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> q_offset_
std::array<numa_queues, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> numa_holder_
std::vector<std::size_t> d_lookup_
std::vector<std::size_t> q_lookup_
core_ratios cores_per_queue_
bool round_robin_
bool steal_hp_first_
bool numa_stealing_
bool core_stealing_
std::size_t num_workers_
std::size_t num_domains_
detail::affinity_data const &affinity_data_
const thread_queue_init_parameters queue_parameters_
std::mutex init_mutex
bool initialized_
bool debug_init_
std::atomic<std::size_t> thread_init_counter_
std::size_t pool_index_
struct init_parameter

Public Functions

init_parameter(std::size_t num_worker_threads, const core_ratios &cores_per_queue, detail::affinity_data const &affinity_data, const thread_queue_init_parameters &thread_queue_init, char const *description = "shared_priority_queue_scheduler")
init_parameter(std::size_t num_worker_threads, const core_ratios &cores_per_queue, detail::affinity_data const &affinity_data, char const *description)

Public Members

std::size_t num_worker_threads_
core_ratios cores_per_queue_
thread_queue_init_parameters thread_queue_init_
detail::affinity_data const &affinity_data_
char const *description_