A library for working with phylogenetic and population genetic data.
v0.32.0
sequential_output_buffer.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_THREADING_SEQUENTIAL_OUTPUT_BUFFER_H_
2 #define GENESIS_UTILS_THREADING_SEQUENTIAL_OUTPUT_BUFFER_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
36 
37 #include <array>
38 #include <cassert>
39 #include <climits>
40 #include <cstdint>
41 #include <deque>
42 #include <functional>
43 #include <limits>
44 #include <memory>
45 #include <mutex>
46 #include <stdexcept>
47 #include <string>
48 
49 namespace genesis {
50 namespace utils {
51 
52 // =================================================================================================
53 // Sequential Output Buffer
54 // =================================================================================================
55 
96 template<typename T>
98 {
99  // -------------------------------------------------------------------------
100  // Private Member Types
101  // -------------------------------------------------------------------------
102 
103 private:
104 
// Occupancy of a block is tracked with one bit per slot in a single unsigned 64 bit word,
// which fixes the number of slots per block to 64 for now. This might become a template
// parameter later on, so that smaller blocks can be used for large elements.
static_assert( CHAR_BIT == 8, "CHAR_BIT != 8" );
using BlockSlotBits = uint64_t;
static const size_t BLOCK_SLOT_SIZE = sizeof( BlockSlotBits ) * CHAR_BIT;
static const BlockSlotBits ALL_SLOTS = std::numeric_limits<BlockSlotBits>::max();
112 
113  // For sequentially capturing output, we need to know which elements have already been set.
114  // This could be done with one bool per element, but that seems wasteful, and might be slightly
115  // inefficient due to the interleaved memory alignment of inserting single bools between
116  // elements. So instead, we store an array of elements at once, with a bit vector indicating
117  // which ones have already been set. This also reduces the amount of allocations of the deque,
118  // at the cost of higher upfront memory for whole blocks. This currently allocates blocks
119  // of 64 elements each time. For very large elements, we might want to reduce that. But then
120  // again, 64 is an okay number to buffer anyway in a highly multi-threaded environment.
121  struct Block
122  {
123  BlockSlotBits occupied_slots = 0;
124  std::array<T, BLOCK_SLOT_SIZE> slots;
125  };
126 
127  // -------------------------------------------------------------------------
128  // Constructor and Rule of Five
129  // -------------------------------------------------------------------------
130 
131 public:
132 
142  SequentialOutputBuffer( std::function<void(T&&)> output_function, size_t first_sequence_id = 0 )
143  : head_sequence_id_( first_sequence_id )
144  , output_function_( output_function )
145  {
146  if( !output_function ) {
147  throw std::invalid_argument(
148  "Cannot initialize SequentialOutputBuffer with empty output function"
149  );
150  }
151  }
152 
154  {
155  close();
156  }
157 
160 
163 
164  // -------------------------------------------------------------------------
165  // Element Access
166  // -------------------------------------------------------------------------
167 
173  void emplace( size_t sequence_id, T const& element )
174  {
175  T copy = element;
176  emplace( sequence_id, std::move( copy ));
177  }
178 
184  void emplace( size_t sequence_id, T&& element )
185  {
186  // Lock the mutex, as everything from here on needs exclusive access.
187  std::unique_lock<std::mutex> block_lock( block_mutex_ );
188 
189  // Fundamental checks.
190  if( !output_function_ ) {
191  throw std::runtime_error(
192  "Cannot emplace element in SequentialOutputBuffer after it has been closed"
193  );
194  }
195  if( sequence_id < head_sequence_id_ ) {
196  throw std::runtime_error(
197  "Invalid sequence in Sequential Output Buffer, emplacing element " +
198  std::to_string( sequence_id ) + " when head is already at " +
199  std::to_string( head_sequence_id_ )
200  );
201  }
202 
203  // Emplace the element
204  emplace_element_( sequence_id, std::move( element ));
205 
206  // Check if the first blocks are ready for output, i.e., if all their slots are full.
207  // If not, we are done here, at which point the above lock on the mutex gets released.
208  if( block_deque_.empty() || block_deque_.front()->occupied_slots != ALL_SLOTS ) {
209  return;
210  }
211 
212  // Here, we know that at least the first block is ready for output, as all its slots are full.
213  // Transfer all full blocks at the front to the output queue, after which we can release
214  // the block_mutex_ again. At this point, other threads can keep adding to the buffer,
215  // while we process the output, while then holding the output_mutex_. This way,
216  // we avoid having other threads to wait adding to the buffer while processing.
217  transfer_full_blocks_to_output_queue_();
218 
219  // We have transferred all blocks that are ready, and can release the lock on block_mutex_,
220  // so that other threads can continue emplacing elements. Then, we can process the blocks
221  // we have just transferred. This is as close as we can get to lock-free behaviour without
222  // massive over-engineering (there is probably a way to avoid the lock in this function here,
223  // by processing the slots in some atomic way), while still maintaining the correct output
224  // order (which has no way around a lock on the actual output processing, I think).
225  block_lock.unlock();
226  process_output_queue_();
227  }
228 
234  void close()
235  {
236  // Lock both mutexes, just in case this is used in a weird way. We never do this anywhere
237  // else, so there should be no deadlock possible here. Also, this function is not meant
238  // to be called in a concurrent way anyway. If multiple places call this at once,
239  // the second call will wait here, and then fail, as we can only call close() once.
240  std::lock_guard<std::mutex> block_lock( block_mutex_ );
241  std::lock_guard<std::mutex> output_lock( output_mutex_ );
242 
243  // Make sure that we are in a valid state. This function might have been called already,
244  // and then again in the destructor. In that case, do nothing. Otherwise, it's an error.
245  if( !output_function_ ) {
246  if( block_deque_.empty() ) {
247  return;
248  }
249  throw std::runtime_error(
250  "Invalid state of SequentialOutputBuffer after it has been closed"
251  );
252  }
253  assert( output_function_ );
254 
255  // Process the last blocks, and indicate that we are closed already.
256  // No more emplacing possible after this.
257  process_last_blocks_();
258  output_function_ = nullptr;
259  }
260 
261  // -------------------------------------------------------------------------
262  // Private Members
263  // -------------------------------------------------------------------------
264 
265 private:
266 
267  void emplace_element_( size_t sequence_id, T&& element )
268  {
269  // Assumes that caller holds a lock on block_mutex_,
270  // and assumes the following two properties.
271  assert( output_function_ );
272  assert( sequence_id >= head_sequence_id_ );
273 
274  // Get the indices into the block chain that we need.
275  auto const index = sequence_id - head_sequence_id_;
276  auto const block_index = index / BLOCK_SLOT_SIZE;
277  auto const slot_index = index % BLOCK_SLOT_SIZE;
278  auto const slot_bit = static_cast<BlockSlotBits>( 1 ) << slot_index;
279  assert( slot_bit != 0 );
280 
281  // Get the block we need, creating it if not present.
282  while( block_deque_.size() < block_index + 1 ) {
283  block_deque_.emplace_back(
284  genesis::utils::make_unique<Block>()
285  );
286  }
287  assert( block_deque_.size() > block_index );
288  auto& block = *block_deque_[block_index];
289 
290  // Check that the element has not been set already.
291  if( block.occupied_slots & slot_bit ) {
292  throw std::runtime_error(
293  "Invalid sequence in Sequential Output Buffer, emplacing element " +
294  std::to_string( sequence_id ) + ", which has already been emplaced before"
295  );
296  }
297 
298  // Insert the element, and set its slot bit to mark that this is set now.
299  block.slots[slot_index] = std::move( element );
300  block.occupied_slots |= slot_bit;
301  }
302 
303  void transfer_full_blocks_to_output_queue_()
304  {
305  // Assumes that caller holds a lock on block_mutex_,
306  // and assumes the following property, meaning there is a full block at the front.
307  assert( block_deque_.size() > 0 && block_deque_.front()->occupied_slots == ALL_SLOTS );
308 
309  // Transfer all full blocks from the front to the output queue.
310  // While this is being transferred, there might be another thread that is currently
311  // in process_output_queue_(), but that's okay, because we are using a thread safe queue
312  // for the transfer. In that case, the other thread will also process the elements
313  // pushed from here. Then, once our thread here gets to the process_output_queue_() as well,
314  // there might be nothing to do any more, and that's okay.
315  while( block_deque_.size() > 0 && block_deque_.front()->occupied_slots == ALL_SLOTS ) {
316  output_queue_.push( std::move( block_deque_.front() ));
317  block_deque_.pop_front();
318  head_sequence_id_ += BLOCK_SLOT_SIZE;
319  }
320  }
321 
322  void process_output_queue_()
323  {
324  // We obtain a lock on the output_mutex_, to make sure that everything is in the correct
325  // order. If any other thread is currently writing output, we will have to wait here.
326  std::lock_guard<std::mutex> output_lock( output_mutex_ );
327  assert( output_function_ );
328  while( !output_queue_.empty() ) {
329 
330  // Get the front element of the queue. This always succeeds, as we have a lock
331  // on this function and it is the only place where elements are removed.
332  // If any other thread adds elements in the meantime, that's okay as well.
333  std::unique_ptr<Block> block;
334  if( ! output_queue_.try_pop( block )) {
335  throw std::runtime_error( "Internal error: broken lock in SequentialOutputBuffer" );
336  }
337  if( block->occupied_slots != ALL_SLOTS ) {
338  throw std::runtime_error( "Internal error: broken block in SequentialOutputBuffer" );
339  }
340 
341  // Now process the block.
342  for( size_t slot = 0; slot < BLOCK_SLOT_SIZE; ++slot ) {
343  output_function_( std::move( block->slots[slot] ));
344  }
345  }
346  }
347 
348  void process_last_blocks_()
349  {
350  // Here, we know we need to wrap up. We process all elements, checking each time that
351  // the sequence is complete, and remove them until nothing is left.
352  // We cannot have more than the last unfinished block here, as all previous ones
353  // are already processed if the order is correct. Still, we process this in a loop,
354  // so that any errors can be elegantly handled and reported.
355  size_t closed_blocks = 0;
356  while( ! block_deque_.empty() ) {
357  auto& first_block = *block_deque_.front();
358 
359  // Process the currently first block in the deque, stopping once we find an unused slot.
360  size_t slot = 0;
361  for( ; slot < BLOCK_SLOT_SIZE; ++slot ) {
362  auto const slot_bit = static_cast<BlockSlotBits>( 1 ) << slot;
363  assert( slot_bit != 0 );
364 
365  // If the slot bit is set, we can process the element.
366  // If not, we are done - in that case, for the buffer to be valid, there must be
367  // no further blocks or slots after this one, which we check below.
368  if( !( first_block.occupied_slots & slot_bit )) {
369  break;
370  }
371 
372  // Process the element, and unset its slot bit.
373  output_function_( std::move( first_block.slots[slot] ));
374  assert( first_block.occupied_slots & slot_bit );
375  first_block.occupied_slots ^= slot_bit;
376  assert( !( first_block.occupied_slots & slot_bit ));
377  }
378 
379  // We processed the first block. It now needs to be empty for the buffer to be valid.
380  if( first_block.occupied_slots || block_deque_.size() > 1 ) {
381  throw std::runtime_error(
382  "Invalid sequence in Sequential Output Buffer, closing the buffer with gaps "
383  "at sequence id " + std::to_string( head_sequence_id_ + slot )
384  );
385  }
386 
387  // We can now remove the block.
388  assert( slot == BLOCK_SLOT_SIZE || first_block.occupied_slots == 0 );
389  block_deque_.pop_front();
390  head_sequence_id_ += BLOCK_SLOT_SIZE;
391  ++closed_blocks;
392  }
393 
394  // Here, we have finished the above successfully, without throwing.
395  // So indeed, as stated above, this must have been a single block.
396  (void) closed_blocks;
397  assert( closed_blocks <= 1 );
398  }
399 
400  // -------------------------------------------------------------------------
401  // Data Members
402  // -------------------------------------------------------------------------
403 
404 private:
405 
406  // This is meant for multithreaded access, so we need to lock access to the main block deque,
407  // as well as the output processing deque that we keep internally.
408  std::mutex block_mutex_;
409  std::mutex output_mutex_;
410 
411  // A sequence of blocks, each containing slots for the elements.
412  // We store them via pointers, so that we can move them around quickly.
413  // We use a deque here, as we need index-based access to the elements.
414  std::deque<std::unique_ptr<Block>> block_deque_;
415 
416  // Another sequence of blocks, but only those that are ready for output.
417  // We keep this separate, so that output processing in the thread that finishes a block
418  // does not make the other threads wait unnecessarily.
419  // We are using a simple blocking threadsafe queue here, instead of the concurrent queue,
420  // as the latter does not guarantee the order of elements that have been enqueued from
421  // several threads, which would make it harder to reason about the order of blocks here.
422  ThreadsafeQueue<std::unique_ptr<Block>> output_queue_;
423 
424  // We only ever need to keep the id (sequence order) of the first element of the first block
425  // here, as everything else can easily be computed from that one.
426  size_t head_sequence_id_ = 0;
427 
428  // The function to process each element in the correct order with.
429  std::function<void( T&& )> output_function_;
430 };
431 
432 } // namespace utils
433 } // namespace genesis
434 
435 #endif // include guard
genesis::utils::SequentialOutputBuffer::SequentialOutputBuffer
SequentialOutputBuffer(std::function< void(T &&)> output_function, size_t first_sequence_id=0)
Initialize a sequential output buffer with the function that is to be called for each element in the ...
Definition: sequential_output_buffer.hpp:142
genesis::utils::SequentialOutputBuffer::~SequentialOutputBuffer
~SequentialOutputBuffer()
Definition: sequential_output_buffer.hpp:153
genesis::utils::ThreadsafeQueue::push
void push(T new_value)
Definition: threadsafe_queue.hpp:147
std.hpp
Provides some valuable additions to STD.
genesis::utils::SequentialOutputBuffer::emplace
void emplace(size_t sequence_id, T &&element)
Emplace an element in the buffer, at the given sequence_id.
Definition: sequential_output_buffer.hpp:184
genesis::utils::SequentialOutputBuffer::operator=
SequentialOutputBuffer & operator=(SequentialOutputBuffer const &)=delete
genesis::population::to_string
std::string to_string(GenomeLocus const &locus)
Definition: function/genome_locus.hpp:52
genesis::utils::ThreadsafeQueue::try_pop
bool try_pop(T &value)
Definition: threadsafe_queue.hpp:171
threadsafe_queue.hpp
genesis::utils::SequentialOutputBuffer
Buffer structure for output to be produced in a well-defined sequential order.
Definition: sequential_output_buffer.hpp:97
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::SequentialOutputBuffer::emplace
void emplace(size_t sequence_id, T const &element)
Emplace an element in the buffer, at the given sequence_id.
Definition: sequential_output_buffer.hpp:173
genesis::utils::ThreadsafeQueue::empty
bool empty() const
Definition: threadsafe_queue.hpp:193
genesis::utils::SequentialOutputBuffer::close
void close()
Close the buffer, i.e., process all remaining elements.
Definition: sequential_output_buffer.hpp:234