1 #ifndef GENESIS_UTILS_THREADING_THREAD_POOL_H_
2 #define GENESIS_UTILS_THREADING_THREAD_POOL_H_
46 #include <type_traits>
99 : future_( std::move( future_result ))
100 , thread_pool_( &thread_pool )
127 return future_.get();
// Overload of get() that participates in overload resolution only when U
// (which defaults to T) is not void; its declared return type is U&.
133 template<
typename U = T>
134 typename std::enable_if<!std::is_void<U>::value, U&>::type
get()
// Compile-time guard expressing the same restriction directly on T.
137 static_assert( ! std::is_same<T, void>::value,
"ProactiveFuture::get() intended for T != void" );
// Hand back whatever the wrapped std::future yields.
142 return future_.get();
// Overload of get() that participates in overload resolution only when U
// (which defaults to T) is void; enable_if without a second argument makes
// the return type void.
148 template<
typename U = T>
149 typename std::enable_if<std::is_void<U>::value>::type
get()
// Compile-time guard expressing the same restriction directly on T.
152 static_assert( std::is_same<T, void>::value,
"ProactiveFuture::get() intended for T == void" );
// Forward to the wrapped std::future's get().
157 return future_.get();
165 return future_.valid();
// Wait on the wrapped future for at most timeout_duration.
// Pure forwarding: the std::future_status reported by future_.wait_for()
// is returned unchanged.
187 template<
class Rep,
class Period >
188 std::future_status
wait_for( std::chrono::duration<Rep,Period>
const& timeout_duration )
const
191 return future_.wait_for( timeout_duration );
// Wait on the wrapped future until the time point timeout_time.
// Pure forwarding: the std::future_status reported by future_.wait_until()
// is returned unchanged.
201 template<
class Clock,
class Duration >
202 std::future_status
wait_until( std::chrono::time_point<Clock,Duration>
const& timeout_time )
const
205 return future_.wait_until( timeout_time );
218 return future_.wait_for( std::chrono::seconds(0) ) == std::future_status::ready;
231 return future_.wait_for( std::chrono::seconds(0) ) == std::future_status::deferred;
// Guard helper: raises std::future_error carrying std::future_errc::no_state
// when the wrapped future reports valid() == false. Does nothing otherwise.
240 void throw_if_invalid_()
const
245 if( !future_.valid() ) {
246 throw std::future_error( std::future_errc::no_state );
// The wrapped standard future. The constructor's initializer list moves
// its future_result argument into this member, and the get()/wait_*()
// members forward to it.
256 std::future<T> future_;
// Pointer to the pool this future is associated with; initialized from the
// address of the constructor's thread_pool argument. Code elsewhere in this
// class asserts it is non-null and calls try_run_pending_task() through it.
257 ThreadPool* thread_pool_;
// Construct a pool.
//
// num_threads     Number of worker threads; passed to init_(), after which
//                 worker_pool_ is asserted to hold exactly that many threads.
// max_queue_size  Stored in max_queue_size_. The default of 0 bypasses the
//                 size check below.
//
// Throws std::runtime_error when max_queue_size is non-zero and smaller than
// num_threads * 2.
351 explicit ThreadPool(
size_t num_threads,
size_t max_queue_size = 0 )
352 : max_queue_size_( max_queue_size )
// NOTE(review): the condition rejects limits below *twice* the number of
// threads, whereas the message text says "half the number of threads".
// The two disagree; presumably the message should say "double" — confirm
// the intended bound and align condition and message.
356 if( max_queue_size_ > 0 && max_queue_size_ < num_threads * 2 ) {
357 throw std::runtime_error(
358 "Cannot use ThreadPool with max queue size less than "
359 "half the number of threads, for efficiency"
// Spawn the workers and sanity-check the resulting pool size.
364 init_( num_threads );
365 assert( worker_pool_.size() == num_threads );
382 assert( unfinished_tasks_.load() == 0 );
386 for(
size_t i = 0; i < worker_pool_.size(); ++i ) {
387 task_queue_.
enqueue( WrappedTask(
true ));
391 for( std::thread& worker : worker_pool_ ) {
392 if( worker.joinable() ) {
396 assert( unfinished_tasks_.load() == 0 );
408 return worker_pool_.size();
429 return unfinished_tasks_.load();
453 template<
class F,
class... Args>
// Type obtained by invoking F with Args...; it is the value type of the
// std::promise created for the submitted task.
// NOTE(review): std::result_of is deprecated since C++17 and removed in
// C++20. std::invoke_result<F, Args...> is the replacement once the
// project's minimum language standard permits it.
457 using result_type =
typename std::result_of<F(Args...)>::type;
460 run_tasks_until_below_max_queue_size_();
466 auto task_promise = std::make_shared<std::promise<result_type>>();
471 std::function<result_type()> task_function = std::bind(
472 std::forward<F>(f), std::forward<Args>(args)...
478 WrappedTask wrapped_task;
479 wrapped_task.function = make_wrapped_task_with_promise_( task_promise, task_function );
484 task_queue_.
enqueue( std::move( wrapped_task ));
487 return future_result;
501 template<
class F,
class... Args>
505 run_tasks_until_below_max_queue_size_();
510 WrappedTask wrapped_task;
511 auto task_function = std::bind( std::forward<F>(f), std::forward<Args>(args)... );
512 wrapped_task.function = [task_function,
this]()
516 assert( this->unfinished_tasks_.load() > 0 );
517 --this->unfinished_tasks_;
526 task_queue_.
enqueue( std::move( wrapped_task ));
563 while( unfinished_tasks_.load() > 0 ) {
565 std::this_thread::yield();
567 assert( unfinished_tasks_.load() == 0 );
// Construct a task wrapper with a stop flag that defaults to false.
// The pool's shutdown code enqueues WrappedTask( true ) once per worker
// thread — presumably the signal for a worker to exit; confirm against the
// worker loop.
587 explicit WrappedTask(
bool stop =
false )
// Compiler-generated destructor.
591 ~WrappedTask() =
default;
// Move-only type: copy construction is deleted, move construction defaulted.
593 WrappedTask( WrappedTask
const& ) =
delete;
594 WrappedTask( WrappedTask&& ) =
default;
// Likewise for assignment: copy deleted, move defaulted.
596 WrappedTask&
operator= ( WrappedTask
const& ) =
delete;
597 WrappedTask&
operator= ( WrappedTask&& ) =
default;
// The type-erased callable to execute; the enqueue code assigns a void()
// lambda to it before moving the WrappedTask into the queue.
599 std::function<void()>
function;
// Populate worker_pool_ with num_threads worker threads.
607 void init_(
size_t num_threads )
// Reserve capacity up front so the emplace_back calls below do not have to
// grow the vector.
610 worker_pool_.reserve( num_threads );
611 for(
size_t i = 0; i < num_threads; ++i ) {
612 worker_pool_.emplace_back(
// Inside the worker: create a ConsumerToken bound to the pool's task queue,
// then block in wait_dequeue() until a task has been written into `task`.
// (How `pool` and `task` are declared, and how the worker loops or stops,
// is not visible in this excerpt.)
623 ConsumerToken consumer_token( pool->task_queue_ );
629 pool->task_queue_.wait_dequeue( consumer_token, task );
637 inline void run_tasks_until_below_max_queue_size_()
// Build the void() callable stored in a WrappedTask for a task whose result
// is delivered through a promise.
//
// task_promise   Shared promise that receives the task's value or exception.
// task_function  The nullary function producing a T.
//
// Returns a lambda that captures `this`, the promise and the function by
// value, so the promise stays alive for as long as the lambda does.
653 inline std::function<void()> make_wrapped_task_with_promise_(
654 std::shared_ptr<std::promise<T>> task_promise,
655 std::function<T()> task_function
660 return [
this, task_promise, task_function]()
// Tracks whether the unfinished-task bookkeeping has already been done;
// passed by reference into the helper below, which sets it to true.
668 bool decremented_unfinished_tasks =
false;
670 run_task_and_fulfill_promise_<T>(
671 task_promise, task_function, decremented_unfinished_tasks
// NOTE(review): the use of std::current_exception() implies the following
// statements run inside a catch handler whose lines are not visible here.
// If the helper did not get as far as setting the flag, it is set here (the
// accompanying counter update is presumably on a line not shown — confirm),
// and the in-flight exception is handed to the promise.
674 if( !decremented_unfinished_tasks ) {
676 decremented_unfinished_tasks =
true;
678 task_promise->set_exception( std::current_exception() );
// On every path the flag must have been set by now.
680 assert( decremented_unfinished_tasks );
// Overload selected (via enable_if) when T is not void.
// Runs task_function, records through the decremented_unfinished_tasks
// out-parameter that the bookkeeping step was reached, and stores the
// result in task_promise.
685 typename std::enable_if<!std::is_void<T>::value>::type
686 inline run_task_and_fulfill_promise_(
687 std::shared_ptr<std::promise<T>> task_promise,
688 std::function<T()> task_function,
689 bool& decremented_unfinished_tasks
// Execute the task first; the promise is only touched afterwards.
695 auto result = task_function();
// The counter must still account for this task at this point. The decrement
// itself is presumably on a line not visible in this excerpt — confirm.
696 assert( unfinished_tasks_.load() > 0 );
698 decremented_unfinished_tasks =
true;
// Fulfill the promise by moving the result into it.
699 task_promise->set_value( std::move( result ));
// Overload selected (via enable_if) when T is void.
// Mirrors the non-void overload, except that there is no result to store:
// the promise is fulfilled with the argument-less set_value().
// (The call to task_function is not among the lines visible here.)
703 typename std::enable_if<std::is_void<T>::value>::type
704 inline run_task_and_fulfill_promise_(
705 std::shared_ptr<std::promise<T>> task_promise,
706 std::function<
void()> task_function,
707 bool& decremented_unfinished_tasks
// The counter must still account for this task at this point. The decrement
// itself is presumably on a line not visible in this excerpt — confirm.
711 assert( unfinished_tasks_.load() > 0 );
713 decremented_unfinished_tasks =
true;
714 task_promise->set_value();
// The worker threads. Filled by init_(); the shutdown code iterates over it
// and checks joinable() on each thread.
724 std::vector<std::thread> worker_pool_;
// Queue of pending tasks. Producers call enqueue() with a moved WrappedTask;
// workers block on wait_dequeue().
727 BlockingConcurrentQueue<WrappedTask> task_queue_;
// Count of tasks not yet finished, starting at zero. Task wrappers decrement
// it, and the wait loop yields until it reads zero.
728 std::atomic<size_t> unfinished_tasks_{ 0 };
// Queue size limit taken from the constructor argument; a value of 0 skips
// the constructor's minimum-size check.
729 size_t max_queue_size_;
744 assert( thread_pool_ );
752 assert( !deferred() );
761 assert( thread_pool_ );
771 if( ! thread_pool_->try_run_pending_task() ) {
772 std::this_thread::yield();
784 #endif // include guard