A library for working with phylogenetic and population genetic data.
v0.32.0
thread_pool.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_THREADING_THREAD_POOL_H_
2 #define GENESIS_UTILS_THREADING_THREAD_POOL_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
35 
36 #include <atomic>
37 #include <cassert>
38 #include <chrono>
39 #include <iterator>
40 #include <functional>
41 #include <future>
42 #include <memory>
43 #include <queue>
44 #include <stdexcept>
45 #include <thread>
46 #include <type_traits>
47 #include <utility>
48 #include <vector>
49 
50 namespace genesis {
51 namespace utils {
52 
53 // =================================================================================================
54 // Forward Declarations
55 // =================================================================================================
56 
57 class ThreadPool;
58 
59 // =================================================================================================
60 // Proactive Future
61 // =================================================================================================
62 
78 template<class T>
80 {
81 public:
82 
83  // -------------------------------------------------------------
84  // Constructors and Rule of Five
85  // -------------------------------------------------------------
86 
// Public default constructor, so that for instance a std::vector of ProactiveFuture
// can be created. No future and no ThreadPool are supplied on this path; those are
// only provided through the private constructor.
91  ProactiveFuture() noexcept = default;
92 
93 private:
94 
// Private constructor, taking the std::future to wrap and the ThreadPool it belongs to.
// The future is moved into this object; of the pool, only its address is stored,
// which wait() later uses to call try_run_pending_task() on that pool.
98  ProactiveFuture( std::future<T> future_result, ThreadPool& thread_pool )
99  : future_( std::move( future_result ))
100  , thread_pool_( &thread_pool )
101  {}
102 
103  friend ThreadPool;
104 
105 public:
106 
// Destructor and move/copy members: instances can be moved, but not copied.
107  ~ProactiveFuture() noexcept = default;
108 
109  ProactiveFuture( ProactiveFuture&& ) noexcept = default;
110  ProactiveFuture( const ProactiveFuture& ) = delete;
111 
112  ProactiveFuture& operator=( ProactiveFuture&& ) noexcept = default;
113  ProactiveFuture& operator=( const ProactiveFuture& ) = delete;
114 
115  // -------------------------------------------------------------
116  // Forwarded members
117  // -------------------------------------------------------------
118 
// Return the result, after calling wait().
// The call to wait() comes first, so that the waiting is done with our busy waiting
// instead of inside of std::future::get().
122  T get()
123  {
124  // Use our busy waiting first, until we are ready.
125  wait();
126  assert( ready() );
127  return future_.get();
128  }
129 
// Return the result, after calling wait().
// Overload that takes part in overload resolution only if U (by default, T) is not void.
// NOTE(review): the declared return type is U&, while the body returns the value of
// future_.get() - confirm that this is intended for non-reference T.
133  template<typename U = T>
134  typename std::enable_if<!std::is_void<U>::value, U&>::type get()
135  {
136  // Enable this method only if T is not void (non-void types).
137  static_assert( ! std::is_same<T, void>::value, "ProactiveFuture::get() intended for T != void" );
138 
139  // Use our busy waiting first, until we are ready.
140  wait();
141  assert( ready() );
142  return future_.get();
143  }
144 
// Return the result, after calling wait().
// Overload that takes part in overload resolution only if U (by default, T) is void,
// in which case there is no value to return.
148  template<typename U = T>
149  typename std::enable_if<std::is_void<U>::value>::type get()
150  {
151  // Enable this method only if T is void
152  static_assert( std::is_same<T, void>::value, "ProactiveFuture::get() intended for T == void" );
153 
154  // Use our busy waiting first, until we are ready.
155  wait();
156  assert( ready() );
157  return future_.get();
158  }
159 
// Check if the future has a shared state. Simply forwards to std::future::valid().
163  bool valid() const noexcept
164  {
165  return future_.valid();
166  }
167 
// Wait for the result to become available.
// Only declared here; the definition follows after the ThreadPool class,
// as it needs ThreadPool to be defined first.
174  void wait() const;
175 
// Wait for the result, return if it is not available for the specified timeout duration.
// This forwards to std::future::wait_for() and returns its status; no pending tasks
// of the thread pool are processed by this function.
187  template< class Rep, class Period >
188  std::future_status wait_for( std::chrono::duration<Rep,Period> const& timeout_duration ) const
189  {
190  // If the user specifies a time to wait for, we just forward that to the future.
191  return future_.wait_for( timeout_duration );
192  }
193 
// Wait for the result, return if it is not available until specified time point has been
// reached. This forwards to std::future::wait_until() and returns its status; no pending
// tasks of the thread pool are processed by this function.
201  template< class Clock, class Duration >
202  std::future_status wait_until( std::chrono::time_point<Clock,Duration> const& timeout_time ) const
203  {
204  // If the user specifies a time to wait until, we just forward that to the future.
205  return future_.wait_until( timeout_time );
206  }
207 
208  // -------------------------------------------------------------
209  // Additional members
210  // -------------------------------------------------------------
211 
// Check if the future is ready.
// Throws std::future_error (no_state) if the future is not valid. Otherwise, the status
// is queried with a zero-length wait_for(), so that this call itself does not wait.
215  bool ready() const
216  {
217  throw_if_invalid_();
218  return future_.wait_for( std::chrono::seconds(0) ) == std::future_status::ready;
219  }
220 
// Check if the future is deferred, i.e., the result will be computed only when
// explicitly requested.
// Throws std::future_error (no_state) if the future is not valid. Otherwise, the status
// is queried with a zero-length wait_for(), so that this call itself does not wait.
228  bool deferred() const
229  {
230  throw_if_invalid_();
231  return future_.wait_for( std::chrono::seconds(0) ) == std::future_status::deferred;
232  }
233 
234  // -------------------------------------------------------------
235  // Internal members
236  // -------------------------------------------------------------
237 
238 private:
239 
// Throw a std::future_error with std::future_errc::no_state if the wrapped future
// has no shared state; do nothing otherwise.
240  void throw_if_invalid_() const
241  {
242  // From: https://en.cppreference.com/w/cpp/thread/future/wait
243  // The implementations are encouraged to detect the case when valid() == false before the
244  // call and throw a std::future_error with an error condition of std::future_errc::no_state.
245  if( !future_.valid() ) {
246  throw std::future_error( std::future_errc::no_state );
247  }
248  }
249 
250  // -------------------------------------------------------------
251  // Data members
252  // -------------------------------------------------------------
253 
254 private:
255 
// The wrapped future that holds the result of the task.
256  std::future<T> future_;
// Non-owning pointer to the pool whose pending tasks are processed while waiting.
// Initialized to nullptr, so that an instance created via the public default constructor
// never carries an indeterminate pointer; wait() asserts that the pointer is set.
257  ThreadPool* thread_pool_ = nullptr;
258 
259 };
260 
261 // =================================================================================================
262 // Thread Pool
263 // =================================================================================================
264 
334 {
335 public:
336 
337  // -------------------------------------------------------------
338  // Constructors and Rule of Five
339  // -------------------------------------------------------------
340 
// Construct a thread pool with a given number of workers.
//
// num_threads: number of worker threads, which are created right away via init_().
// max_queue_size: limit that the enqueue functions compare pending_tasks_count() against
//   before adding a task; 0 (the default) disables the limit.
// Throws std::runtime_error if a non-zero max_queue_size is smaller than half of num_threads.
351  explicit ThreadPool( size_t num_threads, size_t max_queue_size = 0 )
352  : max_queue_size_( max_queue_size )
353  {
354  // We disallow a max queue size smaller than half the number of threads.
355  // That would be slow and inefficient, and just not really what we want.
356  if( max_queue_size_ > 0 && max_queue_size_ < num_threads / 2 ) {
357  throw std::runtime_error(
358  "Cannot use ThreadPool with max queue size less than "
359  "half the number of threads, for efficiency"
360  );
361  }
362 
363  // Create the threads.
364  init_( num_threads );
365  assert( worker_pool_.size() == num_threads );
366  }
367 
// The pool can neither be copied nor moved. Note that the tasks created by the enqueue
// functions capture `this`, so the address of the pool has to stay the same.
368  ThreadPool( ThreadPool const& ) = delete;
369  ThreadPool( ThreadPool&& ) = delete;
370 
371  ThreadPool& operator= ( ThreadPool const& ) = delete;
372  ThreadPool& operator= ( ThreadPool&& ) = delete;
373 
// Destruct the thread pool, waiting for all unfinished tasks.
// Afterwards, one stop task per worker is enqueued, and all worker threads are joined.
377  ~ThreadPool()
378  {
379  // Just in case, we wait for any unfinished work to be done, to avoid terminating
380  // when tasks are still doing work that needs to be finished.
381  wait_for_all_pending_tasks();
382  assert( unfinished_tasks_.load() == 0 );
383 
384  // Send the special stop task to the pool, once for each worker.
385  // As each worker stops upon receiving the task, this stops all workers.
386  for( size_t i = 0; i < worker_pool_.size(); ++i ) {
387  task_queue_.enqueue( WrappedTask( true ));
388  }
389 
390  // Join them back into the main thread, after which there is no unfinished work.
391  for( std::thread& worker : worker_pool_ ) {
392  if( worker.joinable() ) {
393  worker.join();
394  }
395  }
396  assert( unfinished_tasks_.load() == 0 );
397  }
398 
399  // -------------------------------------------------------------
400  // Accessor Members
401  // -------------------------------------------------------------
402 
// Return the number of workers of the thread pool.
406  size_t size() const
407  {
408  return worker_pool_.size();
409  }
410 
// Return the current number of pending tasks.
// This is the value of the unfinished tasks counter, which is incremented when a task
// is enqueued, and decremented only once the task function has been run. Hence, tasks
// that are currently being processed are included in the count.
427  size_t pending_tasks_count() const
428  {
429  return unfinished_tasks_.load();
430  // return task_queue_.size_approx();
431  }
432 
433  // -------------------------------------------------------------
434  // Pool Functionality
435  // -------------------------------------------------------------
436 
// Enqueue a new task, using a function to call and its arguments, and returning a
// ProactiveFuture for the result of that call.
//
// f, args: the callable and its arguments, which are combined via std::bind.
// Returns a ProactiveFuture of the result type of f(args...), tied to this pool.
// An exception thrown by the task is stored in the promise by the wrapper created in
// make_wrapped_task_with_promise_(), and hence surfaces via the returned future.
453  template<class F, class... Args>
454  auto enqueue_and_retrieve( F&& f, Args&&... args )
455  -> ProactiveFuture<typename std::result_of<F(Args...)>::type>
456  {
457  using result_type = typename std::result_of<F(Args...)>::type;
458 
459  // Make sure that we do not enqueue more tasks than the max size.
460  run_tasks_until_below_max_queue_size_();
461 
462  // Prepare a promise and associated future of the task; the latter is our return value.
463  // Using a promise ensures that any exception thrown in the task function
464  // will be caught by the future, and re-thrown when its get() function is called,
465  // see e.g., https://stackoverflow.com/a/16345305/4184258
466  auto task_promise = std::make_shared<std::promise<result_type>>();
467  auto future_result = ProactiveFuture<result_type>( task_promise->get_future(), *this );
468 
469  // To make our lives easier for the helper functions used below, we just wrap
470  // the task in a function that can be called without arguments.
471  std::function<result_type()> task_function = std::bind(
472  std::forward<F>(f), std::forward<Args>(args)...
473  );
474 
475  // Prepare the task that we want to submit.
476  // All this wrapping should be completely transparent to the compiler, and removed.
477  // The task captures the package including the promise that is needed for the future.
478  WrappedTask wrapped_task;
479  wrapped_task.function = make_wrapped_task_with_promise_( task_promise, task_function );
480 
481  // We first increment the unfinished counter, and only decrement it once the task has
482  // been fully processed. Thus, the counter always tells us if there is still work going on.
483  ++unfinished_tasks_;
484  task_queue_.enqueue( std::move( wrapped_task ));
485 
486  // The task is submitted. Return its future for the caller to be able to wait for it.
487  return future_result;
488  }
489 
// Enqueue a new task, using a function to call and its arguments, without a std::future.
//
// f, args: the callable and its arguments, which are combined via std::bind.
// Nothing is returned; the caller has no handle to wait for this particular task.
// NOTE(review): there is no try/catch around the task function here, so if it throws,
// the decrement of the unfinished tasks counter is skipped and the exception leaves
// the wrapper - confirm that detached tasks are required to not throw.
501  template<class F, class... Args>
502  void enqueue_detached( F&& f, Args&&... args )
503  {
504  // Make sure that we do not enqueue more tasks than the max size.
505  run_tasks_until_below_max_queue_size_();
506 
507  // Prepare the task that we want to submit, by wrapping the function to be called.
508  // All this wrapping should be completely transparent to the compiler, and removed.
509  // The task captures a copy of the bound function as well as `this`; no promise is involved.
510  WrappedTask wrapped_task;
511  auto task_function = std::bind( std::forward<F>(f), std::forward<Args>(args)... );
512  wrapped_task.function = [task_function, this]()
513  {
514  // Run the actual work task here. Once done, we can signal this to the unfinished list.
515  task_function();
516  assert( this->unfinished_tasks_.load() > 0 );
517  --this->unfinished_tasks_;
518  };
519 
520  // We add the task, incrementing the unfinished counter, and only decrementing it once the
521  // task has been fully processed. That way, the counter always tells us if there is still
522  // work going on. We capture a reference to `this` in the task above, which could be
523  // dangerous if the threads survive the lifetime of the pool, but given that their exit
524  // condition is only called from the pool destructor, this should never be able to happen.
525  ++unfinished_tasks_;
526  task_queue_.enqueue( std::move( wrapped_task ));
527  }
528 
// Helper function to run a pending task from outside the pool.
// Returns true if a task was taken from the queue and run by the calling thread,
// and false if no task could be dequeued.
// NOTE(review): the function of a dequeued task is called without checking its stop flag;
// the stop tasks enqueued by the destructor do not have a function set - confirm that
// this function cannot be called while the pool is being destructed.
536  bool try_run_pending_task()
537  {
538  // Similar to the worker function, but without the blocking wait, as we might not ever
539  // have any tasks in the queue, and would be waiting for the condition indefinitely.
540  // Instead, we here just want to process a task if there is one, or return otherwise.
541  WrappedTask task;
542  if( task_queue_.try_dequeue( task )) {
543  task.function();
544  return true;
545  }
546  return false;
547  }
548 
// Wait for all current tasks to be finished processing.
// The calling thread does not merely wait: it runs pending tasks itself for as long as
// it can dequeue any, and yields in between, until the unfinished tasks counter is zero.
559  void wait_for_all_pending_tasks()
560  {
561  // Wait for all pending tasks to be processed. While we wait, we can also help
562  // processing tasks! The loop stops once there are no more unfinished tasks.
563  while( unfinished_tasks_.load() > 0 ) {
564  while( try_run_pending_task() );
565  std::this_thread::yield();
566  }
567  assert( unfinished_tasks_.load() == 0 );
568  }
569 
570  // -------------------------------------------------------------
571  // Wrapped Task
572  // -------------------------------------------------------------
573 
574 private:
575 
// Element type of the task queue: a function to run, plus a flag for stopping a worker.
// Instances are movable, but not copyable.
585  struct WrappedTask
586  {
// Create a task without a function; with stop == true, this is the special task
// that makes a worker leave its loop.
587  explicit WrappedTask( bool stop = false )
588  : stop(stop)
589  {}
590 
591  ~WrappedTask() = default;
592 
593  WrappedTask( WrappedTask const& ) = delete;
594  WrappedTask( WrappedTask&& ) = default;
595 
596  WrappedTask& operator= ( WrappedTask const& ) = delete;
597  WrappedTask& operator= ( WrappedTask&& ) = default;
598 
// The work to run; assigned by the enqueue functions, and left empty for stop tasks.
599  std::function<void()> function;
// If set, the worker that dequeues this task stops, without calling the function.
600  bool stop;
601  };
602 
603  // -------------------------------------------------------------
604  // Internal Members
605  // -------------------------------------------------------------
606 
// Start num_threads worker threads, each of which runs the static worker_() function
// with a pointer to this pool, and store them in worker_pool_.
607  void init_( size_t num_threads )
608  {
609  // Create the desired number of workers.
610  worker_pool_.reserve( num_threads );
611  for( size_t i = 0; i < num_threads; ++i ) {
612  worker_pool_.emplace_back(
613  &worker_, this
614  );
615  }
616  }
617 
// Thread function of each worker: repeatedly take a task from the queue of the given pool
// with a blocking dequeue, and run its function, until a task with the stop flag arrives.
// NOTE(review): ConsumerToken is not declared in this file - presumably it is provided
// along with the queue implementation; confirm.
618  static void worker_( ThreadPool* pool )
619  {
620  // Using a token for the consumer speeds it up. This is created once per worker thread
621  // when the function is called from the thread constructor upon emplacing the worker
622  // in the pool in init_()
623  ConsumerToken consumer_token( pool->task_queue_ );
624 
625  // The worker runs an infinite loop of waiting for tasks,
626  // only stopping once a special "stop" task is received.
627  WrappedTask task;
628  while( true ) {
629  pool->task_queue_.wait_dequeue( consumer_token, task );
630  if( task.stop ) {
631  break;
632  }
633  task.function();
634  }
635  }
636 
// Called by the enqueue functions before adding a task: if a max queue size is set
// (i.e., it is not 0), and the number of pending tasks has reached it, the calling thread
// processes pending tasks itself until the count is below the limit again.
637  inline void run_tasks_until_below_max_queue_size_()
638  {
639  // Check that we can enqueue a task at the moment, or if we need to wait and do work first.
640  // In a high-contention situation, this of course could fail, so that once the loop condition
641  // is checked, some other task already has finished the work. But that doesn't matter, the
642  // call to try_run_pending_task will catch that and just do nothing. Also, the other way round
643  // could happen, and the queue could in theory be overloaded if many threads try to enqueue
644  // at exactly the same time. But we probably never have enough threads for that to be a real
645  // issue - worst case, we exceed the max queue size by the number of threads, which is fine.
646  // All we want to avoid is to have an infinitely growing queue.
647  while( max_queue_size_ > 0 && pending_tasks_count() >= max_queue_size_ ) {
648  try_run_pending_task();
649  }
650  }
651 
// Create the function object that is stored in a WrappedTask for a task with a future.
//
// task_promise: the promise whose future was handed out to the caller.
// task_function: the work to run, callable without arguments.
// The returned function runs the work, and either sets the value of the promise, or, if
// an exception is thrown, stores that exception in the promise. On both paths, the
// unfinished tasks counter is decremented exactly once, before the promise is fulfilled.
652  template<typename T>
653  inline std::function<void()> make_wrapped_task_with_promise_(
654  std::shared_ptr<std::promise<T>> task_promise,
655  std::function<T()> task_function
656  ) {
657  // We capture a reference to `this` in the below lambda, which could be dangerous
658  // if the threads survive the lifetime of the pool, but given that the pool destructor
659  // waits for all of them to finish, this should never be able to happen.
660  return [this, task_promise, task_function]()
661  {
662  // Run the work task, and set the value of the associated promise.
663  // We need to delegate this here, as the std::promise::set_value() function
664  // differs for void and non-void return types. That is unfortunate.
665  // Also, as either the task function or setting the value of the promise can throw
666  // an exception, but in between we need to decrement the unfinished tasks counter,
667  // we need a way to figure out if we already did the decrement in case of an error.
668  bool decremented_unfinished_tasks = false;
669  try {
670  run_task_and_fulfill_promise_<T>(
671  task_promise, task_function, decremented_unfinished_tasks
672  );
673  } catch (...) {
674  if( !decremented_unfinished_tasks ) {
675  --unfinished_tasks_;
676  decremented_unfinished_tasks = true;
677  }
678  task_promise->set_exception( std::current_exception() );
679  }
680  assert( decremented_unfinished_tasks );
681  };
682  }
683 
// Run the task function and fulfill the promise with its result; overload for non-void T.
//
// decremented_unfinished_tasks: set to true as soon as the unfinished tasks counter has
// been decremented here, so that the caller knows not to decrement it again if an
// exception is thrown afterwards. If the task function itself throws, the flag is not set.
684  template<typename T>
685  typename std::enable_if<!std::is_void<T>::value>::type
686  inline run_task_and_fulfill_promise_(
687  std::shared_ptr<std::promise<T>> task_promise,
688  std::function<T()> task_function,
689  bool& decremented_unfinished_tasks
690  ) {
691  // Run the actual work task here. Once done, we can signal this to the unfinished list.
692  // This bit is the only reason why the whole wrapping exists: We need to first decrement
693  // the unfinished tasks count, before setting the promise value, as otherwise, outside
694  // threads might deduce that there are more pending tasks, when in fact we are already done.
695  auto result = task_function();
696  assert( unfinished_tasks_.load() > 0 );
697  --unfinished_tasks_;
698  decremented_unfinished_tasks = true;
699  task_promise->set_value( std::move( result ));
700  }
701 
// Run the task function and fulfill the promise; overload for T == void.
//
// decremented_unfinished_tasks: set to true as soon as the unfinished tasks counter has
// been decremented here, so that the caller knows not to decrement it again if an
// exception is thrown afterwards. If the task function itself throws, the flag is not set.
702  template<typename T>
703  typename std::enable_if<std::is_void<T>::value>::type
704  inline run_task_and_fulfill_promise_(
705  std::shared_ptr<std::promise<T>> task_promise,
706  std::function<void()> task_function,
707  bool& decremented_unfinished_tasks
708  ) {
709  // Same as above, but for void functions, i.e., without setting a value for the promise.
710  task_function();
711  assert( unfinished_tasks_.load() > 0 );
712  --unfinished_tasks_;
713  decremented_unfinished_tasks = true;
714  task_promise->set_value();
715  }
716 
717  // -------------------------------------------------------------
718  // Data Members
719  // -------------------------------------------------------------
720 
721 private:
722 
723  // Worker threads
// Filled by init_(), and joined in the destructor.
724  std::vector<std::thread> worker_pool_;
725 
726  // WrappedTask queue and its counters
// The queue holds the tasks that were enqueued, but not yet taken by a worker or by
// try_run_pending_task().
727  BlockingConcurrentQueue<WrappedTask> task_queue_;
// Number of tasks that were enqueued, but whose function has not yet finished running.
728  std::atomic<size_t> unfinished_tasks_{ 0 };
// Limit for the number of pending tasks, checked before enqueueing; 0 means no limit.
729  size_t max_queue_size_;
730 };
731 
732 // =================================================================================================
733 // Deferred Definitions
734 // =================================================================================================
735 
736 // Implemented here, as it needs ThreadPool to be defined first.
// Wait for the result to become available.
// While the result is not ready, the calling thread runs pending tasks of the thread pool
// that this future belongs to, and yields if there are none.
// Throws std::future_error (no_state) if the future is not valid.
737 template<class T>
738 void ProactiveFuture<T>::wait() const
739 {
740  // Let's be thorough. The standard encourages the check for validity.
741  throw_if_invalid_();
742 
743  // Also, check that we have a valid thread pool.
744  assert( thread_pool_ );
745  // if( !thread_pool_ ) {
746  // throw std::runtime_error( "Invalid call to ProactiveFuture::wait() without a ThreadPool" );
747  // }
748 
749  // If we have a deferred future, something is off - this was not created by us.
750  // We do not do any busy work while waiting, as otherwise, it won't ever get ready.
751  // As this might deadlock the thread pool, and was not done by us, we throw.
752  assert( !deferred() );
753  // if( deferred() ) {
754  // throw std::runtime_error( "Invalid call to ProactiveFuture::wait() with a deferred future" );
755  // return future_.wait();
756  // }
757 
758  // Otherwise, we use the waiting time to process other tasks from the thread pool
759  // that created this future in the first place.
760  while( !ready() ) {
761  assert( thread_pool_ );
762 
763  // We attempt to run a pending task. If that returns false, there were no tasks
764  // in the pool, so we can yield our thread for now - nothing to do for now, just wait more.
765  // We however need to keep waiting here. It could otherwise be that the task we are waiting
766  // for submits more tasks later, which might then deadlock the thread pool, if we here
767  // went into an actual wait for that first task - which would defy the main purpose of
768  // having this proactive waiting future in the first place.
769  // We also can't do anything with condition variables to omit the busy wait here, as we do
770  // not know whether there will be any tasks in the queue at all before we are done here.
771  if( ! thread_pool_->try_run_pending_task() ) {
772  std::this_thread::yield();
773  }
774  }
775 
776  // We call wait just in case here again, to make sure that everything is all right.
777  // Probably not necessary, as it's already ready, but won't hurt either.
778  // future_.wait();
779 }
780 
781 } // namespace utils
782 } // namespace genesis
783 
784 #endif // include guard
genesis::utils::ThreadPool::size
size_t size() const
Return the number of workers of the thread pool.
Definition: thread_pool.hpp:406
genesis::utils::ThreadPool::~ThreadPool
~ThreadPool()
Destruct the thread pool, waiting for all unfinished tasks.
Definition: thread_pool.hpp:377
genesis::utils::BlockingConcurrentQueue::try_dequeue
bool try_dequeue(U &item)
Definition: blocking_concurrent_queue.hpp:352
genesis::utils::ProactiveFuture::wait
void wait() const
Wait for the result to become available.
Definition: thread_pool.hpp:738
genesis::utils::ProactiveFuture::~ProactiveFuture
~ProactiveFuture() noexcept=default
genesis::utils::ProactiveFuture::get
T get()
Return the result, after calling wait().
Definition: thread_pool.hpp:122
blocking_concurrent_queue.hpp
genesis::utils::ProactiveFuture::ready
bool ready() const
Check if the future is ready.
Definition: thread_pool.hpp:215
genesis::utils::ThreadPool::try_run_pending_task
bool try_run_pending_task()
Helper function to run a pending task from outside the pool.
Definition: thread_pool.hpp:536
genesis::utils::ThreadPool
Thread pool for distributed work.
Definition: thread_pool.hpp:333
genesis::utils::ProactiveFuture::deferred
bool deferred() const
Check if the future is deferred, i.e., the result will be computed only when explicitly requested.
Definition: thread_pool.hpp:228
genesis::utils::ThreadPool::pending_tasks_count
size_t pending_tasks_count() const
Return the current number of pending tasks.
Definition: thread_pool.hpp:427
genesis::utils::ThreadPool::enqueue_and_retrieve
auto enqueue_and_retrieve(F &&f, Args &&... args) -> ProactiveFuture< typename std::result_of< F(Args...)>::type >
Enqueue a new task, using a function to call and its arguments, and returning a future to receive the...
Definition: thread_pool.hpp:454
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::ProactiveFuture::get
std::enable_if< std::is_void< U >::value >::type get()
Return the result, after calling wait().
Definition: thread_pool.hpp:149
genesis::utils::ProactiveFuture::get
std::enable_if<!std::is_void< U >::value, U & >::type get()
Return the result, after calling wait().
Definition: thread_pool.hpp:134
genesis::utils::ThreadPool::ThreadPool
ThreadPool(size_t num_threads, size_t max_queue_size=0)
Construct a thread pool with a given number of workers.
Definition: thread_pool.hpp:351
genesis::utils::ProactiveFuture::ProactiveFuture
ProactiveFuture() noexcept=default
Public default constructor, so that for instance a std::vector of ProactiveFuture can be created.
genesis::utils::ProactiveFuture::operator=
ProactiveFuture & operator=(ProactiveFuture &&) noexcept=default
genesis::utils::ThreadPool::enqueue_detached
void enqueue_detached(F &&f, Args &&... args)
Enqueue a new task, using a function to call and its arguments, without a std::future.
Definition: thread_pool.hpp:502
genesis::utils::ProactiveFuture
Wrapper around std::future that implements (pro-)active waiting, i.e., work stealing.
Definition: thread_pool.hpp:79
genesis::utils::ProactiveFuture::valid
bool valid() const noexcept
Check if the future has a shared state.
Definition: thread_pool.hpp:163
genesis::utils::BlockingConcurrentQueue::enqueue
bool enqueue(T const &item)
Definition: blocking_concurrent_queue.hpp:182
genesis::utils::ProactiveFuture::wait_until
std::future_status wait_until(std::chrono::time_point< Clock, Duration > const &timeout_time) const
Wait for the result, return if it is not available until specified time point has been reached.
Definition: thread_pool.hpp:202
genesis::utils::ProactiveFuture::wait_for
std::future_status wait_for(std::chrono::duration< Rep, Period > const &timeout_duration) const
Wait for the result, return if it is not available for the specified timeout duration.
Definition: thread_pool.hpp:188
genesis::utils::ThreadPool::operator=
ThreadPool & operator=(ThreadPool const &)=delete
genesis::utils::ThreadPool::wait_for_all_pending_tasks
void wait_for_all_pending_tasks()
Wait for all current tasks to be finished processing.
Definition: thread_pool.hpp:559