1 #ifndef GENESIS_UTILS_CORE_THREAD_POOL_H_
2 #define GENESIS_UTILS_CORE_THREAD_POOL_H_
34 #include <condition_variable>
94 for(
size_t i = 0; i < num_threads; ++i ) {
95 worker_pool_.emplace_back( [
this]{
100 std::function<void()> task;
105 std::unique_lock<std::mutex> lock( task_queue_mutex_ );
106 this->condition_.wait(
109 return !this->task_queue_.empty() || this->terminate_pool_;
112 if( this->terminate_pool_ && this->task_queue_.empty() ) {
115 task = std::move( this->task_queue_.front() );
116 this->task_queue_.pop();
140 std::unique_lock<std::mutex> lock( task_queue_mutex_ );
141 terminate_pool_ =
true;
145 condition_.notify_all();
146 for( std::thread& worker : worker_pool_ ) {
147 if( worker.joinable() ) {
162 return worker_pool_.size();
170 std::unique_lock<std::mutex> lock( task_queue_mutex_ );
171 return task_queue_.size();
181 template<
class F,
class... Args>
183 -> std::future<
typename std::result_of<F(Args...)>::type>
185 using result_type =
typename std::result_of<F(Args...)>::type;
188 if( worker_pool_.empty() ) {
189 throw std::runtime_error(
"Cannot enqueue task into empty ThreadPool." );
193 auto task = std::make_shared< std::packaged_task<result_type()> >(
194 std::bind( std::forward<F>(f), std::forward<Args>(args)... )
198 std::future<result_type> future_result = task->get_future();
202 std::unique_lock<std::mutex> lock( task_queue_mutex_ );
205 if( terminate_pool_ ) {
206 throw std::runtime_error(
"Cannot enqueue task into terminated ThreadPool." );
208 task_queue_.emplace([task](){ (*task)(); });
212 condition_.notify_one();
213 return future_result;
223 std::vector<std::thread> worker_pool_;
226 std::queue<std::function<void()>> task_queue_;
229 mutable std::mutex task_queue_mutex_;
230 std::condition_variable condition_;
231 bool terminate_pool_ =
false;
237 #endif // include guard