#include <genesis/utils/threading/thread_pool.hpp>
Thread pool for distributed work.
This simple implementation offers a standing pool of worker threads that pick up tasks.
For reasons explained below, it is recommended to initialize a global thread pool via Options::get().global_thread_pool(), with one fewer thread than intended to keep busy, as the main thread can also do busy work while waiting for tasks via our work-stealing ProactiveFuture. Use guess_number_of_threads() to obtain the adequate total number of threads to run, and subtract one to get the number to use with this class.
Example
// Create a thread pool with 3 worker threads, on a 4 core system.
ThreadPool thread_pool( 3 );

// Enqueue a new task by providing a function and its arguments, and store its future result.
// This returns a ProactiveFuture, so that calling wait() or get() on it will process other tasks.
auto result = thread_pool.enqueue_and_retrieve(
    []( int some_param )
    {
        // do computations
        int some_result = 42;
        return some_result;
    },
    0 // value for `some_param`
);

// Get the value of `some_result` from the future.
// As this is a future, the call to get() blocks until a thread has finished the
// work, but also processes other tasks from the queue in the meantime, see ProactiveFuture.
std::cout << result.get() << "\n";
As the workers are stored in the ThreadPool object itself, the class cannot be copied.
The pool implements a work stealing technique similar to the one described in the "C++ Concurrency in Action" book by Anthony Williams, second edition, chapter 9, in order to avoid deadlocking when tasks submit their own tasks. In such cases, the parent task could be waiting for the child, but the child might never start, as all threads in the pool are busy waiting for the children they enqueued. Hence, our wrapper implementation, called ProactiveFuture (a thin wrapper around std::future; see there for details), instead processes tasks from the queue while waiting for the future, so that those do not starve.
This mechanism also allows starting a ThreadPool with 0 threads. In that case, all tasks are processed once wait() or get() is called on their returned ProactiveFuture - essentially making the pool behave as a lazy evaluator of the tasks. This is very convenient behavior to ensure that the number of actually busy threads is exactly known - a main thread that waits for some submitted tasks will also be doing work while waiting. Hence, we recommend starting this pool with one fewer thread than the hardware concurrency (or whatever other upper limit you want to ensure, e.g., from Slurm).
Lastly, if a maximum queue size is provided upon construction, only that many tasks will be queued at a time (with a bit of leeway, due to concurrency). If a thread calls one of the enqueue functions when the queue is already filled with waiting tasks up to the maximum size, the caller instead waits for the queue to drop below the specified maximum, and while waiting, processes tasks itself, so that the waiting time is spent productively.
This is meant as a mechanism that allows a main thread to keep queueing work without capturing the futures and waiting for them, while avoiding endlessly queueing new tasks that the workers cannot catch up with. The max size needs to be at least double the number of threads for this to make sense. Due to concurrency, the max size can be exceeded by up to the number of threads, in case that many threads enqueue work simultaneously. That is okay, as we usually just want some upper limit on the number of tasks. Also, in the case of just a single main thread enqueueing new tasks, the maximum is never exceeded.
When using this mechanism of submitting work without storing the futures, the wait_for_all_pending_tasks() function of the class can be used to wait for all current work to be done. This is intended to be called, for instance, from the main thread, once there is no more work to be enqueued.
Definition at line 333 of file thread_pool.hpp.
Public Member Functions
ThreadPool (size_t num_threads, size_t max_queue_size=0)
Construct a thread pool with a given number of workers. More...
ThreadPool (ThreadPool &&)=delete
ThreadPool (ThreadPool const &)=delete
~ThreadPool ()
Destruct the thread pool, waiting for all unfinished tasks. More...
template<class F , class... Args>
auto enqueue_and_retrieve (F &&f, Args &&... args) -> ProactiveFuture< typename std::result_of< F(Args...)>::type >
Enqueue a new task, using a function to call and its arguments, and returning a future to receive the result of the task. More...
template<class F , class... Args>
void enqueue_detached (F &&f, Args &&... args)
Enqueue a new task, using a function to call and its arguments, without a std::future. More...
ThreadPool & operator= (ThreadPool &&)=delete
ThreadPool & operator= (ThreadPool const &)=delete
size_t pending_tasks_count () const
Return the current number of pending tasks. More...
size_t size () const
Return the number of workers of the thread pool. More...
bool try_run_pending_task ()
Helper function to run a pending task from outside the pool. More...
void wait_for_all_pending_tasks ()
Wait for all current tasks to be finished processing. More...
inline explicit
Construct a thread pool with a given number of workers.
We allow for 0 threads on construction. With no threads in the pool, every task submitted will instead be processed once its future is queried via wait() or get(); the pool then behaves as a lazily evaluating task queue.
If max_queue_size is set to a value other than zero, only roughly that many tasks will be enqueued at the same time. See the class description for details.
Definition at line 351 of file thread_pool.hpp.
delete
delete
inline
Destruct the thread pool, waiting for all unfinished tasks.
Definition at line 377 of file thread_pool.hpp.
inline
Enqueue a new task, using a function to call and its arguments, and returning a future to receive the result of the task.
This function returns a ProactiveFuture that can be used to check whether the task has finished, or to wait for it to be finished. This also allows the task to send its result back to the caller, if needed, by simply returning it from the task function.
We internally use a std::packaged_task, so that any exception thrown in the function is captured in the future, and only rethrown once its get() function is called. See enqueue_detached() for an alternative that does not incur the overhead of creating the packaged_task and future, and hence has about 50% less overhead.
If enqueuing the task would exceed the max queue size, we instead first process existing tasks until there is enough space in the queue. This makes the caller wait, but do productive work in the meantime.
Definition at line 454 of file thread_pool.hpp.
inline
Enqueue a new task, using a function to call and its arguments, without a std::future.
This function simply submits the task to the pool, but does not create a std::future for the caller to wait for the result. Hence, this mostly makes sense for tasks that do not return a result that is needed. Thus, the task function itself needs to take care of propagating its result, if needed. This has about 50% less overhead compared to enqueue_and_retrieve().
If enqueuing the task would exceed the max queue size, we instead first process existing tasks until there is enough space in the queue. This makes the caller wait, but do productive work in the meantime.
Definition at line 502 of file thread_pool.hpp.
delete
delete
inline
Return the current number of pending tasks.
This is the number of tasks that have been enqueued, but not yet finished running. It hence includes both the waiting tasks and those currently being processed by a worker thread. The count is only reduced once a task has finished (or thrown an exception). The counter can be used to wait for all enqueued tasks to be done, which is what wait_for_all_pending_tasks() does.
Note that there is a very small window where it can happen that this counter is reduced after finishing the work of a task, but before setting the value of its associated promise. Hence, this counter might exclude a finished task for which the caller is still waiting for the future returned from the enqueue_and_retrieve() call. That should usually not be an issue though, as the caller will typically just wait for the future anyway, instead of checking this counter.
Definition at line 427 of file thread_pool.hpp.
inline
Return the number of workers of the thread pool.
Definition at line 406 of file thread_pool.hpp.
inline
Helper function to run a pending task from outside the pool.
The return value indicates whether a task has been run. If no tasks are enqueued, the function returns false without doing anything. This is the function that allows ProactiveFuture to process tasks while waiting.
Definition at line 536 of file thread_pool.hpp.
inline
Wait for all current tasks to be finished processing.
This function simply calls try_run_pending_task() until there are no more tasks to process. It is an alternative waiting mechanism for tasks whose futures have not been captured when they were enqueued. This can be used, for instance, by a main thread that keeps submitting work, and later needs to wait for everything to be finished. For this use case, it might make sense to set a max_queue_size when constructing the pool, to ensure that the task queue does not grow indefinitely. See the main class description for details.
Definition at line 559 of file thread_pool.hpp.