A library for working with phylogenetic data.
v0.25.0
vcf_input_iterator.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_POPULATION_FORMATS_VCF_INPUT_ITERATOR_H_
2 #define GENESIS_POPULATION_FORMATS_VCF_INPUT_ITERATOR_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2021 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lczech@carnegiescience.edu>
23  Department of Plant Biology, Carnegie Institution For Science
24  260 Panama Street, Stanford, CA 94305, USA
25 */
26 
34 #ifdef GENESIS_HTSLIB
35 
39 
41 
42 #include <cassert>
43 #include <memory>
44 #include <stdexcept>
45 #include <string>
46 #include <utility>
47 #include <vector>
48 
49 namespace genesis {
50 namespace population {
51 
52 // =================================================================================================
53 // VCF/BCF Input Iterator
54 // =================================================================================================
55 
92 {
93 public:
94 
95  // -------------------------------------------------------------------------
96  // Member Types
97  // -------------------------------------------------------------------------
98 
101  using pointer = value_type const*;
102  using reference = value_type const&;
103  using difference_type = std::ptrdiff_t;
104  using iterator_category = std::input_iterator_tag;
105 
106  // -------------------------------------------------------------------------
107  // Constructors and Rule of Five
108  // -------------------------------------------------------------------------
109 
113  VcfInputIterator() = default;
114 
129  std::string const& filename,
130  size_t block_size = 1024
131  )
132  // Call the other constuctor, to avoid code duplication.
133  : VcfInputIterator( filename, std::vector<std::string>{}, false, block_size )
134  {}
135 
146  std::string const& filename,
147  std::vector<std::string> const& sample_names,
148  bool inverse_sample_names = false,
149  size_t block_size = 1024
150  )
151  : filename_( filename )
152  , block_size_( block_size )
153  , file_( std::make_shared<HtsFile>( filename ))
154  , header_( std::make_shared<VcfHeader>( *file_ ))
155  , current_block_( std::make_shared<std::vector<VcfRecord>>() )
156  , buffer_block_( std::make_shared<std::vector<VcfRecord>>() )
157  , thread_pool_( std::make_shared<utils::ThreadPool>( 1 ))
158  , future_( std::make_shared<std::future<size_t>>() )
159  {
160  if( block_size_ == 0 ) {
161  throw std::invalid_argument( "Invalid block_size == 0 for VcfInputIterator" );
162  }
163 
164  // Above, we initialized part of the htslib-related data (file_, header_) to point to their
165  // objects, as well as empty vectors for current_block_ and end_pos_, which will be filled in the
166  // init_() call below. Furthermore, we started a thread_pool_ with exactly 1 thread,
167  // and prepared the future that stores how many records were read in the prefetching.
168 
169  // Filter sample columns by their name.
170  if( ! sample_names.empty() ) {
171  header_->set_samples( sample_names, inverse_sample_names );
172  }
173 
174  // Initialize the current_block_ and buffer_block_, and read the first block(s) of the file.
175  init_();
176  }
177 
178  ~VcfInputIterator() = default;
179 
180  VcfInputIterator( self_type const& ) = default;
181  VcfInputIterator( self_type&& ) = default;
182 
183  self_type& operator= ( self_type const& ) = default;
184  self_type& operator= ( self_type&& ) = default;
185 
186  // -------------------------------------------------------------------------
187  // Comparators
188  // -------------------------------------------------------------------------
189 
193  explicit operator bool() const
194  {
195  assert( current_pos_ <= end_pos_ );
196  return current_pos_ < end_pos_;
197  }
198 
199  bool good() const
200  {
201  assert( current_pos_ <= end_pos_ );
202  return current_pos_ < end_pos_;
203  }
204 
205  // -------------------------------------------------------------------------
206  // Accessors
207  // -------------------------------------------------------------------------
208 
209  std::string const& filename() const
210  {
211  return filename_;
212  }
213 
214  HtsFile const& hts_file() const
215  {
216  // Here and below we assert the existence of a pointed-to object in the shared pointer,
217  // which does not hold true if the default constructor of this class was used, in which
218  // case any of these dereferencing functions here are supposed to be invalid - so, by
219  // using assertions here, we can fail a bit more gracefully in such cases, instead of
220  // getting seg faults due to nullptrs.
221  assert( file_ );
222  return *file_;
223  }
224 
226  {
227  assert( file_ );
228  return *file_;
229  }
230 
231  VcfHeader const& header() const
232  {
233  assert( header_ );
234  return *header_;
235  }
236 
238  {
239  assert( header_ );
240  return *header_;
241  }
242 
243  VcfRecord const& record() const
244  {
245  assert( current_block_ );
246  assert( current_pos_ < end_pos_ );
247  return (*current_block_)[current_pos_];
248  }
249 
251  {
252  assert( current_block_ );
253  assert( current_pos_ < end_pos_ );
254  return (*current_block_)[current_pos_];
255  }
256 
257  VcfRecord const* operator ->() const
258  {
259  assert( current_block_ );
260  assert( current_pos_ < end_pos_ );
261  return &(*current_block_)[current_pos_];
262  }
263 
265  {
266  assert( current_block_ );
267  assert( current_pos_ < end_pos_ );
268  return &(*current_block_)[current_pos_];
269  }
270 
271  VcfRecord const& operator*() const
272  {
273  assert( current_block_ );
274  assert( current_pos_ < end_pos_ );
275  return (*current_block_)[current_pos_];
276  }
277 
279  {
280  assert( current_block_ );
281  assert( current_pos_ < end_pos_ );
282  return (*current_block_)[current_pos_];
283  }
284 
285  // -------------------------------------------------------------------------
286  // Iteration
287  // -------------------------------------------------------------------------
288 
290  {
291  increment_();
292  return *this;
293  }
294 
296  {
297  increment_();
298  return *this;
299  }
300 
301  bool operator==( self_type const& it ) const
302  {
303  // We want equality between iterators that are copies of each other, and inequality
304  // for non copies that were not default constructed. For the default constructed iterator,
305  // which serves as the past-the-end marker, we need a special case.
306 
307  // Test if either of the two was default constructed. If so, we want a non-default
308  // constructed iterator also compare equal to a default constructed one only if it is done
309  // reading data, in which case good() == false (whose code we copy here)
310  if( ! file_ || ! it.file_ ) {
311  return ( current_pos_ < end_pos_ ) == ( it.current_pos_ < it.end_pos_ );
312  }
313 
314  // In all other cases, we have two normal iterators that we want to compare.
315  // We only need to compare one of the pointers to make sure that the iterators point to the
316  // same hts data (that is, one was created as a copy of the other), and assert the others.
317  // We assert the rest (if file_ equals, then the others have to equal as well).
318  assert(
319  file_ != it.file_ ||
320  (
321  header_ == it.header_ &&
322  current_block_ == it.current_block_ &&
323  buffer_block_ == it.buffer_block_ &&
324  thread_pool_ == it.thread_pool_ &&
325  future_ == it.future_
326  )
327  );
328  return file_ == it.file_;
329  }
330 
331  bool operator!=( self_type const& it ) const
332  {
333  return !(*this == it);
334  }
335 
336  // -------------------------------------------------------------------------
337  // Private Members
338  // -------------------------------------------------------------------------
339 
340 private:
341 
342  void init_()
343  {
344  assert( file_ );
345  assert( header_ );
346  assert( current_block_ );
347  assert( buffer_block_ );
348 
349  // Init the records and create empty VcfRecord to read into.
350  current_block_->reserve( block_size_ );
351  buffer_block_->reserve( block_size_ );
352  for( size_t i = 0; i < block_size_; ++i ) {
353  current_block_->emplace_back( *header_ );
354  buffer_block_->emplace_back( *header_ );
355  }
356  assert( current_block_->size() == block_size_ );
357  assert( buffer_block_->size() == block_size_ );
358 
359  // Read the first block synchronously, so that there is data initialized to be dereferenced.
360  end_pos_ = VcfInputIterator::read_block_( file_, current_block_, block_size_ );
361  assert( current_pos_ == 0 );
362 
363  // If there is less data than the block size, the file is already done.
364  // No need to start the async buffering.
365  if( end_pos_ < block_size_ ) {
366  return;
367  }
368 
369  // Now start the worker thread to fill the buffer.
370  fill_buffer_block_();
371  }
372 
373  void increment_()
374  {
375  // Finish the reading (potentially waiting if not yet finished in the worker thread).
376  // The future returns how much data there was to be read, which we use as our status.
377  // After that, swap the buffer and start a new reading operation in the worker thread.
378 
379  // Move to the next element in the vector. If we are at the end of the record vector,
380  // and if that vector was full (complete block size), there is more data, so start reading.
381  ++current_pos_;
382  if( current_pos_ == end_pos_ && end_pos_ == block_size_ ) {
383  assert( future_ );
384  assert( future_->valid() );
385 
386  // Get how many records were read into the buffer, which also waits for the reading
387  // if necessary. After that, we can swao the buffer, start reading again, and set
388  // or internal current location to the first element of the vector again.
389  end_pos_ = future_->get();
390  std::swap( buffer_block_, current_block_ );
391  fill_buffer_block_();
392  current_pos_ = 0;
393  }
394  }
395 
396  void fill_buffer_block_()
397  {;
398  assert( thread_pool_ );
399  assert( future_ );
400 
401  // This function is only every called after we finished any previous operations,
402  // so let's assert that the thread pool and future are in the states that we expect.
403  assert( thread_pool_->load() == 0 );
404  assert( ! future_->valid() );
405 
406  // In order to use lambda captures by copy for class member variables in C++11, we first
407  // have to make local copies, and then capture those. Capturing the class members direclty
408  // was only introduced later. Bit cumbersome, but gets the work done.
409  auto file = file_;
410  auto buffer_block = buffer_block_;
411  auto block_size = block_size_;
412 
413  // The lambda returns the result of read_block_ call, that is, the number of records
414  // that have been read, and which we later (in the future_) use to see how much data we got.
415  *future_ = thread_pool_->enqueue(
416  [ file, buffer_block, block_size ](){
417  return VcfInputIterator::read_block_( file, buffer_block, block_size );
418  }
419  );
420  }
421 
422  static size_t read_block_(
423  std::shared_ptr<HtsFile> file,
424  std::shared_ptr<std::vector<VcfRecord>> target,
425  size_t block_size
426  ) {
427  // This is a static function that does not depend on the class member data, so that
428  // we can use it from the future lambda in the thread pool above without having to worry
429  // about lambda captures of `this` going extinct... which was an absolutely nasty bug to
430  // find! Such a rookie mistake! For that reason, we also take all arguments as shared
431  // pointers, so that they are kept alive while the thread pool is working.
432  // However, once its done with its work, the function (the one that we give to the thread
433  // pool with a lambda) is popped from the thread queue, so that the shared pointer can
434  // be freed again - that is, we do not need to worry about the lambda keeping the shared
435  // pointer from freeing its memory indefinitely.
436 
437  assert( file );
438  assert( target->size() == block_size );
439 
440  // Read as long as there is data. Return number of read records.
441  size_t i = 0;
442  for( ; i < block_size; ++i ) {
443  if( ! (*target)[i].read_next( *file ) ) {
444  break;
445  }
446  }
447  return i;
448  }
449 
450  // -------------------------------------------------------------------------
451  // Data Members
452  // -------------------------------------------------------------------------
453 
454 private:
455 
456  std::string filename_;
457 
458  // We buffer block_size_ many vcf records, and within each block, iterate via current_pos_
459  // from 0 (first element of the block) to end_pos_ (past the end counter).
460  size_t block_size_ = 1024;
461  size_t current_pos_ = 0;
462  size_t end_pos_ = 0;
463 
464  // htslib structs. We use shared pointers here to allow copying this iterator.
465  // Also, use a buffer into which we read asynchronously, and then swap with the current_block_
466  std::shared_ptr<HtsFile> file_;
467  std::shared_ptr<VcfHeader> header_;
468  std::shared_ptr<std::vector<VcfRecord>> current_block_;
469  std::shared_ptr<std::vector<VcfRecord>> buffer_block_;
470 
471  // Thread pool to run the buffering in the background.
472  // Also, store the future_ used to keep track of the background task. It returns the number of
473  // record lines that have been read into the buffer (block_size_, or less at the end of the file).
474  std::shared_ptr<utils::ThreadPool> thread_pool_;
475  std::shared_ptr<std::future<size_t>> future_;
476 
477 };
478 
479 } // namespace population
480 } // namespace genesis
481 
482 #endif // htslib guard
483 #endif // include guard
genesis::placement::swap
void swap(Sample &lhs, Sample &rhs)
Definition: sample.cpp:104
genesis::population::VcfInputIterator::filename
std::string const & filename() const
Definition: vcf_input_iterator.hpp:209
genesis::population::VcfInputIterator::difference_type
std::ptrdiff_t difference_type
Definition: vcf_input_iterator.hpp:103
genesis::population::VcfInputIterator::self_type
VcfInputIterator self_type
Definition: vcf_input_iterator.hpp:99
genesis::population::VcfInputIterator::record
VcfRecord & record()
Definition: vcf_input_iterator.hpp:250
genesis::population::VcfInputIterator::header
VcfHeader & header()
Definition: vcf_input_iterator.hpp:237
genesis::population::VcfInputIterator::operator==
bool operator==(self_type const &it) const
Definition: vcf_input_iterator.hpp:301
genesis::population::VcfInputIterator::operator->
VcfRecord const * operator->() const
Definition: vcf_input_iterator.hpp:257
genesis::population::VcfInputIterator::hts_file
HtsFile & hts_file()
Definition: vcf_input_iterator.hpp:225
genesis::population::VcfInputIterator::reference
value_type const & reference
Definition: vcf_input_iterator.hpp:102
genesis::population::VcfInputIterator::good
bool good() const
Definition: vcf_input_iterator.hpp:199
genesis::population::VcfInputIterator::VcfInputIterator
VcfInputIterator()=default
Create a default instance, with no input. This is also the past-the-end iterator.
vcf_header.hpp
genesis::population::VcfInputIterator::operator!=
bool operator!=(self_type const &it) const
Definition: vcf_input_iterator.hpp:331
genesis::population::VcfInputIterator::VcfInputIterator
VcfInputIterator(std::string const &filename, std::vector< std::string > const &sample_names, bool inverse_sample_names=false, size_t block_size=1024)
Create an instance that reads from an input file name.
Definition: vcf_input_iterator.hpp:145
genesis::population::VcfInputIterator::hts_file
HtsFile const & hts_file() const
Definition: vcf_input_iterator.hpp:214
hts_file.hpp
genesis::population::VcfInputIterator::pointer
value_type const * pointer
Definition: vcf_input_iterator.hpp:101
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::population::VcfInputIterator::~VcfInputIterator
~VcfInputIterator()=default
genesis::population::VcfInputIterator::operator*
VcfRecord & operator*()
Definition: vcf_input_iterator.hpp:278
genesis::population::VcfInputIterator::operator*
VcfRecord const & operator*() const
Definition: vcf_input_iterator.hpp:271
genesis::population::VcfInputIterator::VcfInputIterator
VcfInputIterator(std::string const &filename, size_t block_size=1024)
Create an instance that reads from an input file name.
Definition: vcf_input_iterator.hpp:128
genesis::population::VcfInputIterator::iterator_category
std::input_iterator_tag iterator_category
Definition: vcf_input_iterator.hpp:104
genesis::population::VcfInputIterator::operator=
self_type & operator=(self_type const &)=default
genesis::population::VcfRecord
Capture the information of a single SNP/variant line in a VCF/BCF file.
Definition: vcf_record.hpp:107
genesis::population::VcfInputIterator::header
VcfHeader const & header() const
Definition: vcf_input_iterator.hpp:231
vcf_record.hpp
thread_pool.hpp
genesis::population::VcfHeader
Capture the information from a header of a VCF/BCF file.
Definition: vcf_header.hpp:102
genesis::population::HtsFile
Wrap an ::htsFile struct.
Definition: hts_file.hpp:56
genesis::population::VcfInputIterator::record
VcfRecord const & record() const
Definition: vcf_input_iterator.hpp:243
genesis::population::VcfInputIterator::operator++
self_type & operator++()
Definition: vcf_input_iterator.hpp:289
genesis::population::VcfInputIterator
Iterate an input source and parse it as a VCF/BCF file.
Definition: vcf_input_iterator.hpp:91