1 #ifndef GENESIS_UTILS_THREADING_SEQUENTIAL_OUTPUT_BUFFER_H_
2 #define GENESIS_UTILS_THREADING_SEQUENTIAL_OUTPUT_BUFFER_H_
// Unsigned 64-bit word used as the occupancy bitmask of a block.
108 using BlockSlotBits = uint64_t;
// Bitmask value with every bit set (the maximum of BlockSlotBits). A block whose
// occupied_slots equals this value is treated as completely filled.
109 static const BlockSlotBits ALL_SLOTS = std::numeric_limits<BlockSlotBits>::max();
// Number of bits in BlockSlotBits; also used as the number of element slots per block
// and as the step by which the head sequence id advances when a block is removed.
110 static const size_t BLOCK_SLOT_SIZE = CHAR_BIT *
sizeof( BlockSlotBits );
// Compile-time guard: refuse to build on platforms where a byte does not have 8 bits.
111 static_assert( CHAR_BIT == 8,
"CHAR_BIT != 8" );
// Bitmask of filled slots: the bit at position i is set after slots[i] has been
// assigned. A fresh block starts with no bit set.
123 BlockSlotBits occupied_slots = 0;
// Element storage of the block, with one entry per bit of occupied_slots.
124 std::array<T, BLOCK_SLOT_SIZE> slots;
// Member initializers: the given first sequence id becomes the head of the sequence,
// and the given output function is stored as the callback that receives the elements.
143 : head_sequence_id_( first_sequence_id )
144 , output_function_( output_function )
// Reject an empty output function with std::invalid_argument. Elsewhere in this class,
// an empty output_function_ is the marker for a buffer that has been closed, so a
// buffer must not start out in that state.
146 if( !output_function ) {
147 throw std::invalid_argument(
148 "Cannot initialize SequentialOutputBuffer with empty output function"
// Overload of emplace() taking the element by const reference.
// It forwards to the rvalue overload, moving from `copy`.
// NOTE(review): the declaration of `copy` is not visible in this excerpt; presumably
// it is a local copy of `element` - confirm.
173 void emplace(
size_t sequence_id, T
const& element )
176 emplace( sequence_id, std::move( copy ));
// Overload of emplace() taking the element by rvalue reference: stores `element` at
// position `sequence_id`, and triggers output processing once the front block is full.
//
// Throws std::runtime_error if the buffer has been closed (empty output_function_),
// or if `sequence_id` lies before the current head of the sequence.
184 void emplace(
size_t sequence_id, T&& element )
// All block bookkeeping below happens while holding block_mutex_.
// NOTE(review): a unique_lock is used here, presumably so that the lock can be
// released before the output processing at the end - the unlock is not visible
// in this excerpt; confirm.
187 std::unique_lock<std::mutex> block_lock( block_mutex_ );
// An empty output function marks a closed buffer; see close().
190 if( !output_function_ ) {
191 throw std::runtime_error(
192 "Cannot emplace element in SequentialOutputBuffer after it has been closed"
// Sequence ids before the head cannot be stored any more.
195 if( sequence_id < head_sequence_id_ ) {
196 throw std::runtime_error(
197 "Invalid sequence in Sequential Output Buffer, emplacing element " +
// Store the element in its slot and mark the slot as occupied.
204 emplace_element_( sequence_id, std::move( element ));
// Check whether there is no block at all, or the front block is still incomplete.
// NOTE(review): the body of this branch is not visible here; presumably it returns
// early, as the call below asserts a full front block - confirm.
208 if( block_deque_.empty() || block_deque_.front()->occupied_slots != ALL_SLOTS ) {
// Hand all completely filled blocks at the front over to the output queue.
217 transfer_full_blocks_to_output_queue_();
// Run the output function on everything that is waiting in the output queue.
226 process_output_queue_();
// Take both mutexes for the remainder of the scope; the block mutex is acquired first,
// then the output mutex.
240 std::lock_guard<std::mutex> block_lock( block_mutex_ );
241 std::lock_guard<std::mutex> output_lock( output_mutex_ );
// An empty output function means the buffer has already been closed. In that case,
// the block deque is expected to be empty; otherwise, std::runtime_error is thrown.
// NOTE(review): the body of the inner branch is not visible in this excerpt;
// presumably it returns early, as an assertion on output_function_ follows - confirm.
245 if( !output_function_ ) {
246 if( block_deque_.empty() ) {
249 throw std::runtime_error(
250 "Invalid state of SequentialOutputBuffer after it has been closed"
253 assert( output_function_ );
// Output the elements that remain in the block deque, and then drop the output
// function, which marks the buffer as closed for emplace().
257 process_last_blocks_();
258 output_function_ =
nullptr;
// Store `element` in the slot that belongs to `sequence_id`, creating blocks in the
// deque as needed, and set the occupancy bit of that slot.
//
// Asserts that the buffer is not closed and that `sequence_id` is not before the head.
// Throws std::runtime_error if the slot of `sequence_id` is already occupied.
267 void emplace_element_(
size_t sequence_id, T&& element )
271 assert( output_function_ );
272 assert( sequence_id >= head_sequence_id_ );
// Translate the sequence id into an offset relative to the head, and split the offset
// into the index of the block within the deque and the index of the slot within
// that block. The slot bit is the mask with only the bit of that slot set.
275 auto const index = sequence_id - head_sequence_id_;
276 auto const block_index = index / BLOCK_SLOT_SIZE;
277 auto const slot_index = index % BLOCK_SLOT_SIZE;
278 auto const slot_bit =
static_cast<BlockSlotBits
>( 1 ) << slot_index;
279 assert( slot_bit != 0 );
// Append empty blocks until the deque contains the block with the needed index.
282 while( block_deque_.size() < block_index + 1 ) {
283 block_deque_.emplace_back(
284 genesis::utils::make_unique<Block>()
287 assert( block_deque_.size() > block_index );
288 auto& block = *block_deque_[block_index];
// A bit that is already set means that this sequence id has been emplaced before.
291 if( block.occupied_slots & slot_bit ) {
292 throw std::runtime_error(
293 "Invalid sequence in Sequential Output Buffer, emplacing element " +
294 std::to_string( sequence_id ) +
", which has already been emplaced before"
// Move the element into its slot, and only then mark the slot as occupied.
299 block.slots[slot_index] = std::move( element );
300 block.occupied_slots |= slot_bit;
// Move every completely filled block at the front of the deque into the output queue,
// and advance the head sequence id by one block size for each block that is moved.
303 void transfer_full_blocks_to_output_queue_()
// Expected to be called only when the front block exists and is completely filled.
307 assert( block_deque_.size() > 0 && block_deque_.front()->occupied_slots == ALL_SLOTS );
// Stop at the first block that is not completely filled, or when the deque is empty.
315 while( block_deque_.size() > 0 && block_deque_.front()->occupied_slots == ALL_SLOTS ) {
// Ownership of the block moves into the output queue; the deque entry that is
// left behind is then removed.
316 output_queue_.
push( std::move( block_deque_.front() ));
317 block_deque_.pop_front();
318 head_sequence_id_ += BLOCK_SLOT_SIZE;
// Drain the output queue: for every queued block, call the output function on each of
// its elements, in slot order.
//
// Throws std::runtime_error if a block cannot be popped from a queue that was just
// observed as non-empty, or if a popped block is not completely filled.
322 void process_output_queue_()
// The output mutex is held for the whole time that the queue is being drained.
326 std::lock_guard<std::mutex> output_lock( output_mutex_ );
327 assert( output_function_ );
328 while( !output_queue_.
empty() ) {
// The queue was non-empty in the loop condition while holding the output mutex,
// so a failing pop here is reported as an internal error.
333 std::unique_ptr<Block> block;
334 if( ! output_queue_.
try_pop( block )) {
335 throw std::runtime_error(
"Internal error: broken lock in SequentialOutputBuffer" );
// Only completely filled blocks are meant to be in the output queue.
337 if( block->occupied_slots != ALL_SLOTS ) {
338 throw std::runtime_error(
"Internal error: broken block in SequentialOutputBuffer" );
// Pass the elements to the output function in increasing slot order, moving each
// element out of the block.
342 for(
size_t slot = 0; slot < BLOCK_SLOT_SIZE; ++slot ) {
343 output_function_( std::move( block->slots[slot] ));
// Output the elements that remain in the block deque, clearing the occupancy bit of
// each element that is passed to the output function, and remove the processed blocks.
//
// Throws std::runtime_error if a block still has occupied slots after the slot loop,
// or if more than one block is left in the deque, i.e., if the sequence has gaps.
348 void process_last_blocks_()
// Counter of processed blocks, which is checked by the assertion at the end.
// NOTE(review): the increment of this counter is not visible in this excerpt.
355 size_t closed_blocks = 0;
356 while( ! block_deque_.empty() ) {
357 auto& first_block = *block_deque_.front();
// NOTE(review): `slot` is declared outside of the loop header, in a line that is not
// visible here; it is read again after the loop, in the assertion below.
361 for( ; slot < BLOCK_SLOT_SIZE; ++slot ) {
362 auto const slot_bit =
static_cast<BlockSlotBits
>( 1 ) << slot;
363 assert( slot_bit != 0 );
// Test for a slot that is not occupied.
// NOTE(review): the body of this branch is not visible here; presumably it leaves
// the slot loop at the first unoccupied slot - confirm.
368 if( !( first_block.occupied_slots & slot_bit )) {
// Output the element, and then clear its bit. The bit is known to be set at this
// point, so the XOR unsets it.
373 output_function_( std::move( first_block.slots[slot] ));
374 assert( first_block.occupied_slots & slot_bit );
375 first_block.occupied_slots ^= slot_bit;
376 assert( !( first_block.occupied_slots & slot_bit ));
// Any bit that is still set, or any further block in the deque, is reported as a gap
// in the sequence.
380 if( first_block.occupied_slots || block_deque_.size() > 1 ) {
381 throw std::runtime_error(
382 "Invalid sequence in Sequential Output Buffer, closing the buffer with gaps "
// The block is done: remove it, and advance the head by one block size.
388 assert( slot == BLOCK_SLOT_SIZE || first_block.occupied_slots == 0 );
389 block_deque_.pop_front();
390 head_sequence_id_ += BLOCK_SLOT_SIZE;
// The cast keeps the counter referenced outside of the assertion, which is its only
// other read in the visible code.
396 (void) closed_blocks;
397 assert( closed_blocks <= 1 );
// Mutex that is locked by emplace() and close() before the block deque is accessed.
408 std::mutex block_mutex_;
// Mutex that is locked by process_output_queue_() and close() around the calls of the
// output function.
409 std::mutex output_mutex_;
// Blocks that are currently being filled. The front block is the one whose first slot
// corresponds to head_sequence_id_.
414 std::deque<std::unique_ptr<Block>> block_deque_;
// Completely filled blocks that wait to be passed to the output function.
422 ThreadsafeQueue<std::unique_ptr<Block>> output_queue_;
// Sequence id of the first slot of the front block of the deque. It advances by
// BLOCK_SLOT_SIZE whenever a block is removed from the deque.
426 size_t head_sequence_id_ = 0;
// Callback that receives each element by rvalue reference. It is set to nullptr by
// close(), and an empty function is treated as the marker for a closed buffer.
429 std::function<void( T&& )> output_function_;
435 #endif // include guard