A library for working with phylogenetic and population genetic data.
v0.27.0
thread_pool.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_CORE_THREAD_POOL_H_
2 #define GENESIS_UTILS_CORE_THREAD_POOL_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2020 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@h-its.org>
23  Exelixis Lab, Heidelberg Institute for Theoretical Studies
24  Schloss-Wolfsbrunnenweg 35, D-69118 Heidelberg, Germany
25 */
26 
34 #include <condition_variable>
35 #include <functional>
36 #include <future>
37 #include <memory>
38 #include <mutex>
39 #include <queue>
40 #include <stdexcept>
41 #include <thread>
42 #include <vector>
43 
44 namespace genesis {
45 namespace utils {
46 
47 // =================================================================================================
48 // Thread Pool
49 // =================================================================================================
50 
78 {
79 public:
80 
81  // -------------------------------------------------------------
82  // Constructors and Rule of Five
83  // -------------------------------------------------------------
84 
91  explicit ThreadPool( size_t num_threads = 0 )
92  {
93  // Create the desired number of workers
94  for( size_t i = 0; i < num_threads; ++i ) {
95  worker_pool_.emplace_back( [this]{
96 
97  // Each worker runs an infinite loop until requested to stop,
98  // using conditional waits to not waste compute power.
99  while( true ) {
100  std::function<void()> task;
101 
102  // Synchronized access to the task list: see if there is a task to be done,
103  // and if so, pick it up and remove it from the queue
104  {
105  std::unique_lock<std::mutex> lock( task_queue_mutex_ );
106  this->condition_.wait(
107  lock,
108  [this]{
109  return !this->task_queue_.empty() || this->terminate_pool_;
110  }
111  );
112  if( this->terminate_pool_ && this->task_queue_.empty() ) {
113  return;
114  }
115  task = std::move( this->task_queue_.front() );
116  this->task_queue_.pop();
117  }
118 
119  // Run the task
120  task();
121  }
122  });
123  }
124  }
125 
126  ThreadPool( ThreadPool const& ) = delete;
127  ThreadPool( ThreadPool&& ) = delete;
128 
129  ThreadPool& operator= ( ThreadPool const& ) = delete;
130  ThreadPool& operator= ( ThreadPool&& ) = delete;
131 
137  {
138  // Set synchronized signal to all workers to terminate
139  {
140  std::unique_lock<std::mutex> lock( task_queue_mutex_ );
141  terminate_pool_ = true;
142  }
143 
144  // Wake up all workers, and join them back into the main thread
145  condition_.notify_all();
146  for( std::thread& worker : worker_pool_ ) {
147  if( worker.joinable() ) {
148  worker.join();
149  }
150  }
151  }
152 
153  // -------------------------------------------------------------
154  // Members
155  // -------------------------------------------------------------
156 
160  size_t size() const
161  {
162  return worker_pool_.size();
163  }
164 
168  size_t load() const
169  {
170  std::unique_lock<std::mutex> lock( task_queue_mutex_ );
171  return task_queue_.size();
172  }
173 
181  template<class F, class... Args>
182  auto enqueue( F&& f, Args&&... args )
183  -> std::future<typename std::result_of<F(Args...)>::type>
184  {
185  using result_type = typename std::result_of<F(Args...)>::type;
186 
187  // Some dead end checking
188  if( worker_pool_.empty() ) {
189  throw std::runtime_error( "Cannot enqueue task into empty ThreadPool." );
190  }
191 
192  // Prepare the task by binding the function to its arguments
193  auto task = std::make_shared< std::packaged_task<result_type()> >(
194  std::bind( std::forward<F>(f), std::forward<Args>(args)... )
195  );
196 
197  // Prepare the resulting future result of the task
198  std::future<result_type> future_result = task->get_future();
199 
200  // Put the task into the queue, synchronized
201  {
202  std::unique_lock<std::mutex> lock( task_queue_mutex_ );
203 
204  // Do not allow adding tasks after stopping the pool
205  if( terminate_pool_ ) {
206  throw std::runtime_error( "Cannot enqueue task into terminated ThreadPool." );
207  }
208  task_queue_.emplace([task](){ (*task)(); });
209  }
210 
211  // Get a worker to pick up the task, and return the future result
212  condition_.notify_one();
213  return future_result;
214  }
215 
216  // -------------------------------------------------------------
217  // Internal Members
218  // -------------------------------------------------------------
219 
220 private:
221 
222  // Store all workers
223  std::vector<std::thread> worker_pool_;
224 
225  // Store the task queue
226  std::queue<std::function<void()>> task_queue_;
227 
228  // Synchronization
229  mutable std::mutex task_queue_mutex_;
230  std::condition_variable condition_;
231  bool terminate_pool_ = false;
232 };
233 
234 } // namespace utils
235 } // namespace genesis
236 
237 #endif // include guard
genesis::utils::ThreadPool::size
size_t size() const
Return the number of workers of the thread pool.
Definition: thread_pool.hpp:160
genesis::utils::ThreadPool::~ThreadPool
~ThreadPool()
Destruct the thread pool, stopping and joining any workers that are potentially still running or wait...
Definition: thread_pool.hpp:136
genesis::utils::ThreadPool
Thread pool for distributed work.
Definition: thread_pool.hpp:77
genesis::utils::ThreadPool::enqueue
auto enqueue(F &&f, Args &&... args) -> std::future< typename std::result_of< F(Args...)>::type >
Enqueue a new task, using a function to call and its arguments.
Definition: thread_pool.hpp:182
genesis::utils::ThreadPool::ThreadPool
ThreadPool(size_t num_threads=0)
Construct a thread pool with a given number of workers.
Definition: thread_pool.hpp:91
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::ThreadPool::load
size_t load() const
Return the current number of queued tasks.
Definition: thread_pool.hpp:168
genesis::utils::ThreadPool::operator=
ThreadPool & operator=(ThreadPool const &)=delete