A library for working with phylogenetic and population genetic data.
v0.32.0
variant_parallel_input_stream.cpp
Go to the documentation of this file.
1 /*
2  Genesis - A toolkit for working with phylogenetic data.
3  Copyright (C) 2014-2024 Lucas Czech
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  Contact:
19  Lucas Czech <lczech@carnegiescience.edu>
20  Department of Plant Biology, Carnegie Institution For Science
21  260 Panama Street, Stanford, CA 94305, USA
22 */
23 
32 
37 
38 #include <algorithm>
39 #include <cassert>
40 #include <iterator>
41 #include <numeric>
42 #include <stdexcept>
43 
44 namespace genesis {
45 namespace population {
46 
47 // =================================================================================================
48 // Iterator Constructors and Rule of Five
49 // =================================================================================================
50 
52  VariantParallelInputStream* parent
53 )
54  : parent_(parent)
55 {
56  // We use the parent as a check if this Iterator is intended
57  // to be a begin() or end() iterator.
58  // We could also just use the default constructor to create end() iterators,
59  // this would have the same effect.
60  if( ! parent_ ) {
61  return;
62  }
63 
64  // If the parent is valid, initialize our input sources and start iterating them.
65  // Init the iterators and variant storage.
66  iterators_.reserve( parent_->inputs_.size() );
67  variant_sizes_.reserve( parent_->inputs_.size() );
68  for( size_t i = 0; i < parent_->inputs_.size(); ++i ) {
69  iterators_.emplace_back( parent_->inputs_[i].begin() );
70 
71  // We now have stored the iterator and called its begin() function,
72  // which in the GenericInputStream already obtains the first element.
73  // We use this to get the number of SampleCounts objects in the Variant.
74  // We will later need this to default-construct that many SampleCounts
75  // for positions where this iterator does not have data.
76  auto const sample_name_count = parent_->inputs_[i].data().sample_names.size();
77  if( iterators_[i] ) {
78  variant_sizes_.push_back( iterators_[i]->samples.size() );
79 
80  // We assume that the sample_names are of the correct size, if given.
81  if( sample_name_count > 0 && iterators_[i]->samples.size() != sample_name_count ) {
82  throw std::runtime_error(
83  "Input source for VariantParallelInputStream contains " +
84  std::to_string( iterators_[i]->samples.size() ) + " samples, but its sample " +
85  "name list contains " + std::to_string( sample_name_count ) + " names."
86  );
87  }
88 
89  // Let's make sure that the first position is a valid chromosome and
90  // position. Later, when we advance the iterator, we repeat the check
91  // for every locus we go to as well, just to be sure.
92  assert_correct_chr_and_pos_( iterators_[i] );
93  } else {
94  // If the iterator does not have any data at all, that is, it is already at its end,
95  // we use the length of its sample_names list to indicate how many samples it would
96  // have contained if it had data. This is useful for example in situations where the
97  // input is filtered, so that a file that in fact does have data appears to be without
98  // data here. Using the correct number of samples is then important for some downstream
99  // processors that expect the correct number of samples.
100  variant_sizes_.push_back( sample_name_count );
101  }
102  }
103 
104  // We use the sum of all to allocate memory for effciency. Let's compute that sum once.
105  variant_size_sum_ = std::accumulate(
106  variant_sizes_.begin(),
107  variant_sizes_.end(),
108  decltype( variant_sizes_ )::value_type( 0 )
109  );
110 
111  // Init with default constructed Variants.
112  variants_.resize( parent_->inputs_.size() );
113 
114  // Make sure all have the same size.
115  assert( iterators_.size() == parent_->inputs_.size() );
116  assert( iterators_.size() == variants_.size() );
117  assert( iterators_.size() == variant_sizes_.size() );
118 
119  // Lastly, start with the additional carrying loci. We have currently not implemented
120  // to use additional carrying loci in combination with sequence dict, as this would require
121  // to match the order of the additional loci... This seems to be a very specific use case,
122  // that we currently do not have, and so for now, we simply throw an exception in that case.
123  carrying_locus_it_ = parent_->carrying_loci_.cbegin();
124  if( parent_->sequence_dict_ && ! parent_->carrying_loci_.empty() ) {
125  assert( carrying_locus_it_ != parent_->carrying_loci_.cend() );
126  throw std::invalid_argument(
127  "VariantParallelInputStream was provided with a SequenceDict, and with additional "
128  "carrying loci to iterate over. This specific combination is currently not implemented "
129  "(as we did not have need for it so far). If you need this, please open an issue at "
130  "https://github.com/lczech/genesis/issues and we will see what we can do."
131  );
132  }
133 
134  // Now go to the first locus we want.
135  advance_();
136 }
137 
138 // =================================================================================================
139 // Iterator Accessors
140 // =================================================================================================
141 
143  JoinedVariantParams const& params
144 ) {
145  assert( iterators_.size() == variants_.size() );
146  assert( iterators_.size() == variant_sizes_.size() );
147  assert( iterators_.size() == parent_->inputs_.size() );
148 
149  // Prepare the result.
150  Variant res;
151  res.chromosome = current_locus_.chromosome;
152  res.position = current_locus_.position;
153  res.samples.reserve( variant_size_sum_ );
154 
155  // Special edge case: No inputs at all.
156  if( variants_.empty() ) {
157  return res;
158  }
159  assert( variants_.size() > 0 );
160  assert( variant_sizes_.size() > 0 );
161 
162  // Not all variants might have data; some might be the empty optional.
163  // We hence need to keep track of whether we already initialized our result or not.
164  // This only concernes the ref and alt base fields.
165  bool bases_init = false;
166 
167  // Go through all variants, and for those that have data, check the data correctness,
168  // and add them to the result.
169  size_t missing_cnt = 0;
170  for( size_t i = 0; i < variants_.size(); ++i ) {
171 
172  // If the variant has data, use it.
173  if(
174  variants_[i] &&
175  ( variants_[i]->status.passing() || ! params.treat_non_passing_variants_as_missing )
176  ) {
177 
178  // We already check all of the below when adding the data to variants_.
179  // Still, assert that this is all good.
180  assert( variants_[i]->chromosome == res.chromosome );
181  assert( variants_[i]->position == res.position );
182  assert( variants_[i]->samples.size() == variant_sizes_[i] );
183 
184  // Set and check the ref and alt bases.
185  // This is the first input that has data here. Use it to initialize the bases.
186  if( ! bases_init ) {
187  res.reference_base = utils::to_upper( variants_[i]->reference_base );
188  res.alternative_base = utils::to_upper( variants_[i]->alternative_base );
189  bases_init = true;
190  }
191 
192  // Now check that all inputs have the same bases. We however overwrite any input
193  // that has an N with an input that does not have N, to get the best data.
194  if( res.reference_base != utils::to_upper( variants_[i]->reference_base )) {
195  if( res.reference_base == 'N' ) {
196  res.reference_base = utils::to_upper( variants_[i]->reference_base );
197  } else if( params.allow_ref_base_mismatches ) {
198  res.reference_base = 'N';
199  } else {
200  throw std::runtime_error(
201  "Mismatching reference bases while iterating input sources in parallel at " +
202  to_string( current_locus_ ) + ". Some sources have base '" +
203  std::string( 1, res.reference_base ) + "' while others have '" +
204  std::string( 1, utils::to_upper( variants_[i]->reference_base )) + "'."
205  );
206  }
207  }
208  if( res.alternative_base != utils::to_upper( variants_[i]->alternative_base )) {
209  if( res.alternative_base == 'N' ) {
210  res.alternative_base = utils::to_upper( variants_[i]->alternative_base );
211  } else if( params.allow_alt_base_mismatches ) {
212  res.alternative_base = 'N';
213  } else {
214  throw std::runtime_error(
215  "Mismatching alternative bases while iterating input sources in parallel at " +
216  to_string( current_locus_ ) + ". Some sources have base '" +
217  std::string( 1, res.alternative_base ) + "' while others have '" +
218  std::string( 1, utils::to_upper( variants_[i]->alternative_base )) + "'."
219  );
220  }
221  }
222 
223  // Now move or copy the samples. The reserve calls in the following are not necessary,
224  // as we already allocate enough capacity above. We keep them here for future reference.
225  if( params.move_samples ) {
226  // res.samples.reserve( res.samples.size() + variants_[i]->samples.size() );
227  std::move(
228  std::begin( variants_[i]->samples ),
229  std::end( variants_[i]->samples ),
230  std::back_inserter( res.samples )
231  );
232  variants_[i]->samples.clear();
233  } else {
234  // res.samples.reserve( res.samples.size() + variants_[i]->samples.size() );
235  std::copy(
236  std::begin( variants_[i]->samples ),
237  std::end( variants_[i]->samples ),
238  std::back_inserter( res.samples )
239  );
240  }
241 
242  } else {
243 
244  // If the variant has no data, put as many dummy samples with empty SampleCounts
245  // into the result as the input source has samples in its data positions.
246  // res.samples.reserve( res.samples.size() + variant_sizes_[i].size() );
247  for( size_t k = 0; k < variant_sizes_[i]; ++k ) {
248  res.samples.emplace_back();
249  res.samples.back().status.set( SampleCountsFilterTag::kMissing );
250  }
251  ++missing_cnt;
252  }
253  }
254 
255  // If all are missing, we are at an additional locus, and set the variant itself to missing.
256  if( missing_cnt == variants_.size() ) {
257  assert( !bases_init );
258  assert(
259  carrying_locus_it_ != parent_->carrying_loci_.cend() &&
260  locus_equal( *carrying_locus_it_, current_locus_ )
261  );
263  }
264 
265  // If none of the input sources had data, that means that we are currently at an
266  // additional carrying locus. Check this. We do not need to do anything else,
267  // as the resulting Variant already contains all the information that we have at hand.
268  assert(
269  bases_init ||
270  (
271  carrying_locus_it_ != parent_->carrying_loci_.cend() &&
272  locus_equal( *carrying_locus_it_, current_locus_ )
273  )
274  );
275 
276  // Make sure that the number of samples is the same as the sum of all sample sizes
277  // in the variant_sizes_ vector combined.
278  assert( res.samples.size() == variant_size_sum_ );
279 
280  // Done. Return the merged result.
281  return res;
282 }
283 
284 // =================================================================================================
285 // Iterator Internal Members
286 // =================================================================================================
287 
288 // -------------------------------------------------------------------------
289 // advance_using_carrying_()
290 // -------------------------------------------------------------------------
291 
292 void VariantParallelInputStream::Iterator::advance_using_carrying_()
293 {
294  // Candidate locus. We look for the earliest position of the carrying iterators,
295  // as this is the next one we want to go to.
296  GenomeLocus cand_loc;
297 
298  // Go through all carrying iterators and find the earliest next position of any of them.
299  assert( iterators_.size() == parent_->selections_.size() );
300  for( size_t i = 0; i < iterators_.size(); ++i ) {
301  auto& iterator = iterators_[i];
302  if( ! iterator || parent_->selections_[i] != ContributionType::kCarrying ) {
303  continue;
304  }
305 
306  // In all iterators, we already have moved on to at least the current position.
307  // This assumes that all of the inputs are properly sorted. We check this in
308  // increment_iterator_()
309  // For the very first time this function is called (in the iterator constructor),
310  // current_locus_ is empty at this point, so we need to check this extra.
311  assert(
312  current_locus_.empty() ||
314  iterator->chromosome, iterator->position, current_locus_, parent_->sequence_dict_
315  )
316  );
317 
318  // If this iterator is currently one of the ones that contain the current
319  // position, it's time to move on now. If not, we already asserted that it is
320  // greater, which means, it it still waiting for its turn, so nothing to do now.
321  if( locus_equal( iterator->chromosome, iterator->position, current_locus_ )) {
322  increment_iterator_( iterator );
323 
324  // We might now be done with this input source.
325  if( ! iterator ) {
326  continue;
327  }
328  }
329 
330  // Now comes the part where we find the earliest position that we want to stop at.
331  // Stop at the earliest postion of any iterator of type carrying (all of its positions
332  // need to be included), or if we are in the first iteration of the loop.
333  if(
334  cand_loc.empty() ||
335  locus_less(
336  iterator->chromosome, iterator->position, cand_loc, parent_->sequence_dict_
337  )
338  ) {
339  cand_loc = GenomeLocus{ iterator->chromosome, iterator->position };
340  }
341  }
342 
343  // If there are additional carrying loci, use them to find the candidate as well.
344  assert( parent_ );
345  if( carrying_locus_it_ != parent_->carrying_loci_.cend() ) {
346  // All the assertions from above apply here as well.
347  assert( ! carrying_locus_it_->empty() );
348  assert(
349  current_locus_.empty() ||
351  *carrying_locus_it_, current_locus_, parent_->sequence_dict_
352  )
353  );
354 
355  // If the carrying locus is at the current locus, we need to move it forward,
356  // same as above.
357  if( locus_equal( *carrying_locus_it_, current_locus_ ) ) {
358  ++carrying_locus_it_;
359  }
360 
361  // Now, if it still is not at its end, we can use it as a candidate as well,
362  // if it is earlier than the current input source based candidate (of it the candidate
363  // is empty, which for example happens if all input sources are following, or if all
364  // inputs have already reached their end).
365  if(
366  carrying_locus_it_ != parent_->carrying_loci_.cend() &&
367  (
368  cand_loc.empty() ||
369  locus_less( *carrying_locus_it_, cand_loc, parent_->sequence_dict_ )
370  )
371  ) {
372  cand_loc = *carrying_locus_it_;
373  }
374  }
375 
376  // If we have not set any candidate locus, that means that all carrying iterators
377  // are at their end. Time to wrap up then.
378  if( cand_loc.empty() ) {
379  assert( parent_ );
380  assert( parent_->has_carrying_input_ );
381 
382  // Assert that indeed all carrying iterators are at their end.
383  assert( [&](){
384  for( size_t i = 0; i < iterators_.size(); ++i ) {
385  if( parent_->selections_[i] == ContributionType::kCarrying && iterators_[i] ) {
386  return false;
387  }
388  }
389  return true;
390  }() );
391 
392  // Also, we must have reached the end of the additional carrying loci,
393  // otherwise we would have found a candidate from there.
394  assert( carrying_locus_it_ == parent_->carrying_loci_.cend() );
395 
396  // We are done here.
397  parent_ = nullptr;
398  return;
399  }
400 
401  // We have found a new locus. It needs to be further down from the current
402  // (again we need an extra check in the first call of this function, when current is empty).
403  assert( ! cand_loc.empty() );
404  assert(
405  current_locus_.empty() ||
406  locus_greater( cand_loc, current_locus_, parent_->sequence_dict_ )
407  );
408 
409  // Now that we found the next position to go to, move _all_ iterators to it
410  // (or the next one beyond, if it does not have that position).
411  for( size_t i = 0; i < iterators_.size(); ++i ) {
412  auto& iterator = iterators_[i];
413 
414  // Nothing to do if the iterator is already at its end.
415  if( ! iterator ) {
416  continue;
417  }
418 
419  // Same assertion as above, this time for all of them, just to be sure.
420  assert(
421  current_locus_.empty() ||
423  iterator->chromosome, iterator->position, current_locus_, parent_->sequence_dict_
424  )
425  );
426 
427  // Now move the iterator until we reach the candidate, or one beyond.
428  // For carrying iterators, this loop can only get called once at max (or not at all,
429  // if this source is already at or beyond the candidate from the loop above),
430  // as we never want to skip anything in a carrying iterator. Assert this.
431  assert( ! cand_loc.empty() );
432  size_t cnt = 0;
433  while(
434  iterator &&
435  locus_less( iterator->chromosome, iterator->position, cand_loc, parent_->sequence_dict_ )
436  ) {
437  increment_iterator_( iterator );
438  ++cnt;
439  }
440  (void) cnt;
441  assert( parent_->selections_[i] != ContributionType::kCarrying || cnt <= 1 );
442  }
443 
444  // Finally, update the current locus, and set the variants according to the iterators.
445  // The order of these is imporant, as the latter needs the former to be set.
446  current_locus_ = cand_loc;
447  update_variants_();
448 }
449 
450 // -------------------------------------------------------------------------
451 // advance_using_only_following_()
452 // -------------------------------------------------------------------------
453 
454 void VariantParallelInputStream::Iterator::advance_using_only_following_()
455 {
456  // If this function is called, we only have following iterators,
457  // so there are no addtional carrying loci given.
458  assert( carrying_locus_it_ == parent_->carrying_loci_.cend() );
459  assert( parent_->carrying_loci_.empty() );
460 
461  // Once one of the iterators reaches its end, we are done, as then there cannot
462  // be any more intersections.
463  bool at_least_one_input_is_at_end = false;
464 
465  // If this is not the first call of this function (the one that is done in the constructor
466  // of the iterator), move all iterators at least once, to get away from the current locus.
467  assert( iterators_.size() == parent_->selections_.size() );
468  if( ! current_locus_.empty() ) {
469  for( size_t i = 0; i < iterators_.size(); ++i ) {
470  auto& iterator = iterators_[i];
471 
472  // This function is only ever called if all inputs are of type following.
473  assert( parent_->selections_[i] == ContributionType::kFollowing );
474 
475  // As we are doing the intersection of all iterators here, none of them can be at the
476  // end right now. If one were, we would already have reached the end of our
477  // parallel iteration before, and never entered this function.
478  assert( iterator );
479 
480  // In all iterators, we must be at the current locus, as this is only intersections.
481  // So now, it's time to move on once. Then, go to the next iterator.
482  // The rest of this loop is not needed in this first round.
483  assert( locus_equal( iterator->chromosome, iterator->position, current_locus_ ));
484  increment_iterator_( iterator );
485 
486  // Check if we are done with this iterator. If so, we are completely done,
487  // no need to do anything else here.
488  if( ! iterator ) {
489  at_least_one_input_is_at_end = true;
490  break;
491  }
492  }
493  }
494 
495  // Candidate locus. We look for the earliest locus that all inputs share.
496  GenomeLocus cand_loc;
497 
498  // Loop until we have found a locus that all iterators share,
499  // or until one of them is at the end (in which case, there won't be any more intersections
500  // and we are done with the parallel iteration).
501  bool found_locus = false;
502  while( ! found_locus && ! at_least_one_input_is_at_end ) {
503 
504  // Assume that we are done. Below, we will reset these if we are not in fact done.
505  found_locus = true;
506 
507  // Try to find the candidate in all inputs.
508  for( size_t i = 0; i < iterators_.size(); ++i ) {
509  auto& iterator = iterators_[i];
510 
511  // This function is only ever called if all inputs are of type following.
512  assert( parent_->selections_[i] == ContributionType::kFollowing );
513 
514  // If the iterator is already at its end, we are done here.
515  // This case can here only occur if we have an empty input source,
516  // in which case the call to adanvance_() made from the constructor lead us here.
517  // We assert that this is indeed the first call of this function by using the
518  // current_element_ as a check.
519  if( ! iterator ) {
520  assert( current_locus_.empty() );
521  at_least_one_input_is_at_end = true;
522  found_locus = false;
523  break;
524  }
525 
526  // Init the candidate. This happens in the first iteration of the for loop.
527  if( cand_loc.empty() ) {
528  assert( i == 0 );
529  cand_loc = GenomeLocus{ iterator->chromosome, iterator->position };
530  }
531 
532  // If the iterator is behind the candidate, move it forward until it either catches
533  // up, or overshoots the locus, or reaches its end.
534  while(
535  iterator &&
536  locus_less(
537  iterator->chromosome, iterator->position, cand_loc, parent_->sequence_dict_
538  )
539  ) {
540  increment_iterator_( iterator );
541  }
542 
543  // If the iterator reached its end now, we are done here.
544  // No more intersections can occur, we can leave the whole thing.
545  if( ! iterator ) {
546  at_least_one_input_is_at_end = true;
547  found_locus = false;
548  break;
549  }
550 
551  // If we have an overshoot, the candidate is not good, as this means that not all
552  // inputs have that locus (as exemplified by that overshoot). In that case,
553  // we store the new candidate, and leave the loop. It does not really matter here
554  // if we go on to the next input, or just start the whole outer loop again.
555  // Let's keep the pace and continue with the next input.
556  assert( iterator );
557  assert( ! cand_loc.empty() );
558  if( locus_greater(
559  iterator->chromosome, iterator->position, cand_loc, parent_->sequence_dict_
560  )) {
561  cand_loc = GenomeLocus{ iterator->chromosome, iterator->position };
562  found_locus = false;
563  continue; // or break for the alternative loop order
564  }
565 
566  // If we are here, we have reached the candidate locus.
567  assert( iterator );
568  assert( locus_equal( iterator->chromosome, iterator->position, cand_loc ));
569  }
570  }
571 
572  // Only one of the exit conditions can be true (unless there is no input at all).
573  assert( iterators_.size() == 0 || ( found_locus ^ at_least_one_input_is_at_end ));
574 
575  // If we have not found any locus, that means that at least one of the iterators is at its end.
576  // No more intersections can occur. Time to wrap up then.
577  if( at_least_one_input_is_at_end ) {
578  assert( ! parent_->has_carrying_input_ );
579  parent_ = nullptr;
580 
581  // Assert that exactly one is at its end. Theoretically, in our search, other iterators
582  // might also have been about to reach their end, but as we immediately exit the above
583  // loop once we find an end, these are not incremented to their end yet.
584  // Nope, cannot assert that: We might have started with multiple empty sources...
585  // In that case, we are just done. All good.
586  // assert( [&](){
587  // size_t at_end_cnt = 0;
588  // for( size_t i = 0; i < iterators_.size(); ++i ) {
589  // if( ! iterators_[i] ) {
590  // ++at_end_cnt;
591  // }
592  // }
593  // return at_end_cnt == 1;
594  // }() );
595 
596  // We have indicated our end by nulling parent_. Nothing more to do.
597  return;
598  }
599 
600  // If we are here, we have found a good new locus. It needs to be further down from the current
601  // (again this needs a special check in the first call of this function, when current is empty).
602  assert(
603  iterators_.size() == 0 ||
604  current_locus_.empty() ||
605  locus_greater( cand_loc, current_locus_, parent_->sequence_dict_ )
606  );
607 
608  // Assert that all are at the given locus, and not at their end.
609  assert( iterators_.size() == 0 || found_locus );
610  assert( [&](){
611  for( size_t i = 0; i < iterators_.size(); ++i ) {
612  auto const& iterator = iterators_[i];
613  if( ! iterator || ! locus_equal( iterator->chromosome, iterator->position, cand_loc )) {
614  return false;
615  }
616  }
617  return true;
618  }() );
619 
620  // Finally, update the current locus, and set the variants according to the iterators.
621  // The order of these is imporant, as the latter needs the former to be set.
622  current_locus_ = cand_loc;
623  update_variants_();
624 }
625 
626 // -------------------------------------------------------------------------
627 // increment_iterator_()
628 // -------------------------------------------------------------------------
629 
630 void VariantParallelInputStream::Iterator::increment_iterator_(
632 ) {
633  // If we already reached the end, do nothing.
634  // if( ! iterator ) {
635  // return;
636  // }
637  //
638  // Nope, this function should never be called on a finished iterator.
639  assert( iterator );
640 
641  // We here check that the iterator is in chrom/pos order. This is potentially also already done
642  // in some of our file format iterators, and we here need an expensive string copy just for
643  // this one check, but it feels like this is necessary to be on the safe side. After all,
644  // this parallel iterator here is a bit tricky anyway, so we have to live with that cost.
645  auto const prev_loc = GenomeLocus{ iterator->chromosome, iterator->position };
646 
647  // Now do the increment and check whether we are done with this source.
648  ++iterator;
649  if( ! iterator ) {
650  return;
651  }
652 
653  // Check that it is has a valid chromosome and position, and
654  // make sure that the input is sorted.
655  assert_correct_chr_and_pos_( iterator );
657  iterator->chromosome, iterator->position, prev_loc, parent_->sequence_dict_
658  )) {
659  throw std::runtime_error(
660  "Cannot iterate multiple input sources in parallel, as (at least) "
661  "one of them is not in the correct sorting order. "
662  "By default, we expect lexicographical sorting of chromosomes, and then sorting by "
663  "position within chromosomes. "
664  "Alternatively, when a sequence dictionary is specified (such as from a .dict or .fai "
665  "file, or from a reference genome .fasta file), we expect the order of chromosomes "
666  "as specified there. "
667  "Offending input source: " + iterator.data().source_name + ", going from " +
668  to_string( prev_loc ) + " to " +
669  to_string( GenomeLocus{ iterator->chromosome, iterator->position } )
670  );
671  }
672 }
673 
674 // -------------------------------------------------------------------------
675 // assert_correct_chr_and_pos_()
676 // -------------------------------------------------------------------------
677 
678 void VariantParallelInputStream::Iterator::assert_correct_chr_and_pos_(
679  VariantInputStream::Iterator const& iterator
680 ) {
681  assert( iterator );
682 
683  // This is checked already in our file format iterators, but we heavily depend
684  // on this here, so let's check it.
685  if( iterator->chromosome.empty() || iterator->position == 0 ) {
686  throw std::runtime_error(
687  "Cannot iterate multiple input sources in parallel, as (at least) "
688  "one of them has an invalid chromosome (empty name) or position (0). "
689  "Offending input source: " + iterator.data().source_name + " at " +
690  iterator->chromosome + ":" + std::to_string( iterator->position )
691  );
692  }
693 }
694 
695 // -------------------------------------------------------------------------
696 // update_variants_()
697 // -------------------------------------------------------------------------
698 
699 void VariantParallelInputStream::Iterator::update_variants_()
700 {
701  assert( iterators_.size() == variants_.size() );
702  assert( ! current_locus_.empty() );
703  for( size_t i = 0; i < iterators_.size(); ++i ) {
704  auto& iterator = iterators_[i];
705 
706  // If the iterator is already finished, we store an empty optional variant.
707  if( ! iterator ) {
708  variants_[i] = utils::nullopt;
709  continue;
710  }
711 
712  // If the iterator is at the current locus, we store its data here,
713  // so that users can access it. If not, we store an empty optional variant.
714  if( locus_equal( iterator->chromosome, iterator->position, current_locus_ )) {
715 
716  // We ideally want to move all data here, for efficiency.
717  // The user does not have access to the iterators, so this is okay.
718  // We however cannot move all the data, as we will later need access to the
719  // chromosome and position of the iterators; so instead, we only move the expensive
720  // SampleCounts samples. In order to avoid that when we add more elements to Variant later
721  // and then accidentally forget to also set them here, we do a three-step process where
722  // we move the SampleCounts over to a temp location first, and then copy the rest.
723  // This ensures that whatever other fields Variant gets in the future, we always
724  // copy them as well. So future proof!
725 
726  // Move the samples and leave them in a well-defined empty state in the iterator.
727  auto tmp_samples = std::move( iterator->samples );
728  iterator->samples.clear();
729 
730  // Now we can copy the rest, which will not copy any samples (as we just cleared them),
731  // and then finally move over the samples again.
732  variants_[i] = *iterator;
733  variants_[i]->samples = std::move( tmp_samples );
734 
735  // Check for consistency. This is also already checked in all our input
736  // file sources that we have implemented so far, but better safe than sorry.
737  // Also, we need to do this in case someone uses a different source that does not check.
738  if( variants_[i]->samples.size() != variant_sizes_[i] ) {
739  throw std::runtime_error(
740  "Cannot iterate multiple input sources in parallel, as (at least) "
741  "one of them has an inconsistent number of samples. "
742  "Offending input source: " + iterator.data().source_name + " at " +
743  iterator->chromosome + ":" + std::to_string( iterator->position ) + ". " +
744  "Expecting " + std::to_string( variant_sizes_[i] ) +
745  " samples (based on the first used line of input of that source), " +
746  "but found " + std::to_string( variants_[i]->samples.size() ) +
747  " at the indicated locus."
748  );
749  }
750 
751  // Naive version that just makes a full copy.
752  // variants_[i] = *iterator;
753  } else {
754  // The iterator is not at our current locus. We already checked that it is
755  // however still valid, so it must be beyond the current one. Assert this.
756  assert( locus_greater(
757  iterator->chromosome, iterator->position, current_locus_, parent_->sequence_dict_
758  ));
759 
760  variants_[i] = utils::nullopt;
761  }
762  }
763 }
764 
765 } // namespace population
766 } // namespace genesis
genesis::population::locus_equal
bool locus_equal(std::string const &l_chromosome, size_t l_position, std::string const &r_chromosome, size_t r_position)
Equality comparison (==) for two loci in a genome.
Definition: function/genome_locus.hpp:300
genesis::population::GenomeLocus::empty
bool empty() const
Definition: genome_locus.hpp:84
genesis::population::VariantParallelInputStream::JoinedVariantParams::allow_ref_base_mismatches
bool allow_ref_base_mismatches
Definition: variant_parallel_input_stream.hpp:238
genesis::population::VariantParallelInputStream::Iterator::Iterator
Iterator()=default
genesis::population::VariantParallelInputStream::JoinedVariantParams::move_samples
bool move_samples
Definition: variant_parallel_input_stream.hpp:241
genesis::population::Variant::position
size_t position
Definition: variant.hpp:74
genesis::population::GenomeLocus::position
size_t position
Definition: genome_locus.hpp:67
genesis::population::locus_greater
bool locus_greater(std::string const &l_chromosome, size_t l_position, std::string const &r_chromosome, size_t r_position)
Greater than comparison (>) for two loci in a genome.
Definition: function/genome_locus.hpp:465
genesis::utils::nullopt
const nullopt_t nullopt((nullopt_t::init()))
Optional to indicate an empty value.
genesis::population::VariantParallelInputStream::JoinedVariantParams
Parameters for VariantParallelInputStream::Iterator::joined_variant()
Definition: variant_parallel_input_stream.hpp:231
genesis::population::GenomeLocus
A single locus, that is, a position (or coordinate) on a chromosome.
Definition: genome_locus.hpp:64
genesis::population::Variant::reference_base
char reference_base
Definition: variant.hpp:79
genesis::population::locus_greater_or_equal
bool locus_greater_or_equal(std::string const &l_chromosome, size_t l_position, std::string const &r_chromosome, size_t r_position)
Greater than or equal comparison (>=) for two loci in a genome.
Definition: function/genome_locus.hpp:552
genesis::population::GenomeLocus::chromosome
std::string chromosome
Definition: genome_locus.hpp:66
genesis::population::FilterStatus::set
void set(IntType value)
Definition: filter_status.hpp:100
genesis::population::to_string
std::string to_string(GenomeLocus const &locus)
Definition: function/genome_locus.hpp:52
sample_counts_filter.hpp
genesis::utils::to_upper
constexpr char to_upper(char c) noexcept
Return the upper case version of a letter, ASCII-only.
Definition: char.hpp:230
logging.hpp
Provides easy and fast logging functionality.
genesis::population::Variant
A single variant at a position in a chromosome, along with SampleCounts for a set of samples.
Definition: variant.hpp:65
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::population::VariantParallelInputStream::Iterator::joined_variant
Variant joined_variant(JoinedVariantParams const &params=JoinedVariantParams{})
Create a single Variant instance that combines all Variants from the input sources at the current loc...
Definition: variant_parallel_input_stream.cpp:142
variant_parallel_input_stream.hpp
variant_filter.hpp
char.hpp
genesis::population::VariantFilterTag::kMissing
@ kMissing
Position is missing in the input data.
genesis::population::Variant::samples
std::vector< SampleCounts > samples
Definition: variant.hpp:82
genesis::population::Variant::alternative_base
char alternative_base
Definition: variant.hpp:80
genesis::population::VariantParallelInputStream::JoinedVariantParams::treat_non_passing_variants_as_missing
bool treat_non_passing_variants_as_missing
Definition: variant_parallel_input_stream.hpp:240
genesis::population::locus_less
bool locus_less(std::string const &l_chromosome, size_t l_position, std::string const &r_chromosome, size_t r_position)
Less than comparison (<) for two loci in a genome.
Definition: function/genome_locus.hpp:414
genesis::population::VariantParallelInputStream::JoinedVariantParams::allow_alt_base_mismatches
bool allow_alt_base_mismatches
Definition: variant_parallel_input_stream.hpp:239
genesis::population::VariantParallelInputStream::ContributionType::kFollowing
@ kFollowing
For a given input, only stop at positions where other inputs also want to stop.
genesis::population::SampleCountsFilterTag::kMissing
@ kMissing
Position is missing in the input data.
genesis::population::locus_less_or_equal
bool locus_less_or_equal(std::string const &l_chromosome, size_t l_position, std::string const &r_chromosome, size_t r_position)
Less than or equal comparison (<=) for two loci in a genome.
Definition: function/genome_locus.hpp:505
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >::Iterator
friend Iterator
Definition: generic_input_stream.hpp:846
genesis::population::Variant::status
FilterStatus status
Definition: variant.hpp:76
genesis::population::Variant::chromosome
std::string chromosome
Definition: variant.hpp:73
genesis::population::VariantParallelInputStream::ContributionType::kCarrying
@ kCarrying
For a given input, stop at all its positions.