Remote execution with actions

This program will print out a hello world message on every OS-thread on every locality. The output will look something like this:

hello world from OS-thread 1 on locality 0
hello world from OS-thread 1 on locality 1
hello world from OS-thread 0 on locality 0
hello world from OS-thread 0 on locality 1

Setup

The source code for this example can be found here: hello_world_distributed.cpp.

To compile this program, go to your HPX build directory (see Building HPX for information on configuring and building HPX) and enter:

$ make examples.quickstart.hello_world_distributed

To run the program type:

$ ./bin/hello_world_distributed

This should print:

hello world from OS-thread 0 on locality 0

To use more OS-threads, pass the command line option --hpx:threads followed by the number of threads that you wish to use. For example, typing:

$ ./bin/hello_world_distributed --hpx:threads 2

will yield:

hello world from OS-thread 1 on locality 0
hello world from OS-thread 0 on locality 0

Notice that the ordering of the two print statements may change between runs. To run this program on multiple localities, please see the section How to use HPX applications with PBS.

Walkthrough

Now that you have compiled and run the code, let’s look at how the code works, beginning with main():

// Here is the main entry point. By using the include 'hpx/hpx_main.hpp' HPX
// will invoke the plain old C-main() as its first HPX thread.
int main()
{
    // Get a list of all available localities.
    std::vector<hpx::id_type> localities = hpx::find_all_localities();

    // Reserve storage space for futures, one for each locality.
    std::vector<hpx::future<void>> futures;
    futures.reserve(localities.size());

    for (hpx::id_type const& node : localities)
    {
        // Asynchronously start a new task. The task is encapsulated in a
        // future, which we can query to determine if the task has
        // completed.
        typedef hello_world_foreman_action action_type;
        futures.push_back(hpx::async<action_type>(node));
    }

    // The non-callback version of hpx::wait_all takes a single parameter,
    // a vector of futures to wait on. hpx::wait_all only returns when
    // all of the futures have finished.
    hpx::wait_all(futures);
    return 0;
}

In this excerpt of the code we again see the use of futures. This time the futures are stored in a vector so that they can easily be accessed. hpx::wait_all is a family of functions that wait for an std::vector<> of futures to become ready. In this piece of code, we are using the synchronous version of hpx::wait_all, which takes one argument (the std::vector<> of futures to wait on). This function will not return until all the futures in the vector have become ready.
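The launch-then-wait pattern is not specific to HPX. As a rough analog that uses only the C++ standard library, the sketch below starts one task per node index and blocks until every future is ready. It is an illustration under stated assumptions: std::async and std::future are not the HPX types, and the names greet, launch_all, and wait_all_std are hypothetical.

```cpp
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical stand-in for the per-locality work: returns its input index.
std::size_t greet(std::size_t node)
{
    return node;
}

// Launch one asynchronous task per node and collect the futures.
std::vector<std::future<std::size_t>> launch_all(std::size_t nodes)
{
    std::vector<std::future<std::size_t>> futures;
    futures.reserve(nodes);
    for (std::size_t node = 0; node != nodes; ++node)
        futures.push_back(std::async(std::launch::async, greet, node));
    return futures;
}

// Block until every future in the vector is ready (a standard-library
// analog of a wait-all; the results stay inside the futures).
void wait_all_std(std::vector<std::future<std::size_t>>& futures)
{
    for (auto& f : futures)
        f.wait();
}
```

Calling launch_all(4) followed by wait_all_std on the returned vector leaves four ready futures whose values can then be read with get().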

In Asynchronous execution with actions we used hpx::find_here to specify the target of our actions. Here, we instead use hpx::find_all_localities, which returns an std::vector<> containing the identifiers of all the localities in the system, including the one that we are on.

As in Asynchronous execution with actions, our futures are created using hpx::async<>. The hello_world_foreman_action is declared here:

// Define the boilerplate code necessary for the function 'hello_world_foreman'
// to be invoked as an HPX action.
HPX_PLAIN_ACTION(hello_world_foreman, hello_world_foreman_action)

Another way of thinking about this wrapping technique is as follows: functions (the work to be done) are wrapped in actions, and actions can be executed locally or remotely (e.g. on another machine participating in the computation).
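To make the wrapping idea concrete, here is a toy analog in standard C++. It is not how HPX implements actions, and none of these names are HPX APIs: toy_action, square, square_action, and run_on are hypothetical. The point is only that an action is a type that names a function, so a generic launcher can be told what to run through a template argument, much as hpx::async<action_type>(node) is in main(). The machinery that lets a real action run on another locality is not modeled here.

```cpp
#include <future>
#include <utility>

// Toy analog of an action: a type that names one specific function.
template <typename F, F Func>
struct toy_action
{
    template <typename... Args>
    static auto invoke(Args&&... args)
        -> decltype(Func(std::forward<Args>(args)...))
    {
        return Func(std::forward<Args>(args)...);
    }
};

// The work to be done (hypothetical example function).
int square(int x)
{
    return x * x;
}

// Name the function through a type, loosely mirroring what
// HPX_PLAIN_ACTION(function, action_name) does for a plain function.
using square_action = toy_action<decltype(&square), &square>;

// Generic launcher selected by action type. The 'target' parameter is
// ignored: this sketch always runs the function locally.
template <typename Action, typename... Args>
auto run_on(int /*target*/, Args... args)
    -> std::future<decltype(Action::invoke(args...))>
{
    return std::async(std::launch::async,
        [args...]() { return Action::invoke(args...); });
}
```

With these definitions, run_on<square_action>(0, 7) returns a future whose value is the result of square(7).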

Now it is time to look at the hello_world_foreman() function which was wrapped in the action above:

void hello_world_foreman()
{
    // Get the number of worker OS-threads in use by this locality.
    std::size_t const os_threads = hpx::get_os_thread_count();

    // Populate a set with the OS-thread numbers of all OS-threads on this
    // locality. When the hello world message has been printed on a particular
    // OS-thread, we will remove it from the set.
    std::set<std::size_t> attendance;
    for (std::size_t os_thread = 0; os_thread < os_threads; ++os_thread)
        attendance.insert(os_thread);

    // As long as there are still elements in the set, we must keep scheduling
    // HPX-threads. Because HPX features work-stealing task schedulers, we have
    // no way of enforcing which worker OS-thread will actually execute
    // each HPX-thread.
    while (!attendance.empty())
    {
        // Each iteration, we create a task for each element in the set of
        // OS-threads that have not said "Hello world". Each of these tasks
        // is encapsulated in a future.
        std::vector<hpx::future<std::size_t>> futures;
        futures.reserve(attendance.size());

        for (std::size_t worker : attendance)
        {
            // Asynchronously start a new task. The task is encapsulated in a
            // future that we can query to determine if the task has completed.
            //
            // We give the task a hint to run on a particular worker thread
            // (core) and suggest binding the scheduled thread to the given
            // core, but no guarantees are given by the scheduler that the task
            // will actually run on that worker thread. It will however try as
            // hard as possible to place the new task on the given worker
            // thread.
            hpx::execution::parallel_executor exec(
                hpx::threads::thread_priority::bound);

            hpx::threads::thread_schedule_hint hint(
                hpx::threads::thread_schedule_hint_mode::thread,
                static_cast<std::int16_t>(worker));

            futures.push_back(
                hpx::async(hpx::execution::experimental::with_hint(exec, hint),
                    hello_world_worker, worker));
        }

        // Wait for all of the futures to finish. hpx::wait_each is called
        // here with a callback followed by the vector of futures. Because
        // the callback is wrapped in hpx::unwrapping, it receives the value
        // held by each future (the OS-thread number, or std::size_t(-1) if
        // the task ran on the wrong OS-thread) rather than the future
        // itself. hpx::wait_each doesn't return until all the futures in
        // the vector have returned. The spinlock protects the set in case
        // callbacks run concurrently.
        hpx::spinlock mtx;
        hpx::wait_each(hpx::unwrapping([&](std::size_t t) {
            if (std::size_t(-1) != t)
            {
                std::lock_guard<hpx::spinlock> lk(mtx);
                attendance.erase(t);
            }
        }),
            futures);
    }
}

Now, before we discuss hello_world_foreman(), let’s talk about the hpx::wait_each function. The version of hpx::wait_each used here invokes a callback function provided by the user, supplying the callback function with the result of each future.
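As a rough picture of what a callback-driven wait does, the sketch below applies a callback to the result of each future in a vector, using only the standard library. It is a simplified analog under stated assumptions: wait_each_std and remaining_after_round are hypothetical names, the helper visits the futures in index order (which may differ from what HPX does), and hpx::unwrapping is not reproduced.

```cpp
#include <cstddef>
#include <future>
#include <set>
#include <vector>

// Hypothetical helper: call 'callback' with the value of every future.
// Returns only after all futures have been consumed.
template <typename Callback>
void wait_each_std(
    Callback callback, std::vector<std::future<std::size_t>>& futures)
{
    for (auto& f : futures)
        callback(f.get());
}

// Demo: each task reports either its id or the sentinel std::size_t(-1).
// The callback erases every successfully reported id from the set.
std::set<std::size_t> remaining_after_round()
{
    std::set<std::size_t> attendance = {0, 1, 2};

    std::vector<std::future<std::size_t>> futures;
    for (std::size_t id : attendance)
    {
        // Pretend that the task for id 1 ran in the wrong place and
        // therefore reports failure.
        futures.push_back(std::async(std::launch::async,
            [id]() { return id == 1 ? std::size_t(-1) : id; }));
    }

    wait_each_std(
        [&](std::size_t t) {
            if (t != std::size_t(-1))
                attendance.erase(t);
        },
        futures);

    return attendance;
}
```

After one round only the id whose task reported the sentinel is left in the set, so a caller would schedule it again.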

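In hello_world_foreman(), an std::set<> called attendance keeps track of which OS-threads have not yet printed out the hello world message. When an OS-thread prints out the statement, the future is marked as ready, and the callback passed to hpx::wait_each in hello_world_foreman() removes the OS-thread id from attendance. The function hello_world_worker(), shown next, checks whether it is executing on the desired OS-thread. If it is, it prints the hello world message and returns the OS-thread id. If it is not executing on the correct OS-thread, it returns std::size_t(-1), which causes hello_world_foreman() to leave the OS-thread id in attendance.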

std::size_t hello_world_worker(std::size_t desired)
{
    // Returns the OS-thread number of the worker that is running this
    // HPX-thread.
    std::size_t current = hpx::get_worker_thread_num();
    if (current == desired)
    {
        // The HPX-thread has been run on the desired OS-thread.
        char const* msg = "hello world from OS-thread {1} on locality {2}\n";

        hpx::util::format_to(hpx::cout, msg, desired, hpx::get_locality_id())
            << std::flush;

        return desired;
    }

    // This HPX-thread has been run by the wrong OS-thread, make the foreman
    // try again by rescheduling it.
    return std::size_t(-1);
}

Because HPX features work-stealing task schedulers, there is no way to guarantee that an HPX-thread will be scheduled on a particular OS-thread. This is why we must use a guess-and-check approach.
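The guess-and-check loop can be simulated without HPX or real threads. In this sketch the scheduler is replaced by a hypothetical pick_worker function that returns a pseudo-random worker index, standing in for the fact that the scheduling hint is not binding. The names try_greet and simulate_foreman and the use of a fixed seed are likewise assumptions made for illustration.

```cpp
#include <cstddef>
#include <random>
#include <set>
#include <vector>

// Stand-in for the scheduler: decides which worker actually runs a task.
std::size_t pick_worker(std::mt19937& rng, std::size_t workers)
{
    return static_cast<std::size_t>(rng() % workers);
}

// Analog of hello_world_worker: succeed only on the desired worker.
std::size_t try_greet(std::size_t current, std::size_t desired,
    std::vector<std::size_t>& greeted)
{
    if (current == desired)
    {
        greeted.push_back(desired);    // stands in for printing the message
        return desired;
    }
    return std::size_t(-1);    // wrong worker: the foreman must retry
}

// Analog of hello_world_foreman: retry until every worker has greeted.
// Returns the worker ids in the order in which they greeted.
std::vector<std::size_t> simulate_foreman(std::size_t workers, unsigned seed)
{
    std::mt19937 rng(seed);

    std::set<std::size_t> attendance;
    for (std::size_t w = 0; w != workers; ++w)
        attendance.insert(w);

    std::vector<std::size_t> greeted;
    while (!attendance.empty())
    {
        // One round: attempt every worker that is still missing.
        std::vector<std::size_t> results;
        for (std::size_t desired : attendance)
            results.push_back(
                try_greet(pick_worker(rng, workers), desired, greeted));

        // Remove the workers whose attempt landed in the right place.
        for (std::size_t r : results)
            if (r != std::size_t(-1))
                attendance.erase(r);
    }
    return greeted;
}
```

Every worker id appears exactly once in the returned vector, but the order depends on the seed, which mirrors how the order of the printed lines varies between runs of the real program.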