A library for working with phylogenetic and population genetic data.
v0.32.0
variant_input_stream_adapters.cpp
Go to the documentation of this file.
1 /*
2  Genesis - A toolkit for working with phylogenetic data.
3  Copyright (C) 2014-2024 Lucas Czech
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  Contact:
19  Lucas Czech <lucas.czech@sund.ku.dk>
20  University of Copenhagen, Globe Institute, Section for GeoGenetics
21  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
22 */
23 
32 
39 
40 #include <algorithm>
41 #include <cassert>
42 #include <cstdint>
43 #include <memory>
44 #include <stdexcept>
45 #include <unordered_set>
46 #include <utility>
47 
48 namespace genesis {
49 namespace population {
50 
51 // =================================================================================================
52 // Variant Parallel Input Stream
53 // =================================================================================================
54 
56  VariantParallelInputStream const& parallel_input,
57  VariantParallelInputStream::JoinedVariantParams const& joined_variant_params
58 ) {
59  // As before, make a shared pointer (with a copy of the input) that stays alive.
60  auto input = std::make_shared<VariantParallelInputStream>( parallel_input );
61 
62  // Get the iterators.
65  bool has_started = false;
66 
67  // We do not have a single file here, so make a list of all sample names from the inputs.
68  // Leave file_path and source_name at their empty defaults.
70  std::unordered_set<std::string> uniq_names;
71  for( auto const& source : input->inputs() ) {
72  // auto const& source_name = source.data().source_name;
73  // for( auto const& sample_name : source.data().sample_names ) {
74  // data.sample_names.push_back( source_name + source_sample_separator + sample_name );
75  // }
76 
77  for( auto const& sample_name : source.data().sample_names ) {
78  if( uniq_names.count( sample_name ) > 0 ) {
79  throw std::runtime_error(
80  "Cannot iterate input sources in parallel, as sample name \"" + sample_name +
81  "\" occurs multiple times in the inputs."
82  );
83  }
84  uniq_names.insert( sample_name );
85  data.sample_names.push_back( sample_name );
86  }
87  }
88  assert( uniq_names.size() == data.sample_names.size() );
89 
90  // Set the params as needed. We want to move samples here, as we know we are not going to use
91  // them any more in this function!
92  auto params = joined_variant_params;
93  params.move_samples = true;
94 
95  // The input is copied over to the lambda, and that copy is kept alive.
96  return VariantInputStream(
97  [ input, cur, end, has_started, params ]
98  ( Variant& variant ) mutable {
99  if( ! has_started ) {
100  assert( input );
101  cur = input->begin();
102  end = input->end();
103  has_started = true;
104  }
105  if( cur != end ) {
106  variant = cur.joined_variant( params );
107  ++cur;
108  return true;
109  } else {
110  return false;
111  }
112  },
113  std::move( data )
114  );
115 }
116 
117 // =================================================================================================
118 // Variant Gapless Input Stream
119 // =================================================================================================
120 
122  VariantGaplessInputStream const& gapless_input
123 ) {
124  // As before, make a shared pointer (with a copy of the input) that stays alive.
125  auto input = std::make_shared<VariantGaplessInputStream>( gapless_input );
126 
127  // Get the iterators.
130  bool has_started = false;
131 
132  // The input is copied over to the lambda, and that copy is kept alive.
133  // The VariantInputStreamData is simply copied over.
134  return VariantInputStream(
135  [ input, cur, end, has_started ]
136  ( Variant& variant ) mutable {
137  if( ! has_started ) {
138  assert( input );
139  cur = input->begin();
140  end = input->end();
141  has_started = true;
142  }
143  if( cur != end ) {
144  variant = std::move( *cur );
145  ++cur;
146  return true;
147  } else {
148  return false;
149  }
150  },
151  input->input().data()
152  );
153 }
154 
155 // =================================================================================================
156 // Merging Input Stream
157 // =================================================================================================
158 
162 struct VariantMergeGroupAssignment
163 {
167  std::vector<size_t> group_assignments;
168 
174  std::vector<std::string> group_names;
175 };
176 
181  VariantInputStream const& variant_input,
182  std::unordered_map<std::string, std::string> const& sample_name_to_group,
183  bool allow_ungrouped_samples
184 ) {
185  // Shorthands and checks
186  auto const& sample_names = variant_input.data().sample_names;
187  if( sample_names.size() == 0 ) {
188  throw std::runtime_error( "Cannot merge sample groups if no sample names are provided" );
189  }
190 
191  // Make a vector assinging sample indices to group indices.
192  // We also keep track of the group names (and their indices) and samples names we have processed.
193  VariantMergeGroupAssignment grouping;
194  grouping.group_assignments = std::vector<size_t>( sample_names.size() );
195  std::unordered_map<std::string, size_t> group_to_index;
196  std::unordered_set<std::string> uniq_sample_names;
197 
198  // Do the assignment
199  for( size_t i = 0; i < sample_names.size(); ++i ) {
200  auto const& sample_name = sample_names[i];
201  if( sample_name.empty() ) {
202  throw std::runtime_error( "Cannot merge sample groups with empty sample names." );
203  }
204 
205  // Check uniqueness of names. We also use that set later, so it's okay to have these
206  // extra copies or the names around.
207  if( uniq_sample_names.count( sample_name ) > 0 ) {
208  throw std::runtime_error(
209  "Cannot merge sample groups with duplicate sample names. Sample name \"" +
210  sample_name + "\" occurs multiple times in the input."
211  );
212  }
213  uniq_sample_names.insert( sample_name );
214 
215  // Get the group name for the sample. If it's in the given list, we use that.
216  // If not, we either fail, or use the sample name as the group name.
217  // In the latter case, there is some scenario where some provided sample names is also
218  // a group name - that could be a bit chaotic and have weird consequences, but should
219  // work, and so we just leave that as a user error and do not check for this here.
220  std::string group_name;
221  if( sample_name_to_group.count( sample_name ) > 0 ) {
222  group_name = sample_name_to_group.at( sample_name );
223  if( group_name.empty() ) {
224  throw std::runtime_error(
225  "Cannot merge sample groups, as sample name \"" + sample_name + "\" has an " +
226  "empty group name assigned in the provided mapping of sample names to group names."
227  );
228  }
229  } else if( allow_ungrouped_samples ) {
230  group_name = sample_name;
231  } else {
232  throw std::runtime_error(
233  "Cannot merge sample groups, as sample name \"" + sample_name +
234  "\" does not occur in the provided mapping of sample names to group names."
235  );
236  }
237  assert( !group_name.empty() );
238 
239  // Now we have a group name. Check if we already have an index for this group,
240  // or whether this is a new group, so that we need to give it the next index.
241  // Then assign this as the group index for the sample.
242  if( group_to_index.count( group_name ) == 0 ) {
243  auto const next_idx = group_to_index.size();
244  group_to_index[ group_name ] = next_idx;
245  grouping.group_names.push_back( group_name );
246  }
247  auto const group_idx = group_to_index.at( group_name );
248  assert( group_idx < group_to_index.size() );
249  assert( group_idx < grouping.group_names.size() );
250  grouping.group_assignments[i] = group_idx;
251  }
252  assert( grouping.group_names.size() > 0 );
253  assert( grouping.group_names.size() == group_to_index.size() );
254  assert( uniq_sample_names.size() == sample_names.size() );
255 
256  // Finally, we warn about any names that have an assignment to a group, but did not appear.
257  // Might indicate that something is wrong, but might also just mean that the user has some
258  // larger mapping that does not always contain all names. Either way, better to tell.
259  std::unordered_set<std::string> samples_names_to_warn;
260  for( auto const& ng : sample_name_to_group ) {
261  if( uniq_sample_names.count( ng.first ) == 0 ) {
262  samples_names_to_warn.insert( ng.first );
263  }
264  }
265  if( !samples_names_to_warn.empty() ) {
266  LOG_WARN << "In the provided list of samples to merge into groups, there were "
267  << std::to_string( samples_names_to_warn.size() )
268  << " sample names that did not occur in the input sample names:\n"
269  << " - " << genesis::utils::join( samples_names_to_warn, " - " )
270  ;
271  }
272 
273  return grouping;
274 }
275 
277  VariantInputStream const& variant_input,
278  std::unordered_map<std::string, std::string> const& sample_name_to_group,
279  bool allow_ungrouped_samples,
280  SampleCountsFilterPolicy filter_policy
281 ) {
282 
283  // Make a mapping from sample indices to group indices. That is, each entry in the returned
284  // vector corresponds to a sample (by its index) of the input, and the value at each entry
285  // is the group number we assign to this.
287  variant_input, sample_name_to_group, allow_ungrouped_samples
288  );
289 
290  // As before, make a shared pointer (with a copy of the input) that stays alive.
291  auto input = std::make_shared<VariantInputStream>( variant_input );
292 
293  // Get iterators to the data.
296  bool has_started = false;
297 
298  // We copy the original variant data, but replace the sample names by our group names.
299  auto data = variant_input.data();
300  data.sample_names = grouping.group_names;
301 
302  // Now we can create our iterator. As before, the iterators are copied over to the lambda,
303  // and those copies are kept alive when returning from this function.
304  return VariantInputStream(
305  [ input, cur, end, has_started, grouping, filter_policy ]( Variant& variant ) mutable {
306  if( ! has_started ) {
307  assert( input );
308  cur = input->begin();
309  end = input->end();
310  has_started = true;
311  }
312 
313  // Nothing to do if we are at the end. Indicate that to the caller.
314  if( cur == end ) {
315  return false;
316  }
317 
318  // For efficiency, we do not want to make a full copy of the input variant,
319  // as that would entail an unncessary copy of the full samples vector.
320  // However, we do want a copy of all other members of Variant, and listing them here
321  // is a bit tedious. So instead we trick around a bit: We move out the samples,
322  // then copy the rest, and then move the samples back. From the point of view of
323  // the underlying input, that should appear as if nothing happened.
324  // This also makes sure that any late additions to the Variant, such as the filter
325  // status are correclty copied over.
326  auto& cur_var = *cur;
327  auto const sample_count = cur_var.samples.size();
328  auto tmp_samples = std::move( cur_var.samples );
329  variant = cur_var;
330  cur_var.samples = std::move( tmp_samples );
331  assert( cur_var.samples.size() == sample_count );
332 
333  // Consistency check the number of samples in the input.
334  if( sample_count != grouping.group_assignments.size() ) {
335  throw std::runtime_error(
336  "Based on sample names and groups, " +
337  std::to_string( grouping.group_assignments.size() ) +
338  " samples are expected to be found in the input, but " +
339  std::to_string( sample_count ) + " samples were found at " +
340  cur_var.chromosome + ":" + std::to_string( cur_var.position )
341  );
342  }
343 
344  // Now we create a new samples vector (no way around that), with SampleCounts instances
345  // that are all initialized to 0 at all counts, and merge the samples into it,
346  // as given by our group assignment.
347  variant.samples = std::vector<SampleCounts>( grouping.group_names.size() );
348  for( size_t i = 0; i < sample_count; ++i ) {
349  auto const group_idx = grouping.group_assignments[i];
350 
351  // Validity check. Pretty sure that this could just be an assertion, as this
352  // is all internal and not dependent on user input, but seems important enough
353  // to just always check.
354  if( group_idx >= variant.samples.size() ) {
355  throw std::runtime_error(
356  "Invalid group index " + std::to_string( group_idx ) +
357  " in Variant with " + std::to_string( variant.samples.size() ) +
358  " samples."
359  );
360  }
361 
362  // Merge the sample, but only if we do not want to skip it due to it being filtered.
363  if(
364  filter_policy == SampleCountsFilterPolicy::kOnlyPassing &&
365  ! cur_var.samples[i].status.passing()
366  ) {
367  continue;
368  }
369  merge_inplace( variant.samples[group_idx], cur_var.samples[i] );
370  }
371 
372  // We are done, move to the next position in the input,
373  // and signal to the caller that we found data.
374  ++cur;
375  return true;
376  },
377  std::move( data )
378  );
379 }
380 
381 } // namespace population
382 } // namespace genesis
genesis::population::make_variant_input_stream_from_variant_parallel_input_stream
VariantInputStream make_variant_input_stream_from_variant_parallel_input_stream(VariantParallelInputStream const &parallel_input, VariantParallelInputStream::JoinedVariantParams const &joined_variant_params)
Create a VariantInputStream to iterate multiple input sources at once, using a VariantParallelInputSt...
Definition: variant_input_stream_adapters.cpp:55
genesis::population::make_variant_merging_input_stream
VariantInputStream make_variant_merging_input_stream(VariantInputStream const &variant_input, std::unordered_map< std::string, std::string > const &sample_name_to_group, bool allow_ungrouped_samples, SampleCountsFilterPolicy filter_policy)
Create a VariantInputStream that merges samples from its underlying input.
Definition: variant_input_stream_adapters.cpp:276
genesis::population::merge_inplace
void merge_inplace(SampleCounts &p1, SampleCounts const &p2)
Merge the counts of two SampleCountss, by adding the counts of the second (p2) to the first (p1).
Definition: population/function/functions.cpp:383
functions.hpp
genesis::population::VariantParallelInputStream::JoinedVariantParams::move_samples
bool move_samples
Definition: variant_parallel_input_stream.hpp:241
genesis::utils::GenericInputStream::data
Data const & data() const
Access the data stored in the iterator.
Definition: generic_input_stream.hpp:878
fs.hpp
Provides functions for accessing the file system.
variant_input_stream_adapters.hpp
genesis::population::VariantParallelInputStream::JoinedVariantParams
Parameters for VariantParallelInputStream::Iterator::joined_variant()
Definition: variant_parallel_input_stream.hpp:231
genesis::population::make_variant_merging_input_stream_group_assignment_
VariantMergeGroupAssignment make_variant_merging_input_stream_group_assignment_(VariantInputStream const &variant_input, std::unordered_map< std::string, std::string > const &sample_name_to_group, bool allow_ungrouped_samples)
Helper function to create a mapping from sample indices to group indices.
Definition: variant_input_stream_adapters.cpp:180
genesis::population::to_string
std::string to_string(GenomeLocus const &locus)
Definition: function/genome_locus.hpp:52
genesis::population::VariantParallelInputStream
Iterate multiple input sources that yield Variants in parallel.
Definition: variant_parallel_input_stream.hpp:145
genesis::population::SampleCountsFilterPolicy
SampleCountsFilterPolicy
Policy helper to decide how to treat filtered SampleCounts.
Definition: sample_counts_filter.hpp:211
sample_counts_filter.hpp
string.hpp
Provides some commonly used string utility functions.
LOG_WARN
#define LOG_WARN
Log a warning. See genesis::utils::LoggingLevel.
Definition: logging.hpp:97
genesis::population::VariantGaplessInputStream
Stream adapter that visits every position in the genome.
Definition: variant_gapless_input_stream.hpp:94
logging.hpp
Provides easy and fast logging functionality.
genesis::population::Variant
A single variant at a position in a chromosome, along with SampleCounts for a set of samples.
Definition: variant.hpp:65
genesis::utils::join
Interval< DataType, NumericalType, IntervalKind > join(Interval< DataType, NumericalType, IntervalKind > const &a, Interval< DataType, NumericalType, IntervalKind > const &b)
Creates a new Interval that contains both intervals and whatever is between.
Definition: utils/containers/interval_tree/functions.hpp:127
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::GenericInputStream::Iterator
Internal iterator over the data.
Definition: generic_input_stream.hpp:196
genesis::population::VariantParallelInputStream::Iterator::joined_variant
Variant joined_variant(JoinedVariantParams const &params=JoinedVariantParams{})
Create a single Variant instance that combines all Variants from the input sources at the current loc...
Definition: variant_parallel_input_stream.cpp:142
genesis::population::VariantParallelInputStream::Iterator
Iterator over loci of the input sources.
Definition: variant_parallel_input_stream.hpp:259
genesis::population::VariantInputStreamData
Data storage for input-specific information when traversing a variant file.
Definition: stream/variant_input_stream.hpp:61
genesis::population::SampleCountsFilterPolicy::kOnlyPassing
@ kOnlyPassing
variant_filter.hpp
genesis::population::VariantInputStreamData::sample_names
std::vector< std::string > sample_names
Sample names, for example as found in the file header.
Definition: stream/variant_input_stream.hpp:85
genesis::population::VariantInputStream
utils::GenericInputStream< Variant, VariantInputStreamData > VariantInputStream
Iterate Variants, using a variety of input file formats.
Definition: stream/variant_input_stream.hpp:108
genesis::population::VariantGaplessInputStream::Iterator
Iterator over loci of the input source.
Definition: variant_gapless_input_stream.hpp:116
genesis::population::Variant::samples
std::vector< SampleCounts > samples
Definition: variant.hpp:82
genesis::population::make_variant_input_stream_from_variant_gapless_input_stream
VariantInputStream make_variant_input_stream_from_variant_gapless_input_stream(VariantGaplessInputStream const &gapless_input)
Create a VariantInputStream that wraps a VariantGaplessInputStream.
Definition: variant_input_stream_adapters.cpp:121
genesis::utils::GenericInputStream
Type erasure for iterators, using std::function to eliminate the underlying input type.
Definition: generic_input_stream.hpp:163