A library for working with phylogenetic and population genetic data.
v0.32.0
fst_pool_processor.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_POPULATION_FUNCTION_FST_POOL_PROCESSOR_H_
2 #define GENESIS_POPULATION_FUNCTION_FST_POOL_PROCESSOR_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
45 
46 #include <algorithm>
47 #include <cassert>
48 #include <limits>
49 #include <memory>
50 #include <stdexcept>
51 #include <string>
52 #include <utility>
53 #include <tuple>
54 #include <vector>
55 
56 namespace genesis {
57 namespace population {
58 
59 // =================================================================================================
60 // Fst Pool Processor
61 // =================================================================================================
62 
68 {
69 public:
70 
71  // -------------------------------------------------------------------------
72  // Typedefs and Enums
73  // -------------------------------------------------------------------------
74 
75  using PiVectorTuple = std::tuple<std::vector<double>, std::vector<double>, std::vector<double>>;
76 
77  // -------------------------------------------------------------------------
78  // Constructors and Rule of Five
79  // -------------------------------------------------------------------------
80 
89  std::shared_ptr<utils::ThreadPool> thread_pool = nullptr,
90  size_t threading_threshold = 4096
91  )
92  : thread_pool_( thread_pool )
93  , threading_threshold_( threading_threshold )
94  {
96  }
97 
98  ~FstPoolProcessor() = default;
99 
100  FstPoolProcessor( FstPoolProcessor const& ) = default;
101  FstPoolProcessor( FstPoolProcessor&& ) = default;
102 
103  FstPoolProcessor& operator= ( FstPoolProcessor const& ) = default;
105 
106  // -------------------------------------------------------------------------
107  // Setup
108  // -------------------------------------------------------------------------
109 
113  std::shared_ptr<utils::ThreadPool> thread_pool() const
114  {
115  return thread_pool_;
116  }
117 
124  FstPoolProcessor& thread_pool( std::shared_ptr<utils::ThreadPool> value )
125  {
126  thread_pool_ = value;
127  return *this;
128  }
129 
130  size_t threading_threshold() const
131  {
132  return threading_threshold_;
133  }
134 
145  {
146  threading_threshold_ = value;
147  return *this;
148  }
149 
154  size_t index_p1, size_t index_p2,
155  std::unique_ptr<BaseFstPoolCalculator> calculator
156  ) {
157  assert( sample_pairs_.size() == calculators_.size() );
158  assert( sample_pairs_.size() == results_.size() );
159  sample_pairs_.push_back({ index_p1, index_p2 });
160  calculators_.push_back( std::move( calculator ));
161  results_.push_back( std::numeric_limits<double>::quiet_NaN() );
162  }
163 
164  // -------------------------------------------------------------------------
165  // Calculator Functions
166  // -------------------------------------------------------------------------
167 
172  size_t size() const
173  {
174  assert( calculators_.size() == sample_pairs_.size() );
175  assert( calculators_.size() == results_.size() );
176  return calculators_.size();
177  }
178 
179  void reset()
180  {
181  assert( calculators_.size() == results_.size() );
182  for( auto& calc : calculators_ ) {
183  assert( calc );
184  calc->reset();
185  }
186  filter_stats_.clear();
187  std::fill(
188  results_.begin(), results_.end(),
189  std::numeric_limits<double>::quiet_NaN()
190  );
191 
192  // Also reset the pi vectors to nan.
193  // If they are not allocated, nothing happens.
194  auto const res_sz = results_.size();
195  (void) res_sz;
196  assert( std::get<0>( results_pi_ ).size() == 0 || std::get<0>( results_pi_ ).size() == res_sz );
197  assert( std::get<1>( results_pi_ ).size() == 0 || std::get<1>( results_pi_ ).size() == res_sz );
198  assert( std::get<2>( results_pi_ ).size() == 0 || std::get<2>( results_pi_ ).size() == res_sz );
199  std::fill(
200  std::get<0>( results_pi_ ).begin(), std::get<0>( results_pi_ ).end(),
201  std::numeric_limits<double>::quiet_NaN()
202  );
203  std::fill(
204  std::get<1>( results_pi_ ).begin(), std::get<1>( results_pi_ ).end(),
205  std::numeric_limits<double>::quiet_NaN()
206  );
207  std::fill(
208  std::get<2>( results_pi_ ).begin(), std::get<2>( results_pi_ ).end(),
209  std::numeric_limits<double>::quiet_NaN()
210  );
211  }
212 
213  void process( Variant const& variant )
214  {
215  // Check correct usage
216  assert( sample_pairs_.size() == calculators_.size() );
217 
218  // Only process Variants that are passing, but keep track of the ones that did not.
219  ++filter_stats_[variant.status.get()];
220  if( ! variant.status.passing() ) {
221  return;
222  }
223 
224  // Helper function for the processing.
225  auto process_ = [&]( size_t index ) {
226  auto const& indices = sample_pairs_[index];
227  if(
228  indices.first >= variant.samples.size() ||
229  indices.second >= variant.samples.size()
230  ) {
231  throw std::runtime_error(
232  "Invalid sample indices for computing FST Pool: Variant contains " +
233  std::to_string( variant.samples.size() ) + " samples, but indices " +
234  std::to_string( indices.first ) + " and " + std::to_string( indices.second ) +
235  " have been requested."
236  );
237  }
238  calculators_[index]->process(
239  variant.samples[indices.first],
240  variant.samples[indices.second]
241  );
242  };
243 
244  // Switch dynamically between threading and no threading for the processing.
245  if( thread_pool_ && calculators_.size() >= threading_threshold_ ) {
246  parallel_for(
247  0, sample_pairs_.size(), process_, thread_pool_
248  );
249  } else {
250  for( size_t i = 0; i < sample_pairs_.size(); ++i ) {
251  process_(i);
252  }
253  }
254  }
255 
265  template<class D>
266  std::vector<double> const& get_result(
267  BaseWindow<D> const& window,
268  std::shared_ptr<GenomeLocusSet> provided_loci
269  ) const {
270  assert( results_.size() == calculators_.size() );
271  for( size_t i = 0; i < results_.size(); ++i ) {
272  // We do an ugly dispatch here to treat the special case of the FstPoolCalculatorUnbiased
273  // class, which needs additional information on the window in order to normalize
274  // the pi values correctly. The Kofler and Karlsson do not need that, and we want to
275  // avoid using dummies in these places. So instead, we just do a dispatch here.
276  // If in the future more calculators are added that need special behaviour,
277  // we might want to redesign this...
278  auto const* raw_calc = calculators_[i].get();
279  auto const* unbiased_calc = dynamic_cast<FstPoolCalculatorUnbiased const*>( raw_calc );
280  if( unbiased_calc ) {
281  results_[i] = unbiased_calc->get_result( window, provided_loci, filter_stats_ );
282  } else {
283  results_[i] = raw_calc->get_result();
284  }
285  }
286  return results_;
287  }
288 
294  std::vector<double> const& get_result() const
295  {
296  assert( results_.size() == calculators_.size() );
297  for( size_t i = 0; i < results_.size(); ++i ) {
298  // No dispatch here as in the above overload. Instead, we just use the result function
299  // that does not use window averaging directly.
300  results_[i] = calculators_[i]->get_result();
301  }
302  return results_;
303  }
304 
316  template<class D>
318  BaseWindow<D> const& window,
319  std::shared_ptr<GenomeLocusSet> provided_loci
320  ) const {
321  allocate_pi_result_vectors_();
322 
323  // Get the pi values from all calculators, assuming that they are of the correct type.
324  for( size_t i = 0; i < calculators_.size(); ++i ) {
325  auto const& raw_calc = calculators_[i];
326  auto const* unbiased_calc = dynamic_cast<FstPoolCalculatorUnbiased const*>( raw_calc.get() );
327  if( ! unbiased_calc ) {
328  throw std::domain_error(
329  "Can only call FstPoolProcessor::get_pi_vectors() "
330  "for calculators of type FstPoolCalculatorUnbiased."
331  );
332  }
333 
334  // We compute the window-averaged values here.
335  // Unfortunately, we need to copy this value-by-value, as we want to return
336  // three independent vectors for user convenienec on the caller's end.
337  auto const pis = unbiased_calc->get_pi_values( window, provided_loci, filter_stats_ );
338  std::get<0>(results_pi_)[i] = pis.pi_within;
339  std::get<1>(results_pi_)[i] = pis.pi_between;
340  std::get<2>(results_pi_)[i] = pis.pi_total;
341  }
342 
343  return results_pi_;
344  }
345 
353  {
354  // Same as above, but using a different get_pi_vectors() overload.
355  allocate_pi_result_vectors_();
356  for( size_t i = 0; i < calculators_.size(); ++i ) {
357  auto const& raw_calc = calculators_[i];
358  auto const* unbiased_calc = dynamic_cast<FstPoolCalculatorUnbiased const*>( raw_calc.get() );
359  if( ! unbiased_calc ) {
360  throw std::domain_error(
361  "Can only call FstPoolProcessor::get_pi_vectors() "
362  "for calculators of type FstPoolCalculatorUnbiased."
363  );
364  }
365  auto const pis = unbiased_calc->get_pi_values();
366  std::get<0>(results_pi_)[i] = pis.pi_within;
367  std::get<1>(results_pi_)[i] = pis.pi_between;
368  std::get<2>(results_pi_)[i] = pis.pi_total;
369  }
370 
371  return results_pi_;
372  }
373 
382  {
383  return filter_stats_;
384  }
385 
386  std::vector<std::pair<size_t, size_t>> const& sample_pairs() const
387  {
388  return sample_pairs_;
389  }
390 
391  std::vector<std::unique_ptr<BaseFstPoolCalculator>> const& calculators() const
392  {
393  return calculators_;
394  }
395 
396  // -------------------------------------------------------------------------
397  // Internal Members
398  // -------------------------------------------------------------------------
399 
400 private:
401 
402  void allocate_pi_result_vectors_() const
403  {
404  // Only allocate when someone first calls this.
405  // Does not do anything afterwards.
406  auto const res_sz = calculators_.size();
407  assert( std::get<0>( results_pi_ ).size() == 0 || std::get<0>( results_pi_ ).size() == res_sz );
408  assert( std::get<1>( results_pi_ ).size() == 0 || std::get<1>( results_pi_ ).size() == res_sz );
409  assert( std::get<2>( results_pi_ ).size() == 0 || std::get<2>( results_pi_ ).size() == res_sz );
410  std::get<0>( results_pi_ ).resize( res_sz );
411  std::get<1>( results_pi_ ).resize( res_sz );
412  std::get<2>( results_pi_ ).resize( res_sz );
413  }
414 
415  // -------------------------------------------------------------------------
416  // Internal Members
417  // -------------------------------------------------------------------------
418 
419 private:
420 
421  // The pairs of sample indices of the variant between which we want to compute FST,
422  // and the processors to use for these computations,
423  std::vector<std::pair<size_t, size_t>> sample_pairs_;
424  std::vector<std::unique_ptr<BaseFstPoolCalculator>> calculators_;
425 
426  // Count how many Variants were processed in this processor,
427  // and how many of them passed or failed the filters.
428  VariantFilterStats filter_stats_;
429 
430  // We keep a mutable cache for the results, to avoid reallocating memory each time.
431  // We do the same of the pi values, but this is only allocated when first called.
432  mutable std::vector<double> results_;
433  mutable PiVectorTuple results_pi_;
434 
435  // Thread pool to run the buffering in the background, and the size
436  // (number of sample pairs) at which we start using the thread pool.
437  std::shared_ptr<utils::ThreadPool> thread_pool_;
438  size_t threading_threshold_ = 0;
439 };
440 
441 // =================================================================================================
442 // Make Fst Processor Helper Functions
443 // =================================================================================================
444 
453 template<class Calculator, typename... Args>
455  std::vector<size_t> const& pool_sizes,
456  Args... args
457 ) {
458  FstPoolProcessor result;
459  for( size_t i = 0; i < pool_sizes.size(); ++i ) {
460  for( size_t j = i + 1; j < pool_sizes.size(); ++j ) {
461  result.add_calculator(
462  i, j,
463  ::genesis::utils::make_unique<Calculator>(
464  pool_sizes[i],
465  pool_sizes[j],
466  args...
467  )
468  );
469  }
470  }
471  return result;
472 }
473 
483 template<class Calculator, typename... Args>
485  std::vector<std::pair<size_t, size_t>> const& sample_pairs,
486  std::vector<size_t> const& pool_sizes,
487  Args... args
488 ) {
489  FstPoolProcessor result;
490  for( auto const& p : sample_pairs ) {
491  if( p.first >= pool_sizes.size() || p.second >= pool_sizes.size() ) {
492  throw std::invalid_argument(
493  "Invalid sample indices for computing FST Pool Kofler: " +
494  std::to_string( pool_sizes.size() ) +
495  " pool sizes provided, but asked to use indices " +
496  std::to_string( p.first ) + " and " + std::to_string( p.second )
497  );
498  }
499  result.add_calculator(
500  p.first, p.second,
501  ::genesis::utils::make_unique<Calculator>(
502  pool_sizes[p.first],
503  pool_sizes[p.second],
504  args...
505  )
506  );
507  }
508  return result;
509 }
510 
520 template<class Calculator, typename... Args>
522  size_t index,
523  std::vector<size_t> const& pool_sizes,
524  Args... args
525 ) {
526  FstPoolProcessor result;
527  for( size_t i = 0; i < pool_sizes.size(); ++i ) {
528  result.add_calculator(
529  index, i,
530  ::genesis::utils::make_unique<Calculator>(
531  pool_sizes[index],
532  pool_sizes[i],
533  args...
534  )
535  );
536  }
537  return result;
538 }
539 
549 template<class Calculator, typename... Args>
551  size_t index_1, size_t index_2,
552  std::vector<size_t> const& pool_sizes,
553  Args... args
554 ) {
555  FstPoolProcessor result;
556  result.add_calculator(
557  index_1, index_2,
558  ::genesis::utils::make_unique<Calculator>(
559  pool_sizes[index_1],
560  pool_sizes[index_2],
561  args...
562  )
563  );
564  return result;
565 }
566 
567 // =================================================================================================
568 // Sample Names Helper Function
569 // =================================================================================================
570 
578 inline std::vector<std::pair<std::string, std::string>> fst_pool_processor_sample_names(
579  FstPoolProcessor const& processor,
580  std::vector<std::string> const& sample_names
581 ) {
582  // Without sample names given, we just return an empty pair.
583  if( sample_names.empty() ) {
584  return {};
585  }
586 
587  // Prepare the actual result vector.
588  std::vector<std::pair<std::string, std::string>> result;
589  result.reserve( processor.size() );
590 
591  // Make a list of sample name pairs, one for each calculator in the processor.
592  assert( processor.sample_pairs().size() == processor.size() );
593  for( auto const& sample_pair : processor.sample_pairs() ) {
594  if( sample_pair.first >= sample_names.size() || sample_pair.second >= sample_names.size() ) {
595  throw std::invalid_argument(
596  "In fst_pool_processor_sample_names(): sample names at indices " +
597  std::to_string( sample_pair.first ) + " and " + std::to_string( sample_pair.second ) +
598  " requested, but sample names with " + std::to_string( sample_names.size() ) +
599  " entries given."
600  );
601  }
602  result.push_back( std::make_pair(
603  sample_names[ sample_pair.first ], sample_names[ sample_pair.second ]
604  ));
605  }
606  return result;
607 }
608 
609 } // namespace population
610 } // namespace genesis
611 
612 #endif // include guard
genesis::population::FstPoolProcessor::get_pi_vectors
PiVectorTuple const & get_pi_vectors(BaseWindow< D > const &window, std::shared_ptr< GenomeLocusSet > provided_loci) const
Get lists of all the three intermediate pi values (within, between, total) that are part of our unbia...
Definition: fst_pool_processor.hpp:317
thread_functions.hpp
genesis::population::FstPoolProcessor::thread_pool
FstPoolProcessor & thread_pool(std::shared_ptr< utils::ThreadPool > value)
Set the thread pool used for processing, if enough sample pairs are being processed.
Definition: fst_pool_processor.hpp:124
genesis::population::FstPoolProcessor::size
size_t size() const
Get the total number of calculates, i.e., the number of pairs of samples for which we comute FST here...
Definition: fst_pool_processor.hpp:172
genesis::population::FstPoolProcessor
Helper class to iterate over Variants and process pairs of FST between their samples (SampleCounts),...
Definition: fst_pool_processor.hpp:67
genesis::population::FilterStatus::get
IntType get() const
Definition: filter_status.hpp:91
genesis::population::FstPoolProcessor::calculators
std::vector< std::unique_ptr< BaseFstPoolCalculator > > const & calculators() const
Definition: fst_pool_processor.hpp:391
genesis::population::FstPoolProcessor::PiVectorTuple
std::tuple< std::vector< double >, std::vector< double >, std::vector< double > > PiVectorTuple
Definition: fst_pool_processor.hpp:75
base_window.hpp
genesis::population::FstPoolProcessor::operator=
FstPoolProcessor & operator=(FstPoolProcessor const &)=default
genesis::population::FstPoolProcessor::reset
void reset()
Definition: fst_pool_processor.hpp:179
genesis::population::FstPoolCalculatorUnbiased
Compute our unbiased F_ST statistic for pool-sequenced data for two ranges of SampleCountss.
Definition: fst_pool_unbiased.hpp:82
genesis::utils::Options::global_thread_pool
std::shared_ptr< ThreadPool > global_thread_pool() const
Return a global thread pool to be used for parallel computations.
Definition: options.hpp:268
genesis::utils::parallel_for
MultiFuture< void > parallel_for(T1 begin, T2 end, F &&body, std::shared_ptr< ThreadPool > thread_pool=nullptr, size_t num_blocks=0, bool auto_wait=true)
Parallel for over a range of positions, breaking the range into blocks for which the body function is...
Definition: thread_functions.hpp:259
genesis::population::FstPoolProcessor::FstPoolProcessor
FstPoolProcessor(std::shared_ptr< utils::ThreadPool > thread_pool=nullptr, size_t threading_threshold=4096)
Construct a processor.
Definition: fst_pool_processor.hpp:88
std.hpp
Provides some valuable additions to STD.
genesis::population::fst_pool_processor_sample_names
std::vector< std::pair< std::string, std::string > > fst_pool_processor_sample_names(FstPoolProcessor const &processor, std::vector< std::string > const &sample_names)
Return a list of sample name pairs for each calculator in an FstPoolProcessor.
Definition: fst_pool_processor.hpp:578
genesis::population::make_fst_pool_processor
FstPoolProcessor make_fst_pool_processor(std::vector< size_t > const &pool_sizes, Args... args)
Create an FstPoolProcessor for all-to-all computation of FST between all pairs of samples.
Definition: fst_pool_processor.hpp:454
genesis::population::to_string
std::string to_string(GenomeLocus const &locus)
Definition: function/genome_locus.hpp:52
window_average.hpp
genesis::population::FstPoolProcessor::threading_threshold
size_t threading_threshold() const
Definition: fst_pool_processor.hpp:130
genesis::population::FstPoolProcessor::threading_threshold
FstPoolProcessor & threading_threshold(size_t value)
Set the threshold of calculators after which the processing is done in threads.
Definition: fst_pool_processor.hpp:144
fst_pool_calculator.hpp
genesis::population::FstPoolProcessor::~FstPoolProcessor
~FstPoolProcessor()=default
genesis::population::Variant
A single variant at a position in a chromosome, along with SampleCounts for a set of samples.
Definition: variant.hpp:65
genesis::population::FstPoolProcessor::get_result
std::vector< double > const & get_result(BaseWindow< D > const &window, std::shared_ptr< GenomeLocusSet > provided_loci) const
Get a list of all resulting FST values for all pairs of samples.
Definition: fst_pool_processor.hpp:266
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::population::FilterStatus::passing
bool passing() const
Definition: filter_status.hpp:75
genome_locus_set.hpp
genesis::population::FstPoolProcessor::process
void process(Variant const &variant)
Definition: fst_pool_processor.hpp:213
variant_filter.hpp
genesis::population::FstPoolProcessor::thread_pool
std::shared_ptr< utils::ThreadPool > thread_pool() const
Get the thread pool used for processing, if enough sample pairs are being processed.
Definition: fst_pool_processor.hpp:113
genesis::population::FstPoolProcessor::get_result
std::vector< double > const & get_result() const
Get a list of all resulting FST values for all pairs of samples.
Definition: fst_pool_processor.hpp:294
fst_pool_unbiased.hpp
options.hpp
genesis::population::Variant::samples
std::vector< SampleCounts > samples
Definition: variant.hpp:82
genesis::population::VariantFilterStats
FilterStats< VariantFilterTag > VariantFilterStats
Counts of how many Variants with each VariantFilterTag occured in some data.
Definition: variant_filter.hpp:306
genesis::population::FilterStats< VariantFilterTag >
genesis::population::FstPoolProcessor::add_calculator
void add_calculator(size_t index_p1, size_t index_p2, std::unique_ptr< BaseFstPoolCalculator > calculator)
Add a calculator, that is, an instance to compute FST for a pair of samples.
Definition: fst_pool_processor.hpp:153
variant.hpp
genesis::population::FstPoolProcessor::get_pi_vectors
PiVectorTuple const & get_pi_vectors() const
Get lists of all the three intermediate pi values (within, between, total) that are part of our unbia...
Definition: fst_pool_processor.hpp:352
genesis::utils::Options::get
static Options & get()
Returns a single instance of this class.
Definition: options.hpp:68
genesis::population::FilterStats::clear
void clear()
Definition: filter_stats.hpp:175
genesis::population::BaseWindow
Base class for Window and WindowView, to share common functionality.
Definition: base_window.hpp:60
thread_pool.hpp
genesis::population::Variant::status
FilterStatus status
Definition: variant.hpp:76
genesis::population::FstPoolProcessor::get_filter_stats
VariantFilterStats get_filter_stats() const
Get the sum of filter statistics of all Variants processed here.
Definition: fst_pool_processor.hpp:381
genesis::population::FstPoolProcessor::sample_pairs
std::vector< std::pair< size_t, size_t > > const & sample_pairs() const
Definition: fst_pool_processor.hpp:386