A library for working with phylogenetic and population genetic data.
v0.32.0
thread_functions.hpp
#ifndef GENESIS_UTILS_THREADING_THREAD_FUNCTIONS_H_
#define GENESIS_UTILS_THREADING_THREAD_FUNCTIONS_H_

/*
    Genesis - A toolkit for working with phylogenetic data.
    Copyright (C) 2014-2024 Lucas Czech

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.

    Contact:
    Lucas Czech <lczech@carnegiescience.edu>
    Department of Plant Biology, Carnegie Institution For Science
    260 Panama Street, Stanford, CA 94305, USA
*/

/*
    The parallel block/for functionality is inspired by BS::thread_pool,
    see https://github.com/bshoshany/thread-pool, but adapted to fit into our context,
    and improved for a more even distribution of the workload, and for usage convenience.

    We here still need to include the following original license of BS::thread_pool:

    MIT License

    Copyright (c) 2022 Barak Shoshany

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
*/

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

namespace genesis {
namespace utils {

// =================================================================================================
//     Parallel Loop
// =================================================================================================

template<
    typename F,
    typename T1, typename T2, typename T = typename std::common_type<T1, T2>::type,
    typename R = typename std::result_of<typename std::decay<F>::type(T, T)>::type
>
MultiFuture<R> parallel_block(
    T1 begin, T2 end, F&& body,
    std::shared_ptr<ThreadPool> thread_pool = nullptr,
    size_t num_blocks = 0,
    bool auto_wait = true
) {
    // If no thread pool was provided, we use the global one.
    if( !thread_pool ) {
        thread_pool = Options::get().global_thread_pool();
    }

    // Get the total range and number of tasks.
    // Casting, so that we work with a common type... bit of a relic, but sure, why not.
    auto begin_t = static_cast<T>( begin );
    auto end_t   = static_cast<T>( end );
    if( begin_t > end_t ) {
        std::swap( begin_t, end_t );
    }
    size_t const total_size = end_t - begin_t;

    // Edge case. Nothing to do.
    if( total_size == 0 ) {
        assert( begin_t == end_t );
        return MultiFuture<R>();
    }

    // The default number of blocks is the number of threads in the pool plus one.
    // The futures returned from the pool implement a work stealing mechanism: while waiting,
    // they proactively process other tasks from the pool. The calling thread of this function
    // can hence do work as well while it waits, so we make a block for it, too.
    if( num_blocks == 0 ) {
        num_blocks = thread_pool->size() + 1;
    }

    // If there are more blocks than actual items to process, we adjust, so that we have
    // exactly one block per item. Empty blocks would not make sense.
    if( num_blocks > total_size ) {
        num_blocks = total_size;
    }
    assert( num_blocks > 0 );
    assert( num_blocks <= total_size );

    // Compute the needed sizes. We do _not_ follow BS::thread_pool here, as they currently
    // do not distribute work optimally, see https://github.com/bshoshany/thread-pool/issues/96
    // Instead, we use blocks of minimal size, and add the remainder to the first few blocks,
    // so that the blocks that are one larger than the others run first, minimizing our wait
    // time at the end. See e.g., https://stackoverflow.com/a/36689107
    size_t const block_size = total_size / num_blocks;
    size_t const remainder  = total_size % num_blocks;
    assert( block_size > 0 );
    assert( remainder < num_blocks );

    // Enqueue all blocks.
    size_t current_start = 0;
    MultiFuture<R> result( num_blocks );
    for( size_t i = 0; i < num_blocks; ++i ) {
        // We get the length of the current block. For the first few blocks, we add one to
        // the length, to distribute the remainder elements that did not fit evenly into
        // the blocks. Use that length to get the begin and end points, and submit the block.
        auto const l = block_size + ( i < remainder ? 1 : 0 );
        auto const b = begin_t + static_cast<T>( current_start );
        auto const e = begin_t + static_cast<T>( current_start + l );
        assert( l > 0 );
        assert( b < e );
        result[i] = thread_pool->enqueue_and_retrieve( std::forward<F>( body ), b, e );

        // Our next block will start where this one ended.
        current_start += l;
        assert( current_start <= total_size );
    }

    // Now we should have enqueued the whole range exactly.
    // Check this, then return the future.
    assert( current_start == total_size );
    assert( begin_t + static_cast<T>( current_start ) == end_t );

    // If requested, we block here until everything is ready.
    // This is a proactive waiting, meaning the calling thread will process tasks from the
    // thread pool while waiting here.
    if( auto_wait ) {
        result.wait();
    }
    return result;
}

template<
    typename F,
    typename T1, typename T2, typename T = typename std::common_type<T1, T2>::type
>
MultiFuture<void> parallel_for(
    T1 begin, T2 end, F&& body,
    std::shared_ptr<ThreadPool> thread_pool = nullptr,
    size_t num_blocks = 0,
    bool auto_wait = true
) {
    // Wrap the per-element body in a loop over each block's half-open range [b, e).
    return parallel_block(
        begin, end,
        [body]( T b, T e ){
            for( T i = b; i < e; ++i ) {
                body(i);
            }
        },
        thread_pool,
        num_blocks,
        auto_wait
    );
}

template<typename F, typename T>
MultiFuture<void> parallel_for_each(
    T const begin, T const end, F&& body,
    std::shared_ptr<ThreadPool> thread_pool = nullptr,
    size_t num_blocks = 0,
    bool auto_wait = true
) {
    // Boundary checks.
    auto const total = std::distance( begin, end );
    if( total < 0 ) {
        throw std::invalid_argument( "Cannot use parallel_for_each() with a reverse range." );
    }
    if( total == 0 ) {
        return MultiFuture<void>();
    }

    // Run the loop over elements in parallel blocks.
    // For some reason, we need to take `begin` by const copy in the signature above,
    // and copy it again here for the lambda. Otherwise, we run into some weird iterator
    // invalidity issues, that might come from the threading or something... it's weird.
    // The iterator itself is never advanced here, so that should not lead to this error...
    // Edit: It might be due to something similar to P2644R1 "Fix for Range-based for Loop".
    // See https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2644r1.pdf
    // Not sure that makes sense, as we are using a function here, but it seems somehow related.
    return parallel_block(
        0, total,
        [begin, body]( size_t b, size_t e ){
            for( size_t i = b; i < e; ++i ) {
                body( *(begin + i) );
            }
        },
        thread_pool,
        num_blocks,
        auto_wait
    );
}

template<typename F, typename T>
MultiFuture<void> parallel_for_each(
    T& container, F&& body,
    std::shared_ptr<ThreadPool> thread_pool = nullptr,
    size_t num_blocks = 0,
    bool auto_wait = true
) {
    // Boundary checks.
    if( container.size() == 0 ) {
        return MultiFuture<void>();
    }

    // Run the loop over elements in parallel blocks.
    // The container is captured by reference, so it has to outlive all enqueued blocks.
    return parallel_block(
        0, container.size(),
        [&container, body]( size_t b, size_t e ) {
            for( size_t i = b; i < e; ++i ) {
                body( container[ i ] );
            }
        },
        thread_pool,
        num_blocks,
        auto_wait
    );
}

// =================================================================================================
//     Critical Section
// =================================================================================================

template<typename Tag>
class ThreadCriticalSection
{
public:

    ThreadCriticalSection()
    {
        get_static_mutex_().lock();
    }

    ~ThreadCriticalSection()
    {
        get_static_mutex_().unlock();
    }

private:

    static std::mutex& get_static_mutex_()
    {
        static std::mutex mutex_;
        return mutex_;
    }
};

// Macro to define a tag type and create a ThreadCriticalSection instance for it,
// which holds the mutex of that tag until the end of the enclosing scope.
#define GENESIS_THREAD_CRITICAL_SECTION(TagName) \
    struct TagName {}; \
    ThreadCriticalSection<TagName> genesis_critical_section_##TagName;

} // namespace utils
} // namespace genesis

#endif // include guard
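
The block sizing used by `parallel_block()` in the listing above can be tried out in isolation. The following is a minimal standalone sketch of that scheme, not part of genesis: the helper name `partition_blocks` and its return type are assumptions made for illustration, and it involves no thread pool. It splits a range into contiguous half-open blocks of the minimal size and hands the remainder elements to the first few blocks, one each.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not part of genesis): split the range [begin, end) into at most
// `num_blocks` contiguous half-open blocks [b, e). The first (total % num_blocks) blocks
// are one element longer than the others, mirroring the arithmetic in parallel_block().
// Assumption of this sketch: num_blocks == 0 yields no blocks, whereas the listing
// instead falls back to the thread pool size plus one.
inline std::vector<std::pair<std::size_t, std::size_t>> partition_blocks(
    std::size_t begin, std::size_t end, std::size_t num_blocks
) {
    std::vector<std::pair<std::size_t, std::size_t>> blocks;
    if( begin > end ) {
        std::swap( begin, end );
    }
    std::size_t const total_size = end - begin;
    if( total_size == 0 || num_blocks == 0 ) {
        return blocks;
    }

    // Never create more blocks than there are elements; empty blocks make no sense.
    if( num_blocks > total_size ) {
        num_blocks = total_size;
    }

    // Minimal block size, plus the remainder that gets spread over the first blocks.
    std::size_t const block_size = total_size / num_blocks;
    std::size_t const remainder  = total_size % num_blocks;

    std::size_t current_start = 0;
    for( std::size_t i = 0; i < num_blocks; ++i ) {
        std::size_t const length = block_size + ( i < remainder ? 1 : 0 );
        blocks.emplace_back( begin + current_start, begin + current_start + length );
        current_start += length;
    }

    // All elements are covered exactly once.
    assert( current_start == total_size );
    return blocks;
}
```

For instance, `partition_blocks( 0, 10, 3 )` gives the blocks `[0, 4)`, `[4, 7)`, and `[7, 10)`: the single remainder element goes to the first block, so the longest block is also the first one to start.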
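
The tag-based locking of `ThreadCriticalSection` in the listing can likewise be illustrated standalone. The sketch below is an assumption-laden re-creation using only the standard library: `CriticalSectionSketch`, `CounterTag`, and `count_with_critical_section` are hypothetical names, not genesis symbols. Each tag type gets its own function-local static mutex, which is locked in the constructor and unlocked in the destructor, so the guarded region is the scope of the instance.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the class in the listing: one static mutex per Tag type,
// held for the lifetime of each instance.
template<typename Tag>
class CriticalSectionSketch
{
public:

    CriticalSectionSketch()
    {
        get_static_mutex_().lock();
    }

    ~CriticalSectionSketch()
    {
        get_static_mutex_().unlock();
    }

    // Copying a held lock would unlock the mutex twice, so this sketch forbids it.
    CriticalSectionSketch( CriticalSectionSketch const& ) = delete;
    CriticalSectionSketch& operator=( CriticalSectionSketch const& ) = delete;

private:

    static std::mutex& get_static_mutex_()
    {
        // Initialized once, in a thread-safe manner, on first use (since C++11).
        static std::mutex mutex_;
        return mutex_;
    }
};

// Hypothetical tag type; every distinct tag type selects a distinct mutex.
struct CounterTag {};

// Increment a shared counter from several threads, guarding each increment.
inline int count_with_critical_section( int num_threads, int increments )
{
    int counter = 0;
    std::vector<std::thread> threads;
    for( int t = 0; t < num_threads; ++t ) {
        threads.emplace_back( [&counter, increments]() {
            for( int i = 0; i < increments; ++i ) {
                CriticalSectionSketch<CounterTag> section;
                ++counter;
            }
        });
    }
    for( auto& thread : threads ) {
        thread.join();
    }
    return counter;
}
```

Without the guarded scope, the unsynchronized increments would be a data race. The `GENESIS_THREAD_CRITICAL_SECTION(TagName)` macro in the listing wraps the same two steps, declaring the tag struct and the instance, into a single statement.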