Remote execution with actions
Contents
Remote execution with actions#
This program will print out a hello world message on every OS-thread on every locality. The output will look something like this:
hello world from OS-thread 1 on locality 0
hello world from OS-thread 1 on locality 1
hello world from OS-thread 0 on locality 0
hello world from OS-thread 0 on locality 1
Setup#
The source code for this example can be found here:
hello_world_distributed.cpp
.
To compile this program, go to your HPX build directory (see Building HPX for information on configuring and building HPX) and enter:
$ make examples.quickstart.hello_world_distributed
To run the program type:
$ ./bin/hello_world_distributed
This should print:
hello world from OS-thread 0 on locality 0
To use more OS-threads use the command line option --hpx:threads
and
type the number of threads that you wish to use. For example, typing:
$ ./bin/hello_world_distributed --hpx:threads 2
will yield:
hello world from OS-thread 1 on locality 0
hello world from OS-thread 0 on locality 0
Notice how the ordering of the two print statements will change with subsequent runs. To run this program on multiple localities please see the section How to use HPX applications with PBS.
Walkthrough#
Now that you have compiled and run the code, let’s look at how the code works,
beginning with main()
:
// Here is the main entry point. By using the include 'hpx/hpx_main.hpp' HPX
// will invoke the plain old C-main() as its first HPX thread.
int main()
{
// Get a list of all available localities.
std::vector<hpx::id_type> localities = hpx::find_all_localities();
// Reserve storage space for futures, one for each locality.
std::vector<hpx::future<void>> futures;
futures.reserve(localities.size());
for (hpx::id_type const& node : localities)
{
// Asynchronously start a new task. The task is encapsulated in a
// future, which we can query to determine if the task has
// completed.
typedef hello_world_foreman_action action_type;
futures.push_back(hpx::async<action_type>(node));
}
// The non-callback version of hpx::wait_all takes a single parameter,
// a vector of futures to wait on. hpx::wait_all only returns when
// all of the futures have finished.
hpx::wait_all(futures);
return 0;
}
In this excerpt of the code we again see the use of futures. This time the
futures are stored in a vector so that they can easily be accessed.
hpx::wait_all
is a family of functions that wait on for an
std::vector<>
of futures to become ready. In this piece of code, we are
using the synchronous version of hpx::wait_all
, which takes one
argument (the std::vector<>
of futures to wait on). This function will not
return until all the futures in the vector have been executed.
In Asynchronous execution with actions we used hpx::find_here
to specify the
target of our actions. Here, we instead use
hpx::find_all_localities
, which returns an std::vector<>
containing the identifiers of all the machines in the system, including the one
that we are on.
As in Asynchronous execution with actions our futures are set using
hpx::async<>
. The hello_world_foreman_action
is declared
here:
// Define the boilerplate code necessary for the function 'hello_world_foreman'
// to be invoked as an HPX action.
HPX_PLAIN_ACTION(hello_world_foreman, hello_world_foreman_action)
Another way of thinking about this wrapping technique is as follows: functions (the work to be done) are wrapped in actions, and actions can be executed locally or remotely (e.g. on another machine participating in the computation).
Now it is time to look at the hello_world_foreman()
function which was
wrapped in the action above:
void hello_world_foreman()
{
// Get the number of worker OS-threads in use by this locality.
std::size_t const os_threads = hpx::get_os_thread_count();
// Populate a set with the OS-thread numbers of all OS-threads on this
// locality. When the hello world message has been printed on a particular
// OS-thread, we will remove it from the set.
std::set<std::size_t> attendance;
for (std::size_t os_thread = 0; os_thread < os_threads; ++os_thread)
attendance.insert(os_thread);
// As long as there are still elements in the set, we must keep scheduling
// HPX-threads. Because HPX features work-stealing task schedulers, we have
// no way of enforcing which worker OS-thread will actually execute
// each HPX-thread.
while (!attendance.empty())
{
// Each iteration, we create a task for each element in the set of
// OS-threads that have not said "Hello world". Each of these tasks
// is encapsulated in a future.
std::vector<hpx::future<std::size_t>> futures;
futures.reserve(attendance.size());
for (std::size_t worker : attendance)
{
// Asynchronously start a new task. The task is encapsulated in a
// future that we can query to determine if the task has completed.
//
// We give the task a hint to run on a particular worker thread
// (core) and suggest binding the scheduled thread to the given
// core, but no guarantees are given by the scheduler that the task
// will actually run on that worker thread. It will however try as
// hard as possible to place the new task on the given worker
// thread.
hpx::execution::parallel_executor exec(
hpx::threads::thread_priority::bound);
hpx::threads::thread_schedule_hint hint(
hpx::threads::thread_schedule_hint_mode::thread,
static_cast<std::int16_t>(worker));
futures.push_back(
hpx::async(hpx::execution::experimental::with_hint(exec, hint),
hello_world_worker, worker));
}
// Wait for all of the futures to finish. The callback version of the
// hpx::wait_each function takes two arguments: a vector of futures,
// and a binary callback. The callback takes two arguments; the first
// is the index of the future in the vector, and the second is the
// return value of the future. hpx::wait_each doesn't return until
// all the futures in the vector have returned.
hpx::spinlock mtx;
hpx::wait_each(hpx::unwrapping([&](std::size_t t) {
if (std::size_t(-1) != t)
{
std::lock_guard<hpx::spinlock> lk(mtx);
attendance.erase(t);
}
}),
futures);
}
}
Now, before we discuss hello_world_foreman()
, let’s talk about the
hpx::wait_each
function.
The version of hpx::wait_each
invokes a callback function
provided by the user, supplying the callback function with the result of the
future.
In hello_world_foreman()
, an std::set<>
called attendance
keeps
track of which OS-threads have printed out the hello world message. When the
OS-thread prints out the statement, the future is marked as ready, and
hpx::wait_each
in hello_world_foreman()
. If it is not
executing on the correct OS-thread, it returns a value of -1, which causes
hello_world_foreman()
to leave the OS-thread id in attendance
.
std::size_t hello_world_worker(std::size_t desired)
{
// Returns the OS-thread number of the worker that is running this
// HPX-thread.
std::size_t current = hpx::get_worker_thread_num();
if (current == desired)
{
// The HPX-thread has been run on the desired OS-thread.
char const* msg = "hello world from OS-thread {1} on locality {2}\n";
hpx::util::format_to(hpx::cout, msg, desired, hpx::get_locality_id())
<< std::flush;
return desired;
}
// This HPX-thread has been run by the wrong OS-thread, make the foreman
// try again by rescheduling it.
return std::size_t(-1);
}
Because HPX features work stealing task schedulers, there is no way to guarantee that an action will be scheduled on a particular OS-thread. This is why we must use a guess-and-check approach.