A library for working with phylogenetic and population genetic data.
v0.32.0
generic_input_stream.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_CONTAINERS_GENERIC_INPUT_STREAM_H_
2 #define GENESIS_UTILS_CONTAINERS_GENERIC_INPUT_STREAM_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
29 
30 #include <cassert>
31 #include <functional>
32 #include <future>
33 #include <memory>
34 #include <stdexcept>
35 #include <string>
36 #include <utility>
37 #include <vector>
38 
39 namespace genesis {
40 namespace utils {
41 
42 // =================================================================================================
43 // Helpers
44 // =================================================================================================
45 
54 {};
55 
56 // =================================================================================================
57 // Generic Input Stream
58 // =================================================================================================
59 
162 template<class T, class D = EmptyGenericInputStreamData>
164 {
165 public:
166 
167  // -------------------------------------------------------------------------
168  // Member Types
169  // -------------------------------------------------------------------------
170 
    // Standard iterator member types, describing the (const) elements produced by the stream.
    using value_type = T;
    using pointer = value_type const*;
    using reference = value_type const&;
    using difference_type = std::ptrdiff_t;
    using iterator_category = std::input_iterator_tag;

    // Type of the additional data that is stored with the stream, see data().
    using Data = D;

    /**
     * @brief Default block size used for buffering.
     *
     * A block size of zero means that no buffering is used: the elements are then read one at
     * a time, synchronously, whenever the iterator is incremented, and no thread pool is needed.
     */
    static size_t const DEFAULT_BLOCK_SIZE = 0;
188 
189  // ======================================================================================
190  // Internal Iterator
191  // ======================================================================================
192 
196  class Iterator
197  {
198  public:
199 
200  // -------------------------------------------------------------------------
201  // Constructors and Rule of Five
202  // -------------------------------------------------------------------------
203 
        // Iterator member types, mirroring those of the enclosing stream.
        using value_type = T;
        using pointer = value_type const*;
        using reference = value_type const&;
        using difference_type = std::ptrdiff_t;
        using iterator_category = std::input_iterator_tag;

        using Data = D;

        /**
         * @brief Default constructor, yielding a past-the-end iterator.
         *
         * The generator_ pointer stays nullptr, which is exactly the state that operator bool()
         * and operator==() interpret as "past-the-end".
         */
        Iterator() = default;
221 
222  private:
223 
230  Iterator(
231  GenericInputStream const* generator
232  )
233  : generator_( generator )
234  , current_block_( std::make_shared<std::vector<T>>() )
235  , buffer_block_( std::make_shared<std::vector<T>>() )
236  , future_( std::make_shared<ProactiveFuture<size_t>>() )
237  {
238  // We use the generator as a check if this Iterator is intended to be a begin()
239  // or end() iterator. If its the former, init and get the first element block.
240  // We could also just use the default constructor to create end() iterators,
241  // this would have the same effect.
242  // After we are done iterating the input, we then set the generator_ to nullptr,
243  // as a sign that we are done. This allows us also to know if we reached end() without
244  // having to store the end() iterator when using this class.
245  if( generator_ ) {
246  if( ! generator_->get_element_ ) {
247  throw std::invalid_argument(
248  "Cannot use GenericInputStream without a function to get elements."
249  );
250  }
251 
252  // Initialize the current_block_ and buffer_block_,
253  // and read the first block(s) of the file.
254  begin_iteration_();
255  }
256  }
257 
258  public:
259 
        // All members are either plain values or shared pointers, so the defaulted special
        // members suffice. Note that copies share the same block buffers and the same pending
        // future (the shared pointers are copied, not the data), so copies are not independent.
        // As usual for input iterators, only one copy should be advanced.
        ~Iterator() = default;

        Iterator( self_type const& ) = default;
        Iterator( self_type&& ) = default;

        Iterator& operator= ( self_type const& ) = default;
        Iterator& operator= ( self_type&& ) = default;
267 
269 
270  // -------------------------------------------------------------------------
271  // Accessors
272  // -------------------------------------------------------------------------
273 
274  value_type const * operator->() const
275  {
276  assert( current_block_ );
277  assert( current_pos_ < end_pos_ );
278  assert( current_pos_ < current_block_->size() );
279  return &((*current_block_)[current_pos_]);
280  }
281 
283  {
284  assert( current_block_ );
285  assert( current_pos_ < end_pos_ );
286  assert( current_pos_ < current_block_->size() );
287  return &((*current_block_)[current_pos_]);
288  }
289 
290  value_type const & operator*() const
291  {
292  assert( current_block_ );
293  assert( current_pos_ < end_pos_ );
294  assert( current_pos_ < current_block_->size() );
295  return (*current_block_)[current_pos_];
296  }
297 
299  {
300  assert( current_block_ );
301  assert( current_pos_ < end_pos_ );
302  assert( current_pos_ < current_block_->size() );
303  return (*current_block_)[current_pos_];
304  }
305 
309  Data const& data() const
310  {
311  if( ! generator_ ) {
312  throw std::runtime_error(
313  "Cannot access default constructed or past-the-end GenericInputStream content."
314  );
315  }
316  assert( generator_ );
317  return generator_->data_;
318  }
319 
320  // We do not offer non-const access at the moment. If we did,
321  // the generator pointer stored in this iterator would need to be non-const as well.
322 
323  // /* *
324  // * @brief Access the data stored in the iterator.
325  // */
326  // Data& data()
327  // {
328  // if( ! generator_ ) {
329  // throw std::runtime_error(
330  // "Cannot access default constructed or past-the-end GenericInputStream content."
331  // );
332  // }
333  // assert( generator_ );
334  // return generator_->data_;
335  // }
336 
337  // -------------------------------------------------------------------------
338  // Iteration
339  // -------------------------------------------------------------------------
340 
342  {
343  increment_();
344  return *this;
345  }
346 
347  // self_type operator ++(int)
348  // {
349  // auto cpy = *this;
350  // increment_();
351  // return cpy;
352  // }
353 
358  operator bool() const
359  {
360  // We here rely on the fact that we set the generator_ to nullptr if this is either
361  // the end() iterator or if the iteration has reached its end after the data is done.
362  return generator_ != nullptr;
363  }
364 
374  bool operator==( self_type const& other ) const
375  {
376  // We compare the generators as a baseline - two past-the-end iterator shall
377  // always compare equal. If only one of them is past-the-end, they will compare false.
378  // Only if both are valid (not past-the-end) iterators, we compare their current
379  // position in the block - two iterators to the same position in the same block
380  // (using pointer comparison for the block - not actual data comparison) are equal.
381  if( ! generator_ || ! other.generator_ ) {
382  // generator_ is used as the indicator whether this is a past-the-end iterator
383  // (in which case it is a nullptr), or not.
384  return generator_ == other.generator_;
385  }
386  assert( generator_ && other.generator_ );
387  return current_block_ == other.current_block_ && current_pos_ == other.current_pos_;
388  }
389 
390  bool operator!=( self_type const& other ) const
391  {
392  return !(*this == other);
393  }
394 
395  // -------------------------------------------------------------------------
396  // Internal Members
397  // -------------------------------------------------------------------------
398 
399  private:
400 
401  void begin_iteration_()
402  {
403  // Check that they are set up from the constructor.
404  assert( generator_ );
405  assert( current_block_ );
406  assert( buffer_block_ );
407 
408  // Before starting to init anything, call the callbacks.
409  execute_begin_callbacks_();
410 
411  // Run the correct init function depending on buffering.
412  if( generator_->block_size_ == 0 ) {
413  begin_iteration_without_buffer_();
414  } else {
415  begin_iteration_with_buffer_();
416  }
417  }
418 
419  void begin_iteration_without_buffer_()
420  {
421  // Edge case: no buffering.
422  // Block size zero indicates to use no buffering, so we just use a single element.
423  // Make space for it, read the first element, and then we are done here.
424  assert( generator_ );
425  assert( generator_->block_size_ == 0 );
426  assert( current_pos_ == 0 );
427  current_block_->resize( 1 );
428  end_pos_ = 1;
429  increment_without_buffer_();
430  }
431 
        /**
         * @brief Start a buffered iteration.
         *
         * Reads the first block synchronously, so that the first element can be dereferenced
         * right away, and, if that block was full, starts filling the second block in the
         * background via the thread pool.
         */
        void begin_iteration_with_buffer_()
        {
            // The buffered mode needs a thread pool for its background reads.
            // NOTE(review): the stream constructor only sets one up when constructed with a
            // block size > 0; raising the block size later via block_size() does not create one.
            assert( generator_->thread_pool_ );

            // Size both blocks to hold a full block of elements each. This default-constructs
            // the elements that are later read into, so T needs to be default constructible.
            // The (empty) block vectors themselves were allocated in the constructor.
            current_block_->resize( generator_->block_size_ );
            buffer_block_->resize( generator_->block_size_ );
            assert( current_block_->size() == generator_->block_size_ );
            assert( buffer_block_->size() == generator_->block_size_ );

            // Read the first block synchronously,
            // so that there is data initialized to be dereferenced.
            end_pos_ = read_block_( generator_, current_block_, generator_->block_size_ );
            assert( current_pos_ == 0 );

            // Check how much data we got, and whether we want to start the background worker.
            // If there is less data than the block size, the input is already done.
            // However, if the first block was fully read, we start the async worker thread
            // to fill the buffer with the next block of data.
            if( end_pos_ == generator_->block_size_ ) {
                enqueue_filling_of_buffer_block_();
            } else if( end_pos_ == 0 ) {
                // Edge case: zero elements read. We are already done then.
                end_iteration_();
                return;
            }
            assert( end_pos_ > 0 );
            assert( end_pos_ <= generator_->block_size_ );

            // Now we have an element, which will be the first one of the iteration,
            // and so we execute the observers for it.
            assert( current_pos_ == 0 );
            execute_on_enter_observers_( (*current_block_)[current_pos_] );
        }
468 
469  void increment_()
470  {
471  // Make sure that all things are still in place.
472  assert( generator_ );
473  assert( current_block_ && current_block_->size() > 0 );
474 
475  // Make sure that sizes have not been changed in the parent class.
476  assert(
477  current_block_->size() == generator_->block_size_ ||
478  (
479  generator_->block_size_ == 0 &&
480  current_block_->size() == 1
481  )
482  );
483  assert( buffer_block_->size() == generator_->block_size_ );
484 
485  // We are moving to the next element. This is the only time that we need to call
486  // the leaving observers, as this happens both during the normal iteration,
487  // as well as at its end. We first call this function, and then do the actual increment.
488  // What that incrementation finds that we are at the end of the data, we have already
489  // called our leaving observer, so all good, no extra case needed for that.
490  assert( current_pos_ < current_block_->size() );
491  execute_on_leave_observers_( (*current_block_)[current_pos_] );
492 
493  // Run the actual increment implementation.
494  if( generator_->block_size_ == 0 ) {
495  increment_without_buffer_();
496  } else {
497  increment_with_buffer_();
498  }
499  }
500 
501  void increment_without_buffer_()
502  {
503  // Edge case: no buffering.
504  // Read the next element. If there is none, we are done.
505  assert( current_pos_ == 0 );
506  assert( current_block_ );
507  assert( current_block_->size() == 1 );
508  if( get_next_element_( generator_, (*current_block_)[0] )) {
509  execute_on_enter_observers_( (*current_block_)[0] );
510  } else {
511  end_iteration_();
512  }
513  }
514 
        /**
         * @brief Buffered increment: advance within the current block, or swap in the buffer.
         *
         * When the end of the current block is reached, the result of the background read
         * is collected, which waits for it if it has not finished yet. The returned number of
         * elements acts as the status. The buffer is then swapped in as the current block, and,
         * if that block was full, the next background read is started.
         */
        void increment_with_buffer_()
        {
            // Move to the next element in the vector.
            ++current_pos_;
            assert( current_pos_ <= end_pos_ );

            // If we are at the end of the data, we are either done iterating,
            // or reached the end of the current buffer block.
            if( current_pos_ == end_pos_ ) {

                // If we did not get a full block size when reading, we are done iterating.
                // Indicate this by unsetting the generator_ pointer.
                if( end_pos_ < generator_->block_size_ ) {
                    end_iteration_();
                    return;
                }

                // If we are at the end of the record vector, and if that vector was full
                // (complete block size), there is more data, so start reading.

                assert( end_pos_ == generator_->block_size_ );
                assert( future_ );
                assert( future_->valid() );

                // Get how many records were read into the buffer, which also waits for the reading
                // if necessary. If we did not read any now, that means that the number of total
                // elements is a multiple of the block size, which we handle as a special case here.
                // Presumably get() leaves the future invalid (as std::future::get() does); the
                // asserts below and in end_iteration_() rely on that.
                end_pos_ = future_->get();
                if( end_pos_ == 0 ) {
                    end_iteration_();
                    return;
                }

                // Here, we know that there is more data buffered, so we can swap the buffer.
                // If that last read returned with less than a full block of items, we have reached
                // the end of the input, and must not start reading more. That condition was once
                // missing, a nasty bug that only showed up in consecutive concurrent tests: a
                // thread was started even after there was no more input, and then accessed data
                // from the next test case, which happened to occupy the same stack space. It
                // never showed up in read data, as that stray thread would not communicate with
                // anything, and somehow did not throw or segfault either. Super nasty to find.
                // Anyway, if we had read a full block before, we need to start reading again.
                // Finally, set our internal current location to the first element of the vector.
                assert( current_block_ );
                assert( buffer_block_ );
                assert( end_pos_ > 0 && end_pos_ <= generator_->block_size_ );
                std::swap( buffer_block_, current_block_ );
                if( end_pos_ == generator_->block_size_ ) {
                    assert( !future_->valid() );
                    enqueue_filling_of_buffer_block_();
                    assert( future_->valid() );
                }
                current_pos_ = 0;
            }

            // Now we have moved to the next element, and potentially the next block,
            // so we are ready to call the observers for that element.
            assert( current_pos_ < end_pos_ );
            execute_on_enter_observers_( (*current_block_)[current_pos_] );
        }
582 
        /**
         * @brief Submit a task to the thread pool that fills buffer_block_ with the next block.
         *
         * The number of elements read is delivered through future_.
         */
        void enqueue_filling_of_buffer_block_()
        {
            // Those shared pointers have been initialized in the constructor; let's assert this.
            assert( generator_ );
            assert( generator_->thread_pool_ );
            assert( future_ );

            // This function is only ever called after we finished any previous operations,
            // so let's assert that the thread pool and future are in the states that we expect.
            // assert( generator_->thread_pool_->load() == 0 );
            assert( ! future_->valid() );

            // Make sure that sizes have not been changed in the parent class.
            assert(
                current_block_->size() == generator_->block_size_ ||
                (
                    generator_->block_size_ == 0 &&
                    current_block_->size() == 1
                )
            );
            assert( buffer_block_ );
            assert( buffer_block_->size() == generator_->block_size_ );

            // In order to use lambda captures by copy for class member variables in C++11, we first
            // have to make local copies, and then capture those. Capturing the class members
            // directly was only introduced later. Bit cumbersome, but gets the job done.
            // The buffer block is a shared pointer, so the task keeps it alive.
            // NOTE(review): `generator` is a raw pointer to the stream, and the task dereferences
            // it while reading. Presumably the stream must therefore outlive any pending read,
            // e.g. when a loop is left early while a read is still in flight. Confirm with callers.
            auto generator = generator_;
            auto buffer_block = buffer_block_;
            auto block_size = generator_->block_size_;

            // The lambda returns the result of read_block_ call, that is, the number of records that
            // have been read, and which we later (in the future_) use to see how much data we got.
            // It is of course important that the input is read in the correct order of elements.
            // Despite its parallel nature, we can use a thread pool here, as we are only ever
            // submitting a single read task to it, so there cannot be two reads of the same lambda
            // iterator in the pool.
            *future_ = generator_->thread_pool_->enqueue_and_retrieve(
                [ generator, buffer_block, block_size ](){
                    return read_block_( generator, buffer_block, block_size );
                }
            );
        }
625 
629  static size_t read_block_(
630  GenericInputStream const* generator,
631  std::shared_ptr<std::vector<T>> buffer_block,
632  size_t block_size
633  ) {
634  // This is a static function that does not depend on the class member data, so that
635  // we can use it from the future lambda in the thread pool above without having to worry
636  // about lambda captures of `this` going extinct... which was an absolutely nasty bug to
637  // find! Such a rookie mistake! For that reason, we also take all arguments as shared
638  // pointers, so that they are kept alive while the thread pool is working.
639  // However, once its done with its work, the function (the one that we give to the thread
640  // pool with a lambda) is popped from the thread queue, so that the shared pointer can
641  // be freed again - that is, we do not need to worry about the lambda keeping the shared
642  // pointer from freeing its memory indefinitely.
643 
644  assert( generator );
645  assert( buffer_block );
646  assert( buffer_block->size() == block_size );
647 
648  // Read as long as there is data. Return number of read records.
649  size_t i = 0;
650  for( ; i < block_size; ++i ) {
651  if( ! get_next_element_( generator, (*buffer_block)[i] )) {
652  break;
653  }
654  }
655  return i;
656  }
657 
665  static bool get_next_element_(
666  GenericInputStream const* generator,
667  T& target
668  ) {
669  assert( generator );
670  bool usable_element = false;
671  while( true ) {
672  // Get the next element from the input source.
673  bool const got_element = generator->get_element_( target );
674 
675  if( got_element ) {
676  // If this is an element (not yet at the end of the data),
677  // apply all transforms and filters, and get out of here if all of them
678  // return `true`, that is, if none of them wants to filter out the element.
679  // If however one of them returns `false`, we need to find another element.
680  usable_element = true;
681  for( auto const& tra_fil : generator->transforms_and_filters_ ) {
682  usable_element = tra_fil( target );
683  if( ! usable_element ) {
684  // If one of the transforms/filters does not want us to continue,
685  // we do not call the others. Break out of the inner loop.
686  break;
687  }
688  }
689 
690  // If we got out of the above loop without any failing filters,
691  // we have fonud an element, so let's move it to our target.
692  if( usable_element ) {
693  // We already checked that got_element holds data, so no need to repeat,
694  // just assert it. Done here, break out of the while loop.
695  assert( got_element );
696  break;
697  }
698  // Else (usable_element == false): Loop again, try the next element.
699 
700  } else {
701  // If this is not a valid element, we reached the end of the input.
702  // We note this in the return value, then break out of the while loop.
703  assert( ! got_element );
704  usable_element = false;
705  break;
706  }
707  }
708  return usable_element;
709  }
710 
711  void end_iteration_()
712  {
713  // This function is called at any point when we reach the end of the input source.
714  // We first need to call all end callbacks, before unsetting the generator pointer.
715  // Doing so is our signal that we have reached the end, and is used for example
716  // in the equality comparison of this iterator to test whether an iterator is valid
717  // or not (or equal to the past-the-end iterator).
718  assert( generator_ );
719  assert( ! future_->valid() );
720  execute_end_callbacks_();
721  generator_ = nullptr;
722  }
723 
724  void execute_on_enter_observers_( T const& element ) const
725  {
726  assert( generator_ );
727  for( auto const& observer : generator_->on_enter_observers_ ) {
728  observer( element );
729  }
730  }
731 
732  void execute_on_leave_observers_( T const& element ) const
733  {
734  assert( generator_ );
735  for( auto const& observer : generator_->on_leave_observers_ ) {
736  observer( element );
737  }
738  }
739 
740  void execute_begin_callbacks_() const
741  {
742  assert( generator_ );
743  for( auto const& cb : generator_->begin_callbacks_ ) {
744  cb( *generator_ );
745  }
746  }
747 
748  void execute_end_callbacks_() const
749  {
750  assert( generator_ );
751  for( auto const& cb : generator_->end_callbacks_ ) {
752  cb( *generator_ );
753  }
754  }
755 
    private:

        // Parent stream. Also serves as the validity flag: it is nullptr for end() and default
        // constructed iterators, and is reset to nullptr once the input is exhausted.
        GenericInputStream const* generator_ = nullptr;

        // Block buffers and positions in them. current_block_ holds the elements being iterated,
        // while buffer_block_ is filled by the thread pool with the next block; the two are
        // swapped when the current block is used up. current_pos_ is the index of the current
        // element, and end_pos_ is one past the last valid element of current_block_.
        // In unbuffered mode (block size 0), current_block_ is a single slot.
        std::shared_ptr<std::vector<T>> current_block_;
        std::shared_ptr<std::vector<T>> buffer_block_;
        size_t current_pos_ = 0;
        size_t end_pos_ = 0;

        // Store the future_ used to keep track of the background task. It returns the number of
        // elements that have been read into the buffer (block_size_, or less at the end of the input).
        std::shared_ptr<ProactiveFuture<size_t>> future_;
770 
771  };
772 
773  // ======================================================================================
774  // Main Class
775  // ======================================================================================
776 
777  // -------------------------------------------------------------------------
778  // Constructors and Rule of Five
779  // -------------------------------------------------------------------------
780 
    /**
     * @brief Default constructor, without a function to get elements.
     *
     * Such a stream converts to `false`. Calling begin() on it throws std::invalid_argument,
     * as iterating requires a function to get elements.
     */
    GenericInputStream() = default;
782 
791  std::function<bool(value_type&)> get_element,
792  std::shared_ptr<utils::ThreadPool> thread_pool = nullptr,
794  )
795  : get_element_(get_element)
796  , block_size_( block_size )
797  {
798  // We only need to set the thread pool if we are going to use buffering.
799  if( block_size > 0 ) {
801  }
802  }
803 
812  std::function<bool(value_type&)> get_element,
813  Data const& data,
814  std::shared_ptr<utils::ThreadPool> thread_pool = nullptr,
816  )
817  : GenericInputStream( get_element, thread_pool, block_size )
818  {
819  data_ = data;
820  }
821 
828  std::function<bool(value_type&)> get_element,
829  Data&& data,
830  std::shared_ptr<utils::ThreadPool> thread_pool = nullptr,
832  )
833  : GenericInputStream( get_element, thread_pool, block_size )
834  {
835  data_ = std::move( data );
836  }
837 
    // The defaulted special members copy/move all settings and the data.
    // NOTE(review): iterators keep a raw pointer to the stream they were created from, so a
    // stream should not be moved, reassigned, or destroyed while one of its iterators is in use.
    ~GenericInputStream() = default;

    GenericInputStream( self_type const& ) = default;
    GenericInputStream( self_type&& ) = default;

    self_type& operator= ( self_type const& ) = default;
    self_type& operator= ( self_type&& ) = default;

    // The iterator reads the stream's private settings (functions, callbacks, thread pool).
    friend Iterator;
847 
848  // -------------------------------------------------------------------------
849  // Iteration
850  // -------------------------------------------------------------------------
851 
853  {
854  if( has_started_ ) {
855  throw std::runtime_error( "GenericInputStream: Cannot call begin() multiple times." );
856  }
857  has_started_ = true;
858  return Iterator( this );
859  }
860 
862  {
863  return Iterator( nullptr );
864  }
865 
870  operator bool() const
871  {
872  return static_cast<bool>( get_element_ );
873  }
874 
878  Data const& data() const
879  {
880  return data_;
881  }
882 
887  {
888  return data_;
889  }
890 
891  // -------------------------------------------------------------------------
892  // Filters and Transformations
893  // -------------------------------------------------------------------------
894 
903  self_type& add_transform( std::function<void(T&)> const& transform )
904  {
905  return add_transform_filter(
906  [transform]( T& element ){
907  transform( element );
908  return true;
909  }
910  );
911  }
912 
920  self_type& add_filter( std::function<bool(T const&)> const& filter )
921  {
922  return add_transform_filter(
923  [filter]( T& element ){
924  return filter( element );
925  }
926  );
927  }
928 
939  self_type& add_transform_filter( std::function<bool(T&)> const& filter )
940  {
941  // Sanity check.
942  if( has_started_ ) {
943  throw std::runtime_error(
944  "GenericInputStream: Cannot change filters/transformations after iteration has started."
945  );
946  }
947  transforms_and_filters_.push_back( filter );
948  return *this;
949  }
950 
955  {
956  if( has_started_ ) {
957  throw std::runtime_error(
958  "GenericInputStream: Cannot change filters/transformations after iteration has started."
959  );
960  }
961  transforms_and_filters_.clear();
962  return *this;
963  }
964 
965  // -------------------------------------------------------------------------
966  // Observers and Callbacks
967  // -------------------------------------------------------------------------
968 
981  self_type& add_on_enter_observer( std::function<void(T const&)> const& observer )
982  {
983  if( has_started_ ) {
984  throw std::runtime_error(
985  "GenericInputStream: Cannot change observers after iteration has started."
986  );
987  }
988  on_enter_observers_.push_back( observer );
989  return *this;
990  }
991 
1005  self_type& add_on_leave_observer( std::function<void(T const&)> const& observer )
1006  {
1007  if( has_started_ ) {
1008  throw std::runtime_error(
1009  "GenericInputStream: Cannot change observers after iteration has started."
1010  );
1011  }
1012  on_leave_observers_.push_back( observer );
1013  return *this;
1014  }
1015 
1022  {
1023  if( has_started_ ) {
1024  throw std::runtime_error(
1025  "GenericInputStream: Cannot change observers after iteration has started."
1026  );
1027  }
1028  on_enter_observers_.clear();
1029  on_leave_observers_.clear();
1030  return *this;
1031  }
1032 
1047  self_type& add_begin_callback( std::function<void(GenericInputStream const&)> const& callback )
1048  {
1049  if( has_started_ ) {
1050  throw std::runtime_error(
1051  "GenericInputStream: Cannot change callbacks after iteration has started."
1052  );
1053  }
1054  begin_callbacks_.push_back( callback );
1055  return *this;
1056  }
1057 
1065  self_type& add_end_callback( std::function<void(GenericInputStream const&)> const& callback )
1066  {
1067  if( has_started_ ) {
1068  throw std::runtime_error(
1069  "GenericInputStream: Cannot change callbacks after iteration has started."
1070  );
1071  }
1072  end_callbacks_.push_back( callback );
1073  return *this;
1074  }
1075 
1081  {
1082  if( has_started_ ) {
1083  throw std::runtime_error(
1084  "GenericInputStream: Cannot change callbacks after iteration has started."
1085  );
1086  }
1087  begin_callbacks_.clear();
1088  end_callbacks_.clear();
1089  return *this;
1090  }
1091 
1092  // -------------------------------------------------------------------------
1093  // Thread Settings
1094  // -------------------------------------------------------------------------
1095 
1099  std::shared_ptr<utils::ThreadPool> thread_pool() const
1100  {
1101  return thread_pool_;
1102  }
1103 
1109  self_type& thread_pool( std::shared_ptr<utils::ThreadPool> value )
1110  {
1111  if( has_started_ ) {
1112  throw std::runtime_error(
1113  "GenericInputStream: Cannot change thread pool after iteration has started."
1114  );
1115  }
1116  if( !value ) {
1117  throw std::runtime_error(
1118  "GenericInputStream: Cannot change thread pool to empty object."
1119  );
1120  }
1121  thread_pool_ = value;
1122  return *this;
1123  }
1124 
1128  size_t block_size() const
1129  {
1130  return block_size_;
1131  }
1132 
1142  self_type& block_size( size_t value )
1143  {
1144  if( has_started_ ) {
1145  throw std::runtime_error(
1146  "GenericInputStream: Cannot change thread pool block size after iteration has started."
1147  );
1148  }
1149  block_size_ = value;
1150  return *this;
1151  }
1152 
1153  // -------------------------------------------------------------------------
1154  // Data Members
1155  // -------------------------------------------------------------------------
1156 
private:

    // We have two different types of functions that we accept to operate on the data:
    // transforms and filters, which are both executed when filling the buffers.
    // To keep it simple and a bit faster, we just store them as the same type here,
    // which can do both of their functionality at once.
    std::vector<std::function<bool(T&)>> transforms_and_filters_;

    // We also offer observers that are executed once the iteration
    // reaches or leaves the respective element.
    std::vector<std::function<void(T const&)>> on_enter_observers_;
    std::vector<std::function<void(T const&)>> on_leave_observers_;

    // We furthermore allow callbacks for the beginning and end of the iteration.
    std::vector<std::function<void(self_type const&)>> begin_callbacks_;
    std::vector<std::function<void(self_type const&)>> end_callbacks_;

    // Function that produces the next element (returning false at the end of the input),
    // and the additional data stored with the stream.
    std::function<bool( value_type& )> get_element_;
    Data data_;

    // Thread pool to run the buffering in the background. Only needed if block_size_ > 0.
    std::shared_ptr<utils::ThreadPool> thread_pool_;

    // Block buffering settings; zero means no buffering.
    size_t block_size_ = DEFAULT_BLOCK_SIZE;

    // Flag to avoid accidentally calling begin() twice, or changing any settings
    // (functions, observers, callbacks, threading) after begin() has already been called.
    bool has_started_ = false;
1187 
1188 };
1189 
1190 } // namespace utils
1191 } // namespace genesis
1192 
1193 #endif // include guard
genesis::utils::GenericInputStream::Iterator::operator++
self_type & operator++()
Definition: generic_input_stream.hpp:341
genesis::placement::swap
void swap(Sample &lhs, Sample &rhs)
Definition: sample.cpp:104
genesis::utils::GenericInputStream::add_filter
self_type & add_filter(std::function< bool(T const &)> const &filter)
Add a filter function that is applied to each element of the iteration.
Definition: generic_input_stream.hpp:920
genesis::utils::GenericInputStream::GenericInputStream
GenericInputStream(std::function< bool(value_type &)> get_element, std::shared_ptr< utils::ThreadPool > thread_pool=nullptr, size_t block_size=DEFAULT_BLOCK_SIZE)
Create an iterator over some underlying content.
Definition: generic_input_stream.hpp:790
genesis::utils::GenericInputStream::Iterator::Data
D Data
Definition: generic_input_stream.hpp:211
genesis::utils::GenericInputStream::Iterator::difference_type
std::ptrdiff_t difference_type
Definition: generic_input_stream.hpp:208
genesis::utils::GenericInputStream::data
Data const & data() const
Access the data stored in the iterator.
Definition: generic_input_stream.hpp:878
genesis::utils::GenericInputStream::block_size
self_type & block_size(size_t value)
Set the block size used for buffering the input data.
Definition: generic_input_stream.hpp:1142
genesis::utils::GenericInputStream::begin
Iterator begin()
Definition: generic_input_stream.hpp:852
genesis::utils::GenericInputStream::DEFAULT_BLOCK_SIZE
static const size_t DEFAULT_BLOCK_SIZE
Default size for block buffering.
Definition: generic_input_stream.hpp:187
genesis::utils::GenericInputStream::Iterator::operator=
Iterator & operator=(self_type const &)=default
genesis::utils::GenericInputStream::add_on_leave_observer
self_type & add_on_leave_observer(std::function< void(T const &)> const &observer)
Add an observer function that is executed when the iterator moves away from an element during the iter...
Definition: generic_input_stream.hpp:1005
genesis::utils::GenericInputStream::clear_observers
self_type & clear_observers()
Clear all functions that are executed on incrementing to the next element.
Definition: generic_input_stream.hpp:1021
genesis::utils::GenericInputStream::self_type
GenericInputStream self_type
Definition: generic_input_stream.hpp:171
genesis::utils::Options::global_thread_pool
std::shared_ptr< ThreadPool > global_thread_pool() const
Return a global thread pool to be used for parallel computations.
Definition: options.hpp:268
genesis::utils::GenericInputStream::GenericInputStream
GenericInputStream()=default
genesis::utils::GenericInputStream::Iterator::iterator_category
std::input_iterator_tag iterator_category
Definition: generic_input_stream.hpp:209
genesis::utils::GenericInputStream::add_end_callback
self_type & add_end_callback(std::function< void(GenericInputStream const &)> const &callback)
Add a callback function that is executed when the end of the iteration is reached.
Definition: generic_input_stream.hpp:1065
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >::difference_type
std::ptrdiff_t difference_type
Definition: generic_input_stream.hpp:175
genesis::utils::GenericInputStream::~GenericInputStream
~GenericInputStream()=default
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >::Data
VariantInputStreamData Data
Definition: generic_input_stream.hpp:178
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >::pointer
value_type const * pointer
Definition: generic_input_stream.hpp:173
genesis::utils::GenericInputStream::end
Iterator end()
Definition: generic_input_stream.hpp:861
genesis::utils::GenericInputStream::Iterator::operator*
value_type & operator*()
Definition: generic_input_stream.hpp:298
genesis::utils::GenericInputStream::Iterator::operator*
const value_type & operator*() const
Definition: generic_input_stream.hpp:290
genesis::utils::GenericInputStream::add_on_enter_observer
self_type & add_on_enter_observer(std::function< void(T const &)> const &observer)
Add an observer function that is executed when the iterator moves to a new element during the iteratio...
Definition: generic_input_stream.hpp:981
genesis::utils::GenericInputStream::add_transform
self_type & add_transform(std::function< void(T &)> const &transform)
Add a transformation function that is applied to each element of the iteration.
Definition: generic_input_stream.hpp:903
genesis::utils::GenericInputStream::GenericInputStream
GenericInputStream(std::function< bool(value_type &)> get_element, Data &&data, std::shared_ptr< utils::ThreadPool > thread_pool=nullptr, size_t block_size=DEFAULT_BLOCK_SIZE)
Create an iterator over some underlying content.
Definition: generic_input_stream.hpp:827
genesis::utils::GenericInputStream::Iterator::pointer
value_type const * pointer
Definition: generic_input_stream.hpp:206
genesis::utils::GenericInputStream::GenericInputStream
GenericInputStream(std::function< bool(value_type &)> get_element, Data const &data, std::shared_ptr< utils::ThreadPool > thread_pool=nullptr, size_t block_size=DEFAULT_BLOCK_SIZE)
Create an iterator over some underlying content.
Definition: generic_input_stream.hpp:811
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >::value_type
Variant value_type
Definition: generic_input_stream.hpp:172
genesis::utils::GenericInputStream::Iterator::operator==
bool operator==(self_type const &other) const
Compare two iterators for equality.
Definition: generic_input_stream.hpp:374
genesis::utils::GenericInputStream::operator=
self_type & operator=(self_type const &)=default
genesis::utils::GenericInputStream::Iterator::data
Data const & data() const
Access the data stored in the iterator.
Definition: generic_input_stream.hpp:309
genesis::utils::GenericInputStream::data
Data & data()
Access the data stored in the iterator.
Definition: generic_input_stream.hpp:886
genesis::utils::GenericInputStream::Iterator::reference
value_type const & reference
Definition: generic_input_stream.hpp:207
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::GenericInputStream::Iterator
Internal iterator over the data.
Definition: generic_input_stream.hpp:196
genesis::utils::GenericInputStream::add_transform_filter
self_type & add_transform_filter(std::function< bool(T &)> const &filter)
Add a transformation and filter function that is applied to each element of the iteration.
Definition: generic_input_stream.hpp:939
options.hpp
genesis::utils::GenericInputStream::Iterator::~Iterator
~Iterator()=default
genesis::utils::GenericInputStream::clear_filters_and_transformations
self_type & clear_filters_and_transformations()
Clear all filters and transformations.
Definition: generic_input_stream.hpp:954
genesis::utils::GenericInputStream::Iterator::Iterator
Iterator()=default
Default constructor for empty (past-the-end) data.
genesis::utils::ProactiveFuture
Wrapper around std::future that implements (pro-)active waiting, i.e., work stealing.
Definition: thread_pool.hpp:79
genesis::utils::GenericInputStream::Iterator::operator->
const value_type * operator->() const
Definition: generic_input_stream.hpp:274
genesis::utils::GenericInputStream::clear_callbacks
self_type & clear_callbacks()
Clear all functions that have been added via add_begin_callback() and add_end_callback().
Definition: generic_input_stream.hpp:1080
genesis::utils::GenericInputStream::Iterator::value_type
T value_type
Definition: generic_input_stream.hpp:205
genesis::utils::Options::get
static Options & get()
Returns a single instance of this class.
Definition: options.hpp:68
genesis::utils::GenericInputStream::thread_pool
self_type & thread_pool(std::shared_ptr< utils::ThreadPool > value)
Set the thread pool used for buffering of elements in this iterator.
Definition: generic_input_stream.hpp:1109
genesis::utils::GenericInputStream::add_begin_callback
self_type & add_begin_callback(std::function< void(GenericInputStream const &)> const &callback)
Add a callback function that is executed when beginning the iteration.
Definition: generic_input_stream.hpp:1047
thread_pool.hpp
genesis::utils::EmptyGenericInputStreamData
Empty helper data struct to serve as a dummy for GenericInputStream.
Definition: generic_input_stream.hpp:53
genesis::utils::GenericInputStream
Type erasure for iterators, using std::function to eliminate the underlying input type.
Definition: generic_input_stream.hpp:163
genesis::utils::GenericInputStream::thread_pool
std::shared_ptr< utils::ThreadPool > thread_pool() const
Get the thread pool used for buffering of elements in this iterator.
Definition: generic_input_stream.hpp:1099
genesis::utils::GenericInputStream::Iterator::GenericInputStream
friend GenericInputStream
Definition: generic_input_stream.hpp:268
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >::iterator_category
std::input_iterator_tag iterator_category
Definition: generic_input_stream.hpp:176
genesis::utils::GenericInputStream::block_size
size_t block_size() const
Get the currently set block size used for buffering the input data.
Definition: generic_input_stream.hpp:1128
genesis::utils::GenericInputStream::Iterator
friend Iterator
Definition: generic_input_stream.hpp:846
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >::reference
value_type const & reference
Definition: generic_input_stream.hpp:174
genesis::utils::GenericInputStream::Iterator::operator!=
bool operator!=(self_type const &other) const
Definition: generic_input_stream.hpp:390
genesis::utils::GenericInputStream::Iterator::operator->
value_type * operator->()
Definition: generic_input_stream.hpp:282