1 #ifndef GENESIS_UTILS_CONTAINERS_GENERIC_INPUT_STREAM_H_
2 #define GENESIS_UTILS_CONTAINERS_GENERIC_INPUT_STREAM_H_
162 template<
class T,
class D = EmptyGenericInputStreamData>
233 : generator_( generator )
234 , current_block_( std::make_shared<std::vector<T>>() )
235 , buffer_block_( std::make_shared<std::vector<T>>() )
246 if( ! generator_->get_element_ ) {
247 throw std::invalid_argument(
248 "Cannot use GenericInputStream without a function to get elements."
276 assert( current_block_ );
277 assert( current_pos_ < end_pos_ );
278 assert( current_pos_ < current_block_->size() );
279 return &((*current_block_)[current_pos_]);
284 assert( current_block_ );
285 assert( current_pos_ < end_pos_ );
286 assert( current_pos_ < current_block_->size() );
287 return &((*current_block_)[current_pos_]);
292 assert( current_block_ );
293 assert( current_pos_ < end_pos_ );
294 assert( current_pos_ < current_block_->size() );
295 return (*current_block_)[current_pos_];
300 assert( current_block_ );
301 assert( current_pos_ < end_pos_ );
302 assert( current_pos_ < current_block_->size() );
303 return (*current_block_)[current_pos_];
312 throw std::runtime_error(
313 "Cannot access default constructed or past-the-end GenericInputStream content."
316 assert( generator_ );
317 return generator_->data_;
358 operator bool()
const
362 return generator_ !=
nullptr;
381 if( ! generator_ || ! other.generator_ ) {
384 return generator_ == other.generator_;
386 assert( generator_ && other.generator_ );
387 return current_block_ == other.current_block_ && current_pos_ == other.current_pos_;
392 return !(*
this == other);
401 void begin_iteration_()
404 assert( generator_ );
405 assert( current_block_ );
406 assert( buffer_block_ );
409 execute_begin_callbacks_();
412 if( generator_->block_size_ == 0 ) {
413 begin_iteration_without_buffer_();
415 begin_iteration_with_buffer_();
419 void begin_iteration_without_buffer_()
424 assert( generator_ );
425 assert( generator_->block_size_ == 0 );
426 assert( current_pos_ == 0 );
427 current_block_->resize( 1 );
429 increment_without_buffer_();
432 void begin_iteration_with_buffer_()
435 assert( generator_->thread_pool_ );
439 current_block_->resize( generator_->block_size_ );
440 buffer_block_->resize( generator_->block_size_ );
441 assert( current_block_->size() == generator_->block_size_ );
442 assert( buffer_block_->size() == generator_->block_size_ );
446 end_pos_ = read_block_( generator_, current_block_, generator_->block_size_ );
447 assert( current_pos_ == 0 );
453 if( end_pos_ == generator_->block_size_ ) {
454 enqueue_filling_of_buffer_block_();
455 }
else if( end_pos_ == 0 ) {
460 assert( end_pos_ > 0 );
461 assert( end_pos_ <= generator_->block_size_ );
465 assert( current_pos_ == 0 );
466 execute_on_enter_observers_( (*current_block_)[current_pos_] );
472 assert( generator_ );
473 assert( current_block_ && current_block_->size() > 0 );
477 current_block_->size() == generator_->block_size_ ||
479 generator_->block_size_ == 0 &&
480 current_block_->size() == 1
483 assert( buffer_block_->size() == generator_->block_size_ );
490 assert( current_pos_ < current_block_->size() );
491 execute_on_leave_observers_( (*current_block_)[current_pos_] );
494 if( generator_->block_size_ == 0 ) {
495 increment_without_buffer_();
497 increment_with_buffer_();
501 void increment_without_buffer_()
505 assert( current_pos_ == 0 );
506 assert( current_block_ );
507 assert( current_block_->size() == 1 );
508 if( get_next_element_( generator_, (*current_block_)[0] )) {
509 execute_on_enter_observers_( (*current_block_)[0] );
515 void increment_with_buffer_()
523 assert( current_pos_ <= end_pos_ );
527 if( current_pos_ == end_pos_ ) {
531 if( end_pos_ < generator_->block_size_ ) {
539 assert( end_pos_ == generator_->block_size_ );
541 assert( future_->valid() );
548 end_pos_ = future_->get();
549 if( end_pos_ == 0 ) {
565 assert( current_block_ );
566 assert( buffer_block_ );
567 assert( end_pos_ > 0 && end_pos_ <= generator_->block_size_ );
568 std::swap( buffer_block_, current_block_ );
569 if( end_pos_ == generator_->block_size_ ) {
570 assert( !future_->valid() );
571 enqueue_filling_of_buffer_block_();
572 assert( future_->valid() );
579 assert( current_pos_ < end_pos_ );
580 execute_on_enter_observers_( (*current_block_)[current_pos_] );
583 void enqueue_filling_of_buffer_block_()
586 assert( generator_ );
587 assert( generator_->thread_pool_ );
593 assert( ! future_->valid() );
597 current_block_->size() == generator_->block_size_ ||
599 generator_->block_size_ == 0 &&
600 current_block_->size() == 1
603 assert( buffer_block_ );
604 assert( buffer_block_->size() == generator_->block_size_ );
609 auto generator = generator_;
610 auto buffer_block = buffer_block_;
619 *future_ = generator_->thread_pool_->enqueue_and_retrieve(
621 return read_block_( generator, buffer_block,
block_size );
629 static size_t read_block_(
631 std::shared_ptr<std::vector<T>> buffer_block,
645 assert( buffer_block );
651 if( ! get_next_element_( generator, (*buffer_block)[i] )) {
665 static bool get_next_element_(
670 bool usable_element =
false;
673 bool const got_element = generator->get_element_( target );
680 usable_element =
true;
681 for(
auto const& tra_fil : generator->transforms_and_filters_ ) {
682 usable_element = tra_fil( target );
683 if( ! usable_element ) {
692 if( usable_element ) {
695 assert( got_element );
703 assert( ! got_element );
704 usable_element =
false;
708 return usable_element;
711 void end_iteration_()
718 assert( generator_ );
719 assert( ! future_->valid() );
720 execute_end_callbacks_();
721 generator_ =
nullptr;
724 void execute_on_enter_observers_( T
const& element )
const
726 assert( generator_ );
727 for(
auto const& observer : generator_->on_enter_observers_ ) {
732 void execute_on_leave_observers_( T
const& element )
const
734 assert( generator_ );
735 for(
auto const& observer : generator_->on_leave_observers_ ) {
740 void execute_begin_callbacks_()
const
742 assert( generator_ );
743 for(
auto const& cb : generator_->begin_callbacks_ ) {
748 void execute_end_callbacks_()
const
750 assert( generator_ );
751 for(
auto const& cb : generator_->end_callbacks_ ) {
762 std::shared_ptr<std::vector<T>> current_block_;
763 std::shared_ptr<std::vector<T>> buffer_block_;
764 size_t current_pos_ = 0;
769 std::shared_ptr<ProactiveFuture<size_t>> future_;
792 std::shared_ptr<utils::ThreadPool>
thread_pool =
nullptr,
795 : get_element_(get_element)
814 std::shared_ptr<utils::ThreadPool>
thread_pool =
nullptr,
830 std::shared_ptr<utils::ThreadPool>
thread_pool =
nullptr,
835 data_ = std::move(
data );
855 throw std::runtime_error(
"GenericInputStream: Cannot call begin() multiple times." );
870 operator bool()
const
872 return static_cast<bool>( get_element_ );
906 [transform]( T& element ){
907 transform( element );
923 [filter]( T& element ){
924 return filter( element );
943 throw std::runtime_error(
944 "GenericInputStream: Cannot change filters/transformations after iteration has started."
947 transforms_and_filters_.push_back( filter );
957 throw std::runtime_error(
958 "GenericInputStream: Cannot change filters/transformations after iteration has started."
961 transforms_and_filters_.clear();
984 throw std::runtime_error(
985 "GenericInputStream: Cannot change observers after iteration has started."
988 on_enter_observers_.push_back( observer );
1007 if( has_started_ ) {
1008 throw std::runtime_error(
1009 "GenericInputStream: Cannot change observers after iteration has started."
1012 on_leave_observers_.push_back( observer );
1023 if( has_started_ ) {
1024 throw std::runtime_error(
1025 "GenericInputStream: Cannot change observers after iteration has started."
1028 on_enter_observers_.clear();
1029 on_leave_observers_.clear();
1049 if( has_started_ ) {
1050 throw std::runtime_error(
1051 "GenericInputStream: Cannot change callbacks after iteration has started."
1054 begin_callbacks_.push_back( callback );
1067 if( has_started_ ) {
1068 throw std::runtime_error(
1069 "GenericInputStream: Cannot change callbacks after iteration has started."
1072 end_callbacks_.push_back( callback );
1082 if( has_started_ ) {
1083 throw std::runtime_error(
1084 "GenericInputStream: Cannot change callbacks after iteration has started."
1087 begin_callbacks_.clear();
1088 end_callbacks_.clear();
1101 return thread_pool_;
1111 if( has_started_ ) {
1112 throw std::runtime_error(
1113 "GenericInputStream: Cannot change thread pool after iteration has started."
1117 throw std::runtime_error(
1118 "GenericInputStream: Cannot change thread pool to empty object."
1121 thread_pool_ = value;
1144 if( has_started_ ) {
1145 throw std::runtime_error(
1146 "GenericInputStream: Cannot change thread pool block size after iteration has started."
1149 block_size_ = value;
1163 std::vector<std::function<bool(T&)>> transforms_and_filters_;
1167 std::vector<std::function<void(T
const&)>> on_enter_observers_;
1168 std::vector<std::function<void(T
const&)>> on_leave_observers_;
1171 std::vector<std::function<void(
self_type const&)>> begin_callbacks_;
1172 std::vector<std::function<void(
self_type const&)>> end_callbacks_;
1175 std::function<bool(
value_type& )> get_element_;
1179 std::shared_ptr<utils::ThreadPool> thread_pool_;
1186 bool has_started_ =
false;
1193 #endif // include guard