45 namespace population {
52 VariantParallelInputStream* parent
66 iterators_.reserve( parent_->inputs_.size() );
67 variant_sizes_.reserve( parent_->inputs_.size() );
68 for(
size_t i = 0; i < parent_->inputs_.size(); ++i ) {
69 iterators_.emplace_back( parent_->inputs_[i].begin() );
76 auto const sample_name_count = parent_->inputs_[i].data().sample_names.size();
78 variant_sizes_.push_back( iterators_[i]->samples.size() );
81 if( sample_name_count > 0 && iterators_[i]->samples.size() != sample_name_count ) {
82 throw std::runtime_error(
83 "Input source for VariantParallelInputStream contains " +
84 std::to_string( iterators_[i]->samples.size() ) +
" samples, but its sample " +
85 "name list contains " +
std::to_string( sample_name_count ) +
" names."
92 assert_correct_chr_and_pos_( iterators_[i] );
100 variant_sizes_.push_back( sample_name_count );
105 variant_size_sum_ = std::accumulate(
106 variant_sizes_.begin(),
107 variant_sizes_.end(),
108 decltype( variant_sizes_ )::value_type( 0 )
112 variants_.resize( parent_->inputs_.size() );
115 assert( iterators_.size() == parent_->inputs_.size() );
116 assert( iterators_.size() == variants_.size() );
117 assert( iterators_.size() == variant_sizes_.size() );
123 carrying_locus_it_ = parent_->carrying_loci_.cbegin();
124 if( parent_->sequence_dict_ && ! parent_->carrying_loci_.empty() ) {
125 assert( carrying_locus_it_ != parent_->carrying_loci_.cend() );
126 throw std::invalid_argument(
127 "VariantParallelInputStream was provided with a SequenceDict, and with additional "
128 "carrying loci to iterate over. This specific combination is currently not implemented "
129 "(as we did not have need for it so far). If you need this, please open an issue at "
130 "https://github.com/lczech/genesis/issues and we will see what we can do."
145 assert( iterators_.size() == variants_.size() );
146 assert( iterators_.size() == variant_sizes_.size() );
147 assert( iterators_.size() == parent_->inputs_.size() );
153 res.
samples.reserve( variant_size_sum_ );
156 if( variants_.empty() ) {
159 assert( variants_.size() > 0 );
160 assert( variant_sizes_.size() > 0 );
165 bool bases_init =
false;
169 size_t missing_cnt = 0;
170 for(
size_t i = 0; i < variants_.size(); ++i ) {
180 assert( variants_[i]->chromosome == res.
chromosome );
181 assert( variants_[i]->position == res.
position );
182 assert( variants_[i]->samples.size() == variant_sizes_[i] );
200 throw std::runtime_error(
201 "Mismatching reference bases while iterating input sources in parallel at " +
202 to_string( current_locus_ ) +
". Some sources have base '" +
204 std::string( 1,
utils::to_upper( variants_[i]->reference_base )) +
"'."
214 throw std::runtime_error(
215 "Mismatching alternative bases while iterating input sources in parallel at " +
216 to_string( current_locus_ ) +
". Some sources have base '" +
218 std::string( 1,
utils::to_upper( variants_[i]->alternative_base )) +
"'."
228 std::begin( variants_[i]->samples ),
229 std::end( variants_[i]->samples ),
230 std::back_inserter( res.
samples )
232 variants_[i]->samples.clear();
236 std::begin( variants_[i]->samples ),
237 std::end( variants_[i]->samples ),
238 std::back_inserter( res.
samples )
247 for(
size_t k = 0; k < variant_sizes_[i]; ++k ) {
256 if( missing_cnt == variants_.size() ) {
257 assert( !bases_init );
259 carrying_locus_it_ != parent_->carrying_loci_.cend() &&
271 carrying_locus_it_ != parent_->carrying_loci_.cend() &&
278 assert( res.
samples.size() == variant_size_sum_ );
292 void VariantParallelInputStream::Iterator::advance_using_carrying_()
299 assert( iterators_.size() == parent_->selections_.size() );
300 for(
size_t i = 0; i < iterators_.size(); ++i ) {
301 auto& iterator = iterators_[i];
312 current_locus_.empty() ||
314 iterator->chromosome, iterator->position, current_locus_, parent_->sequence_dict_
321 if(
locus_equal( iterator->chromosome, iterator->position, current_locus_ )) {
322 increment_iterator_( iterator );
336 iterator->chromosome, iterator->position, cand_loc, parent_->sequence_dict_
339 cand_loc = GenomeLocus{ iterator->
chromosome, iterator->position };
345 if( carrying_locus_it_ != parent_->carrying_loci_.cend() ) {
347 assert( ! carrying_locus_it_->empty() );
349 current_locus_.empty() ||
351 *carrying_locus_it_, current_locus_, parent_->sequence_dict_
357 if(
locus_equal( *carrying_locus_it_, current_locus_ ) ) {
358 ++carrying_locus_it_;
366 carrying_locus_it_ != parent_->carrying_loci_.cend() &&
369 locus_less( *carrying_locus_it_, cand_loc, parent_->sequence_dict_ )
372 cand_loc = *carrying_locus_it_;
378 if( cand_loc.
empty() ) {
380 assert( parent_->has_carrying_input_ );
384 for(
size_t i = 0; i < iterators_.size(); ++i ) {
394 assert( carrying_locus_it_ == parent_->carrying_loci_.cend() );
403 assert( ! cand_loc.
empty() );
405 current_locus_.empty() ||
406 locus_greater( cand_loc, current_locus_, parent_->sequence_dict_ )
411 for(
size_t i = 0; i < iterators_.size(); ++i ) {
412 auto& iterator = iterators_[i];
421 current_locus_.empty() ||
423 iterator->chromosome, iterator->position, current_locus_, parent_->sequence_dict_
431 assert( ! cand_loc.
empty() );
435 locus_less( iterator->chromosome, iterator->position, cand_loc, parent_->sequence_dict_ )
437 increment_iterator_( iterator );
446 current_locus_ = cand_loc;
454 void VariantParallelInputStream::Iterator::advance_using_only_following_()
458 assert( carrying_locus_it_ == parent_->carrying_loci_.cend() );
459 assert( parent_->carrying_loci_.empty() );
463 bool at_least_one_input_is_at_end =
false;
467 assert( iterators_.size() == parent_->selections_.size() );
468 if( ! current_locus_.empty() ) {
469 for(
size_t i = 0; i < iterators_.size(); ++i ) {
470 auto& iterator = iterators_[i];
483 assert(
locus_equal( iterator->chromosome, iterator->position, current_locus_ ));
484 increment_iterator_( iterator );
489 at_least_one_input_is_at_end =
true;
496 GenomeLocus cand_loc;
501 bool found_locus =
false;
502 while( ! found_locus && ! at_least_one_input_is_at_end ) {
508 for(
size_t i = 0; i < iterators_.size(); ++i ) {
509 auto& iterator = iterators_[i];
520 assert( current_locus_.empty() );
521 at_least_one_input_is_at_end =
true;
527 if( cand_loc.empty() ) {
529 cand_loc = GenomeLocus{ iterator->chromosome, iterator->position };
537 iterator->chromosome, iterator->position, cand_loc, parent_->sequence_dict_
540 increment_iterator_( iterator );
546 at_least_one_input_is_at_end =
true;
557 assert( ! cand_loc.empty() );
559 iterator->chromosome, iterator->position, cand_loc, parent_->sequence_dict_
561 cand_loc = GenomeLocus{ iterator->chromosome, iterator->position };
568 assert(
locus_equal( iterator->chromosome, iterator->position, cand_loc ));
573 assert( iterators_.size() == 0 || ( found_locus ^ at_least_one_input_is_at_end ));
577 if( at_least_one_input_is_at_end ) {
578 assert( ! parent_->has_carrying_input_ );
603 iterators_.size() == 0 ||
604 current_locus_.empty() ||
605 locus_greater( cand_loc, current_locus_, parent_->sequence_dict_ )
609 assert( iterators_.size() == 0 || found_locus );
611 for(
size_t i = 0; i < iterators_.size(); ++i ) {
612 auto const& iterator = iterators_[i];
613 if( ! iterator || !
locus_equal( iterator->chromosome, iterator->position, cand_loc )) {
622 current_locus_ = cand_loc;
630 void VariantParallelInputStream::Iterator::increment_iterator_(
645 auto const prev_loc = GenomeLocus{ iterator->chromosome, iterator->position };
655 assert_correct_chr_and_pos_( iterator );
657 iterator->chromosome, iterator->position, prev_loc, parent_->sequence_dict_
659 throw std::runtime_error(
660 "Cannot iterate multiple input sources in parallel, as (at least) "
661 "one of them is not in the correct sorting order. "
662 "By default, we expect lexicographical sorting of chromosomes, and then sorting by "
663 "position within chromosomes. "
664 "Alternatively, when a sequence dictionary is specified (such as from a .dict or .fai "
665 "file, or from a reference genome .fasta file), we expect the order of chromosomes "
666 "as specified there. "
667 "Offending input source: " + iterator.data().source_name +
", going from " +
669 to_string( GenomeLocus{ iterator->chromosome, iterator->position } )
678 void VariantParallelInputStream::Iterator::assert_correct_chr_and_pos_(
685 if( iterator->chromosome.empty() || iterator->position == 0 ) {
686 throw std::runtime_error(
687 "Cannot iterate multiple input sources in parallel, as (at least) "
688 "one of them has an invalid chromosome (empty name) or position (0). "
689 "Offending input source: " + iterator.data().source_name +
" at " +
699 void VariantParallelInputStream::Iterator::update_variants_()
701 assert( iterators_.size() == variants_.size() );
702 assert( ! current_locus_.empty() );
703 for(
size_t i = 0; i < iterators_.size(); ++i ) {
704 auto& iterator = iterators_[i];
714 if(
locus_equal( iterator->chromosome, iterator->position, current_locus_ )) {
727 auto tmp_samples = std::move( iterator->samples );
728 iterator->samples.clear();
732 variants_[i] = *iterator;
733 variants_[i]->samples = std::move( tmp_samples );
738 if( variants_[i]->samples.size() != variant_sizes_[i] ) {
739 throw std::runtime_error(
740 "Cannot iterate multiple input sources in parallel, as (at least) "
741 "one of them has an inconsistent number of samples. "
742 "Offending input source: " + iterator.data().source_name +
" at " +
743 iterator->chromosome +
":" +
std::to_string( iterator->position ) +
". " +
745 " samples (based on the first used line of input of that source), " +
747 " at the indicated locus."
757 iterator->chromosome, iterator->position, current_locus_, parent_->sequence_dict_