A library for working with phylogenetic and population genetic data.
v0.32.0
variant_parallel_input_stream.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_POPULATION_STREAM_VARIANT_PARALLEL_INPUT_STREAM_H_
2 #define GENESIS_POPULATION_STREAM_VARIANT_PARALLEL_INPUT_STREAM_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
41 
42 #include <cassert>
43 #include <functional>
44 #include <memory>
45 #include <set>
46 #include <stdexcept>
47 #include <string>
48 #include <utility>
49 #include <vector>
50 
51 namespace genesis {
52 namespace population {
53 
54 // =================================================================================================
55 // Variant Parallel Input Stream
56 // =================================================================================================
57 
146 {
147 public:
148 
149  // -------------------------------------------------------------------------
150  // Typedefs and Enums
151  // -------------------------------------------------------------------------
152 
183  enum class ContributionType
184  {
191  kCarrying,
192 
201  kFollowing
202  };
203 
232  {
233  // Weird clang error here if the following is not defined the way it is now.
234  // See https://stackoverflow.com/a/44693603/4184258
236  {}
237 
241  bool move_samples = false;
242  };
243 
246 
247  // ======================================================================================
248  // Internal Iterator
249  // ======================================================================================
250 
259  class Iterator
260  {
261  public:
262 
263  // -------------------------------------------------------------------------
264  // Constructors and Rule of Five
265  // -------------------------------------------------------------------------
266 
268  using value_type = std::vector<utils::Optional<Variant>>;
269  using pointer = value_type const*;
270  using reference = value_type const&;
271  using iterator_category = std::input_iterator_tag;
272 
273  public:
274 
275  Iterator() = default;
276 
277  private:
278 
280 
281  public:
282 
283  ~Iterator() = default;
284 
285  Iterator( self_type const& ) = default;
286  Iterator( self_type&& ) = default;
287 
288  Iterator& operator= ( self_type const& ) = default;
289  Iterator& operator= ( self_type&& ) = default;
290 
292 
293  // -------------------------------------------------------------------------
294  // Accessors
295  // -------------------------------------------------------------------------
296 
297  std::vector<utils::Optional<Variant>> const * operator->() const
298  {
299  return &variants_;
300  }
301 
302  std::vector<utils::Optional<Variant>> * operator->()
303  {
304  return &variants_;
305  }
306 
307  std::vector<utils::Optional<Variant>> const & operator*() const
308  {
309  return variants_;
310  }
311 
312  std::vector<utils::Optional<Variant>> & operator*()
313  {
314  return variants_;
315  }
316 
323  std::vector<utils::Optional<Variant>> const& variants() const
324  {
325  return variants_;
326  }
327 
331  std::vector<utils::Optional<Variant>>& variants()
332  {
333  return variants_;
334  }
335 
343  std::vector<VariantInputStream> const& inputs() const
344  {
345  // We assume that the user only does this when the iterator is not an end() iterator.
346  assert( parent_ );
347  return parent_->inputs_;
348  }
349 
357  VariantInputStream const& input_at( size_t index ) const
358  {
359  return parent_->inputs_[index];
360  }
361 
371  utils::Optional<Variant> const& variant_at( size_t index ) const
372  {
373  // Return with boundary check.
374  return variants_.at( index );
375  }
376 
381  {
382  // Return with boundary check.
383  return variants_.at( index );
384  }
385 
397 
401  GenomeLocus const& locus() const
402  {
403  return current_locus_;
404  }
405 
406  // -------------------------------------------------------------------------
407  // Iteration
408  // -------------------------------------------------------------------------
409 
411  {
412  advance_();
413  return *this;
414  }
415 
417  {
418  auto cpy = *this;
419  advance_();
420  return cpy;
421  }
422 
423  operator bool() const
424  {
425  return parent_ != nullptr;
426  }
427 
436  bool operator==( self_type const& it ) const
437  {
438  return parent_ == it.parent_;
439  }
440 
441  bool operator!=( self_type const& it ) const
442  {
443  return !(*this == it);
444  }
445 
446  // -------------------------------------------------------------------------
447  // Internal Members
448  // -------------------------------------------------------------------------
449 
450  private:
451 
455  void advance_()
456  {
457  // Some basic checks.
458  assert( parent_ );
459  assert( parent_->inputs_.size() == parent_->selections_.size() );
460  assert( parent_->inputs_.size() == iterators_.size() );
461 
462  // Depending on what type of inputs we have, we need two different algorithms
463  // to find the next position to iterate to.
464  if( parent_->has_carrying_input_ ) {
465  advance_using_carrying_();
466  } else {
467  advance_using_only_following_();
468  }
469  }
470 
474  void advance_using_carrying_();
475 
482  void advance_using_only_following_();
483 
488  void increment_iterator_( VariantInputStream::Iterator& iterator );
489 
494  void assert_correct_chr_and_pos_( VariantInputStream::Iterator const& iterator );
495 
501  void update_variants_();
502 
503  private:
504 
505  // Parent
506  VariantParallelInputStream* parent_ = nullptr;
507 
508  // Keep track of the locus that the iterator currently is at.
509  // Not all sources have to be there (if they don't have data for that locus), in which case
510  // we want them to be at the next position in their data beyond the current locus.
511  GenomeLocus current_locus_;
512 
513  // Keep the iterators that we want to traverse. We only need the begin() iterators,
514  // as they are themselves able to tell us if they are still good (via their operator bool).
515  std::vector<VariantInputStream::Iterator> iterators_;
516 
517  // We need to store how many samples (SampleCounts objects) the Variant of each iterator has,
518  // in order to fill in the empty ones at the iterator positions where they don't have data.
519  // We cannot always look that up from the iterators themselves, as they might already have
520  // reached their end of the data while others are still having data, so we store it here.
521  std::vector<size_t> variant_sizes_;
522  size_t variant_size_sum_;
523 
524  // Storage for the variants of the iterators. We need these copies, as not all iterators
525  // are expected to have all loci in the genome, so if we'd instead gave access to the
526  // iterators directly to the user of this class, they'd have to check if the iterator is at
527  // the correct locus, and so on. So instead, we offer a user-friendly interface that they
528  // can simply iterator over and check if the optional is empty or not. Bit of copying,
529  // but then again, each layer of abstraction comes at some cost...
530  // At least, we move (not copy) data into here, for efficiency.
531  std::vector<utils::Optional<Variant>> variants_;
532 
533  // Store the current additional carrying locus that we are at (if those have been
534  // added; if not, we just store the end iterator here).
535  std::set<GenomeLocus>::const_iterator carrying_locus_it_;
536 
537  };
538 
539  // ======================================================================================
540  // Main Class
541  // ======================================================================================
542 
543  // -------------------------------------------------------------------------
544  // Constructors and Rule of Five
545  // -------------------------------------------------------------------------
546 
547  VariantParallelInputStream() = default;
548  ~VariantParallelInputStream() = default;
549 
550  VariantParallelInputStream( self_type const& ) = default;
551  VariantParallelInputStream( self_type&& ) = default;
552 
553  self_type& operator= ( self_type const& ) = default;
554  self_type& operator= ( self_type&& ) = default;
555 
556  friend Iterator;
557 
558  // -------------------------------------------------------------------------
559  // Iteration
560  // -------------------------------------------------------------------------
561 
569  {
570  if( started_ ) {
571  throw std::runtime_error(
572  "VariantParallelInputStream implements an input iterator (single pass); "
573  "begin() can hence not be called multiple times."
574  );
575  }
576  started_ = true;
577  return Iterator( this );
578  }
579 
584  {
585  return Iterator( nullptr );
586  }
587 
588  // -------------------------------------------------------------------------
589  // Input Sources
590  // -------------------------------------------------------------------------
591 
596  VariantInputStream const& input,
597  ContributionType selection
598  ) {
599  if( started_ ) {
600  throw std::runtime_error(
601  "VariantParallelInputStream::add_variant_input_stream() cannot be called "
602  "once the iteration has been started with begin()."
603  );
604  }
605 
606  inputs_.emplace_back( input );
607  selections_.emplace_back( selection );
608  assert( inputs_.size() == selections_.size() );
609 
610  if( selection == ContributionType::kCarrying ) {
611  has_carrying_input_ = true;
612  }
613  return *this;
614  }
615 
623  std::function<bool(Variant&)> input_element_generator,
624  ContributionType selection
625  ) {
626  add_variant_input_stream( VariantInputStream( input_element_generator ), selection );
627  return *this;
628  }
629 
633  std::vector<VariantInputStream> const& inputs() const
634  {
635  return inputs_;
636  }
637 
648  std::vector<VariantInputStream>& inputs()
649  {
650  return inputs_;
651  }
652 
656  VariantInputStream const& input_at( size_t index ) const
657  {
658  return inputs_[index];
659  }
660 
664  VariantInputStream& input_at( size_t index )
665  {
666  return inputs_[index];
667  }
668 
672  size_t input_size() const
673  {
674  assert( inputs_.size() == selections_.size() );
675  return inputs_.size();
676  }
677 
678  // -------------------------------------------------------------------------
679  // Input Loci
680  // -------------------------------------------------------------------------
681 
704  {
705  if( started_ ) {
706  throw std::runtime_error(
707  "VariantParallelInputStream::add_carrying_locus() cannot be called "
708  "once the iteration has been started with begin()."
709  );
710  }
711 
712  // Error check.
713  if( locus.chromosome.empty() || locus.position == 0 ) {
714  throw std::invalid_argument(
715  "Cannot add a carrying locus with empty chromosome or position 0 "
716  "to VariantParallelInputStream"
717  );
718  }
719 
720  // Add to the list. Also, if loci are added with this function, these serve as carrying loci,
721  // and so we can always use advance_using_carrying_() to find the next locus;
722  // mark this by setting has_carrying_input_.
723  carrying_loci_.insert( locus );
724  has_carrying_input_ = true;
725  return *this;
726  }
727 
734  self_type& add_carrying_loci( std::vector<GenomeLocus> const& loci )
735  {
736  add_carrying_loci( loci.begin(), loci.end() );
737  return *this;
738  }
739 
743  template<class ForwardIterator>
744  self_type& add_carrying_loci( ForwardIterator first, ForwardIterator last )
745  {
746  while( first != last ) {
747  add_carrying_locus( *first );
748  ++first;
749  }
750 
751  // Version for if we wanted to switch the set for a vector.
752  // Sort the list of loci. All this is so inefficient, as we store the chromosome names
753  // again and again for each locus. The sorting is okay though, we need to have that
754  // complexity somewhere - using a std::set for example would just shift the place where
755  // we do the sorting, but would make iteration a bit more tricky, and would need even more
756  // memory.
757  // std::sort( carrying_loci_.begin(), carrying_loci_.end() );
758  // carrying_loci_.erase(
759  // std::unique( carrying_loci_.begin(), carrying_loci_.end() ),
760  // carrying_loci_.end()
761  // );
762 
763  return *this;
764  }
765 
766  // -------------------------------------------------------------------------
767  // Sequence Dict
768  // -------------------------------------------------------------------------
769 
773  std::shared_ptr<genesis::sequence::SequenceDict> sequence_dict() const
774  {
775  return sequence_dict_;
776  }
777 
792  self_type& sequence_dict( std::shared_ptr<genesis::sequence::SequenceDict> value )
793  {
794  if( started_ ) {
795  throw std::runtime_error(
796  "VariantParallelInputStream::sequence_dict() cannot be called "
797  "once the iteration has been started with begin()."
798  );
799  }
800  sequence_dict_ = value;
801  return *this;
802  }
803 
804  // -------------------------------------------------------------------------
805  // Data Members
806  // -------------------------------------------------------------------------
807 
808 private:
809 
810  // Store all input sources, as well as the type (carrying or following) of how we want
811  // to traverse them. We keep track whether at least one of them is of type carrying.
812  // If not (all following), the advance function of the iterator needs to be special.
813  std::vector<VariantInputStream> inputs_;
814  std::vector<ContributionType> selections_;
815  bool has_carrying_input_ = false;
816  bool started_ = false;
817 
818  // Store all additional loci that we want to include as stops in the iterator.
819  // Memory-wise, this is highly inefficient, as we store the chromosome name for each of them.
820  // But for now, this is easiest and fastest. We use a set, so that adding loci one after another
821  // always results in a sorted container, without having to re-sort every time.
822  // This again has a bit of a higher memory impact, but that should be okay for now.
823  std::set<GenomeLocus> carrying_loci_;
824 
825  // Keep a sequence dictionary for the order of chromosomes. If not provided, we assume
826  // chromosomes are sorted lexicographically. If provided however, the order as specified
827  // by the dictionary is expected of the input sources instead.
828  std::shared_ptr<genesis::sequence::SequenceDict> sequence_dict_;
829 
830 };
831 
832 } // namespace population
833 } // namespace genesis
834 
835 #endif // include guard
genesis::population::VariantParallelInputStream::Iterator::operator*
std::vector< utils::Optional< Variant > > & operator*()
Definition: variant_parallel_input_stream.hpp:312
genesis::population::VariantParallelInputStream::Iterator::operator++
self_type & operator++()
Definition: variant_parallel_input_stream.hpp:410
genesis::population::VariantParallelInputStream::sequence_dict
std::shared_ptr< genesis::sequence::SequenceDict > sequence_dict() const
Get the currently set sequence dictionary used for the chromosome sorting order.
Definition: variant_parallel_input_stream.hpp:773
genesis::population::VariantParallelInputStream::Iterator::reference
value_type const & reference
Definition: variant_parallel_input_stream.hpp:270
genesis::population::VariantParallelInputStream::input_at
VariantInputStream & input_at(size_t index)
Get access to an input stream that has been added to this parallel stream.
Definition: variant_parallel_input_stream.hpp:664
genesis::population::VariantParallelInputStream::JoinedVariantParams::allow_ref_base_mismatches
bool allow_ref_base_mismatches
Definition: variant_parallel_input_stream.hpp:238
genome_locus.hpp
genesis::population::VariantParallelInputStream::Iterator::Iterator
Iterator()=default
genesis::population::VariantParallelInputStream::sequence_dict
self_type & sequence_dict(std::shared_ptr< genesis::sequence::SequenceDict > value)
Set a sequence dictionary to be used for the chromosome order.
Definition: variant_parallel_input_stream.hpp:792
genesis::population::VariantParallelInputStream::JoinedVariantParams::move_samples
bool move_samples
Definition: variant_parallel_input_stream.hpp:241
genesis::population::VariantParallelInputStream::Iterator::variant_at
utils::Optional< Variant > const & variant_at(size_t index) const
Return the data of the input streams at the given index at the current locus.
Definition: variant_parallel_input_stream.hpp:371
genesis::population::VariantParallelInputStream::Iterator::variants
std::vector< utils::Optional< Variant > > const & variants() const
Return the data of all input streams at the current locus.
Definition: variant_parallel_input_stream.hpp:323
genesis::population::VariantParallelInputStream::~VariantParallelInputStream
~VariantParallelInputStream()=default
genesis::population::VariantParallelInputStream::Iterator::value_type
std::vector< utils::Optional< Variant > > value_type
Definition: variant_parallel_input_stream.hpp:268
genesis::population::VariantParallelInputStream::Iterator::operator!=
bool operator!=(self_type const &it) const
Definition: variant_parallel_input_stream.hpp:441
genesis::population::GenomeLocus::position
size_t position
Definition: genome_locus.hpp:67
genome_locus.hpp
genesis::population::VariantParallelInputStream::Iterator::operator->
const std::vector< utils::Optional< Variant > > * operator->() const
Definition: variant_parallel_input_stream.hpp:297
genesis::population::VariantParallelInputStream::inputs
std::vector< VariantInputStream > const & inputs() const
Get access to the input streams that have been added to this parallel stream.
Definition: variant_parallel_input_stream.hpp:633
genesis::population::VariantParallelInputStream::JoinedVariantParams
Parameters for VariantParallelInputStream::Iterator::joined_variant()
Definition: variant_parallel_input_stream.hpp:231
genesis::population::GenomeLocus
A single locus, that is, a position (or coordinate) on a chromosome.
Definition: genome_locus.hpp:64
genesis::population::VariantParallelInputStream::Iterator::VariantParallelInputStream
friend VariantParallelInputStream
Definition: variant_parallel_input_stream.hpp:291
genesis::population::VariantParallelInputStream::Iterator::operator==
bool operator==(self_type const &it) const
Compare two iterators for equality.
Definition: variant_parallel_input_stream.hpp:436
genesis::population::VariantParallelInputStream::ContributionType
ContributionType
Select which loci of an input are used.
Definition: variant_parallel_input_stream.hpp:183
genesis::population::VariantParallelInputStream::add_carrying_loci
self_type & add_carrying_loci(std::vector< GenomeLocus > const &loci)
Add a set of GenomeLoci that are used as carrying loci in the iteration.
Definition: variant_parallel_input_stream.hpp:734
genesis::population::GenomeLocus::chromosome
std::string chromosome
Definition: genome_locus.hpp:66
genesis::utils::Optional
Simplistic optional: requires T to be default constructible, copyable.
Definition: optional.hpp:178
genesis::population::VariantParallelInputStream::Iterator::locus
GenomeLocus const & locus() const
Return the current locus where the iteration is at.
Definition: variant_parallel_input_stream.hpp:401
genesis::population::VariantParallelInputStream
Iterate multiple input sources that yield Variants in parallel.
Definition: variant_parallel_input_stream.hpp:145
genesis::population::VariantParallelInputStream::Iterator::variant_at
utils::Optional< Variant > & variant_at(size_t index)
Return the data of the input streams at the given index at the current locus.
Definition: variant_parallel_input_stream.hpp:380
genesis::population::VariantParallelInputStream::Iterator
friend Iterator
Definition: variant_parallel_input_stream.hpp:556
genesis::population::VariantParallelInputStream::input_size
size_t input_size() const
Return the number of input sourced added.
Definition: variant_parallel_input_stream.hpp:672
sample_counts.hpp
genesis::population::VariantParallelInputStream::Iterator::operator=
Iterator & operator=(self_type const &)=default
genesis::population::VariantParallelInputStream::Iterator::input_at
VariantInputStream const & input_at(size_t index) const
Get access to an input stream that has been added to this parallel stream.
Definition: variant_parallel_input_stream.hpp:357
genesis::population::VariantParallelInputStream::begin
Iterator begin()
Begin the iteration.
Definition: variant_parallel_input_stream.hpp:568
sequence_dict.hpp
genesis::population::VariantParallelInputStream::input_at
VariantInputStream const & input_at(size_t index) const
Get access to an input stream that has been added to this parallel stream.
Definition: variant_parallel_input_stream.hpp:656
genesis::population::VariantParallelInputStream::Iterator::inputs
std::vector< VariantInputStream > const & inputs() const
Get access to the input streams that have been added to this parallel stream.
Definition: variant_parallel_input_stream.hpp:343
genesis::population::VariantParallelInputStream::Iterator::operator*
const std::vector< utils::Optional< Variant > > & operator*() const
Definition: variant_parallel_input_stream.hpp:307
genesis::population::Variant
A single variant at a position in a chromosome, along with SampleCounts for a set of samples.
Definition: variant.hpp:65
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::population::VariantParallelInputStream::Iterator::joined_variant
Variant joined_variant(JoinedVariantParams const &params=JoinedVariantParams{})
Create a single Variant instance that combines all Variants from the input sources at the current loc...
Definition: variant_parallel_input_stream.cpp:142
genesis::population::VariantParallelInputStream::Iterator::~Iterator
~Iterator()=default
genesis::population::VariantParallelInputStream::Iterator
Iterator over loci of the input sources.
Definition: variant_parallel_input_stream.hpp:259
genesis::population::VariantParallelInputStream::Iterator::variants
std::vector< utils::Optional< Variant > > & variants()
Return the data of all input streams at the current locus.
Definition: variant_parallel_input_stream.hpp:331
genesis::population::VariantParallelInputStream::operator=
self_type & operator=(self_type const &)=default
genesis::population::VariantParallelInputStream::add_carrying_locus
self_type & add_carrying_locus(GenomeLocus const &locus)
Add a set of GenomeLoci that are used as carrying loci in the iteration.
Definition: variant_parallel_input_stream.hpp:703
variant_input_stream.hpp
genesis::population::VariantInputStream
utils::GenericInputStream< Variant, VariantInputStreamData > VariantInputStream
Iterate Variants, using a variety of input file formats.
Definition: stream/variant_input_stream.hpp:108
genesis::population::VariantParallelInputStream::VariantParallelInputStream
VariantParallelInputStream()=default
genesis::population::VariantParallelInputStream::self_type
VariantParallelInputStream self_type
Definition: variant_parallel_input_stream.hpp:244
genesis::population::VariantParallelInputStream::JoinedVariantParams::treat_non_passing_variants_as_missing
bool treat_non_passing_variants_as_missing
Definition: variant_parallel_input_stream.hpp:240
variant.hpp
genesis::population::VariantParallelInputStream::add_variant_input_stream
self_type & add_variant_input_stream(VariantInputStream const &input, ContributionType selection)
Add an input to the parallel stream.
Definition: variant_parallel_input_stream.hpp:595
genesis::population::VariantParallelInputStream::Iterator::iterator_category
std::input_iterator_tag iterator_category
Definition: variant_parallel_input_stream.hpp:271
genesis::population::VariantParallelInputStream::JoinedVariantParams::allow_alt_base_mismatches
bool allow_alt_base_mismatches
Definition: variant_parallel_input_stream.hpp:239
genesis::population::VariantParallelInputStream::inputs
std::vector< VariantInputStream > & inputs()
Get access to the input streams that have been added to this parallel stream.
Definition: variant_parallel_input_stream.hpp:648
genesis::population::VariantParallelInputStream::ContributionType::kFollowing
@ kFollowing
For a given input, only stop at positions where other inputs also want to stop.
genesis::population::VariantParallelInputStream::add_variant_input
self_type & add_variant_input(std::function< bool(Variant &)> input_element_generator, ContributionType selection)
Add an input to the parallel stream.
Definition: variant_parallel_input_stream.hpp:622
genesis::population::VariantParallelInputStream::Iterator::operator->
std::vector< utils::Optional< Variant > > * operator->()
Definition: variant_parallel_input_stream.hpp:302
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >
genesis::population::VariantParallelInputStream::JoinedVariantParams::JoinedVariantParams
JoinedVariantParams()
Definition: variant_parallel_input_stream.hpp:235
genesis::utils::GenericInputStream< Variant, VariantInputStreamData >::Iterator
friend Iterator
Definition: generic_input_stream.hpp:846
genesis::population::VariantParallelInputStream::end
Iterator end()
End marker for the iteration.
Definition: variant_parallel_input_stream.hpp:583
genesis::population::VariantParallelInputStream::Iterator::pointer
value_type const * pointer
Definition: variant_parallel_input_stream.hpp:269
genesis::population::VariantParallelInputStream::add_carrying_loci
self_type & add_carrying_loci(ForwardIterator first, ForwardIterator last)
Add a set of GenomeLoci that are used as carrying loci in the iteration.
Definition: variant_parallel_input_stream.hpp:744
genesis::population::VariantParallelInputStream::ContributionType::kCarrying
@ kCarrying
For a given input, stop at all its positions.
optional.hpp