A toolkit for working with phylogenetic data.
v0.24.0
input_stream.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_IO_INPUT_STREAM_H_
2 #define GENESIS_UTILS_IO_INPUT_STREAM_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2020 Lucas Czech and HITS gGmbH
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@h-its.org>
23  Exelixis Lab, Heidelberg Institute for Theoretical Studies
24  Schloss-Wolfsbrunnenweg 35, D-69118 Heidelberg, Germany
25 */
26 
37 
38 #include <algorithm>
39 #include <cassert>
40 #include <cstdint>
41 #include <memory>
42 #include <stdexcept>
43 #include <string>
44 #include <utility>
45 
46 namespace genesis {
47 namespace utils {
48 
49 // =================================================================================================
50 // Input Stream
51 // =================================================================================================
52 
81 {
82 public:
83 
84  // -------------------------------------------------------------
85  // Member Types
86  // -------------------------------------------------------------
87 
95  static const size_t BlockLength = 1 << 22;
96 
98  using value_type = char;
99 
100  // -------------------------------------------------------------
101  // Constructors and Rule of Five
102  // -------------------------------------------------------------
103 
105  : source_name_( "invalid source" )
106  , buffer_( nullptr )
107  , data_pos_( 0 )
108  , data_end_( 0 )
109  , current_( '\0' )
110  , line_( 0 )
111  , column_( 0 )
112  {}
113 
114  explicit InputStream( std::shared_ptr<BaseInputSource> input_source )
115  : line_( 1 )
116  , column_( 1 )
117  {
118  init_( input_source );
119  }
120 
122  {
123  delete[] buffer_;
124  buffer_ = nullptr;
125  }
126 
127  InputStream(self_type const&) = delete;
128 
130  : buffer_( nullptr )
131  {
132  *this = std::move( other );
133  }
134 
135  self_type& operator= (self_type const&) = delete;
136 
138  {
139  if( this == &other ) {
140  return *this;
141  }
142 
143  input_reader_ = std::move( other.input_reader_ );
144  source_name_ = std::move( other.source_name_ );
145 
146  // Need to free our current buffer.
147  if( buffer_ ) {
148  delete[] buffer_;
149  }
150 
151  // Move the data.
152  buffer_ = other.buffer_;
153  data_pos_ = other.data_pos_;
154  data_end_ = other.data_end_;
155  current_ = other.current_;
156  line_ = other.line_;
157  column_ = other.column_;
158 
159  // Set the other in a valid but empty state and avoid double freeing of the buffer.
160  other.buffer_ = nullptr;
161  other.data_pos_ = 0;
162  other.data_end_ = 0;
163  other.current_ = '\0';
164  other.line_ = 0;
165  other.column_ = 0;
166 
167  return *this;
168  }
169 
170  // -------------------------------------------------------------
171  // Char Operations
172  // -------------------------------------------------------------
173 
177  char operator * () const
178  {
179  return current_;
180  }
181 
196  char current() const
197  {
198  if( data_pos_ >= data_end_ ) {
199  throw std::runtime_error(
200  "Unexpected end of " + source_name() + " at " + at() + "."
201  );
202  }
203  if( current_ < 0 ) {
204  throw std::domain_error(
205  "Invalid input char in " + source_name() + " at " + at() + "."
206  );
207  }
208  return current_;
209  }
210 
215  {
216  // If we were already at the end, set the counters to zero.
217  if( data_pos_ >= data_end_ ) {
218  reset_();
219  return *this;
220  }
221 
222  // Read data if necessary.
223  update_blocks_();
224  assert( data_pos_ < BlockLength );
225 
226  // In case we are moving to a new line, set the counters accordingly.
227  if( current_ == '\n' ) {
228  ++line_;
229  column_ = 1;
230  } else {
231  ++column_;
232  }
233 
234  // Next position.
235  ++data_pos_;
236 
237  // Set the char.
238  set_current_char_();
239  return *this;
240  }
241 
246  {
247  advance();
248  return *this;
249  }
250 
256  char get_char()
257  {
258  char ret = current_;
259  advance();
260  return ret;
261  }
262 
263  // -------------------------------------------------------------
264  // Line Operations
265  // -------------------------------------------------------------
266 
275  void get_line( std::string& target )
276  {
277  // Check edge case.
278  if( data_pos_ >= data_end_ ) {
279  return;
280  }
281 
282  // Loop until we find the end of the line. As this can be longer than one block,
283  // we might need to update the blocks and store the results in between.
284  while( true ) {
285  // Read data if necessary.
286  update_blocks_();
287  assert( data_pos_ < BlockLength );
288 
289  // Store the starting position, so that we can copy from there once we found the end.
290  size_t const start = data_pos_;
291 
292  // Read until the end of the line, but also stop before the end of the data,
293  // and after we read a full block. End of data: we are done anyway.
294  // End of block: need to read the next one first, so loop again.
295  auto const stop = std::min( data_end_, start + BlockLength );
296 
297  // 8-fold loop unrolling. Yes, the compiler does not do that.
298  // It gives some speedup, in particular if the reading is used in a parser that also
299  // does other things with the data. In a stand-alone line reader, it still gives
300  // a slight advantage.
301  while(
302  data_pos_ + 7 < stop &&
303  buffer_[ data_pos_ + 0 ] != '\n' &&
304  buffer_[ data_pos_ + 0 ] != '\r' &&
305  buffer_[ data_pos_ + 1 ] != '\n' &&
306  buffer_[ data_pos_ + 1 ] != '\r' &&
307  buffer_[ data_pos_ + 2 ] != '\n' &&
308  buffer_[ data_pos_ + 2 ] != '\r' &&
309  buffer_[ data_pos_ + 3 ] != '\n' &&
310  buffer_[ data_pos_ + 3 ] != '\r' &&
311  buffer_[ data_pos_ + 4 ] != '\n' &&
312  buffer_[ data_pos_ + 4 ] != '\r' &&
313  buffer_[ data_pos_ + 5 ] != '\n' &&
314  buffer_[ data_pos_ + 5 ] != '\r' &&
315  buffer_[ data_pos_ + 6 ] != '\n' &&
316  buffer_[ data_pos_ + 6 ] != '\r' &&
317  buffer_[ data_pos_ + 7 ] != '\n' &&
318  buffer_[ data_pos_ + 7 ] != '\r'
319  ) {
320  data_pos_ += 8;
321  }
322 
323  // Working AVX version. Not worth the trouble as of now. Keeping it here for reference.
324 
325  // #ifdef GENESIS_AVX
326  // #include <immintrin.h>
327  // #endif
328  //
329  // auto b = _mm256_loadu_si256(( __m256i const* )( buffer_ + data_pos_ ));
330  //
331  // static auto const n = _mm256_set1_epi8( '\n' );
332  // static auto const r = _mm256_set1_epi8( '\r' );
333  //
334  // auto bn = _mm256_cmpeq_epi8( b, n );
335  // auto br = _mm256_cmpeq_epi8( b, r );
336  //
337  // while(
338  // data_pos_ + 32 <= stop &&
339  // _mm256_testz_si256( bn, bn ) &&
340  // _mm256_testz_si256( bn, bn )
341  // ) {
342  // data_pos_ += 32;
343  // b = _mm256_loadu_si256(( __m256i const* )( buffer_ + data_pos_ ));
344  // bn = _mm256_cmpeq_epi8( b, n );
345  // br = _mm256_cmpeq_epi8( b, r );
346  // }
347 
348  // Alternative version taht uses 64bit words instead, and hence works without AVX.
349  // Uses macros from https://graphics.stanford.edu/~seander/bithacks.html
350 
351  // static auto const nmask = ~static_cast<uint64_t>(0) / 255U * '\n';
352  // static auto const rmask = ~static_cast<uint64_t>(0) / 255U * '\r';
353  //
354  // #define haszero(v) (((v) - static_cast<uint64_t>(0x0101010101010101)) & ~(v) & static_cast<uint64_t>(0x8080808080808080))
355  // #define hasvalue(x,n) (haszero((x) ^ (~static_cast<uint64_t>(0) / 255U * (n))))
356  //
357  // auto const* buffc = reinterpret_cast<uint64_t const*>( buffer_ + data_pos_ );
358  // size_t i = 0;
359  // while( true ) {
360  // bool const e = i*8 >= data_end_;
361  // bool const b = i*8 - start >= BlockLength;
362  //
363  // // bool const n = buffc[i] ^ nmask;
364  // // bool const r = buffc[i] ^ rmask;
365  // bool const n = hasvalue( buffc[i], '\n' );
366  // bool const r = hasvalue( buffc[i], '\r' );
367  //
368  // if( e | b | n | r ) {
369  // break;
370  // }
371  //
372  // ++i;
373  // }
374  // data_pos_ += i*8;
375  //
376  // #undef haszero
377  // #undef hasvalue
378 
379  // The above loop ends with data_pos_ somewhere before the exact line break.
380  // We now need to walk the rest by foot, and examine char by char.
381  while(
382  data_pos_ < stop &&
383  buffer_[ data_pos_ ] != '\n' &&
384  buffer_[ data_pos_ ] != '\r'
385  ) {
386  ++data_pos_;
387  }
388 
389  // Store what we have so far.
390  target.append( buffer_ + start, data_pos_ - start );
391 
392  // If the line is not yet finished, we need an extra round. Start the loop again.
393  assert( data_pos_ >= start );
394  if( data_pos_ - start >= BlockLength ) {
395  continue;
396  }
397 
398  // In all other cases, we stop here.
399  break;
400  }
401 
402  // Some safty.
403  assert( data_pos_ <= data_end_ );
404  assert( data_pos_ < 2 * BlockLength );
405 
406  // Check all cases that can occur.
407  if( data_pos_ == data_end_ ) {
408 
409  // Files might be missing the line break at the end of the last line.
410  // We catch this case here, so that we can be sure that the next conditions
411  // are actually valid when accessing the buffer.
412  // But we don't need to do anything in this case.
413 
414  } else if( buffer_[ data_pos_ ] == '\n' ) {
415  ++data_pos_;
416 
417  } else if( buffer_[ data_pos_ ] == '\r' ) {
418  ++data_pos_;
419 
420  // Treat stupid Windows \r\n lines breaks.
421  // We already moved past the \r, so check the next char.
422  if( data_pos_ < data_end_ && buffer_[ data_pos_ ] == '\n' ) {
423  ++data_pos_;
424  }
425  } else {
426  // We have checked all cases where the loop above can terminate.
427  // So this should not happen.
428  assert( false );
429  }
430 
431  // Set char and counters. It checks for end of the file,
432  // so this is safe if we are past the end already.
433  set_current_char_();
434  ++line_;
435  column_ = 1;
436  }
437 
445  std::string get_line()
446  {
447  std::string result;
448  get_line( result );
449  return result;
450  }
451 
452  // -------------------------------------------------------------
453  // State
454  // -------------------------------------------------------------
455 
461  size_t line() const
462  {
463  return line_;
464  }
465 
472  size_t column() const
473  {
474  return column_;
475  }
476 
481  std::string at() const
482  {
483  return std::to_string( line_ ) + ":" + std::to_string( column_ );
484  }
485 
489  bool good() const
490  {
491  return data_pos_ < data_end_;
492  }
493 
498  explicit operator bool() const
499  {
500  return data_pos_ < data_end_;
501  }
502 
506  bool eof() const
507  {
508  return data_pos_ >= data_end_;
509  }
510 
522  std::string source_name() const
523  {
524  return source_name_;
525  }
526 
527  // -------------------------------------------------------------
528  // Internal Members
529  // -------------------------------------------------------------
530 
531 private:
532 
536  void reset_()
537  {
538  line_ = 0;
539  column_ = 0;
540  current_ = '\0';
541  }
542 
546  void update_blocks_()
547  {
548  // This function is only called from the read char and line methods, which both beforehand
549  // check the following condition. So, if it breaks, this function is invalidly called
550  // from somewhere else.
551  assert( data_pos_ < data_end_ );
552 
553  // If this assertion breaks, someone tempered with our internal invariants.
554  assert( data_end_ <= BlockLength * 2 );
555 
556  // If we are past the first block, we need to load more data into the blocks.
557  if( data_pos_ >= BlockLength ) {
558 
559  // Move the second to the first block.
560  std::memcpy( buffer_, buffer_ + BlockLength, BlockLength );
561  data_pos_ -= BlockLength;
562  data_end_ -= BlockLength;
563 
564  // If we are not yet at the end of the data, start the reader again:
565  // Copy the third block to the second, and read into the third one.
566  if( input_reader_ && input_reader_->valid() ) {
567  data_end_ += input_reader_->finish_reading();
568  std::memcpy( buffer_ + BlockLength, buffer_ + 2 * BlockLength, BlockLength );
569  input_reader_->start_reading( buffer_ + 2 * BlockLength, BlockLength );
570  }
571  }
572 
573  // After the update, the current position needs to be within the first block.
574  assert( data_pos_ < BlockLength );
575  }
576 
581  void set_current_char_()
582  {
583  if( data_pos_ >= data_end_ ) {
584  // If we just reached the end, do not fully reset the line and column counters.
585  // They might be needed in some parser.
586  current_ = '\0';
587  return;
588  }
589 
590  // Treat stupid Windows and Mac lines breaks. Set them to \n, so that downstream parsers
591  // don't have to deal with this.
592  if( buffer_[ data_pos_ ] == '\r' ) {
593  buffer_[ data_pos_ ] = '\n';
594 
595  // If this is a Win line break \r\n, skip one of them, so that only a single \n
596  // is visible to the outside.
597  if( data_pos_ + 1 < data_end_ && buffer_[ data_pos_ + 1 ] == '\n' ) {
598  ++data_pos_;
599  }
600  }
601 
602  // If this is the last char of the data, but there is no closing \n, add one.
603  if( data_pos_ + 1 == data_end_ && buffer_[ data_pos_ ] != '\n' ) {
604  ++data_end_;
605  buffer_[ data_pos_ + 1 ] = '\n';
606  }
607 
608  // Set the char.
609  current_ = buffer_[ data_pos_ ];
610  }
611 
615  void init_( std::shared_ptr<BaseInputSource> input_source )
616  {
617  // Set to empty defaults if there is no input.
618  if( input_source == nullptr ) {
619  source_name_ = "invalid source";
620 
621  buffer_ = nullptr;
622  data_pos_ = 0;
623  data_end_ = 0;
624 
625  current_ = '\0';
626  line_ = 0;
627  column_ = 0;
628  return;
629  }
630 
631  // We use three buffer blocks: The first two for the current block/line.
632  // The max line length is one buffer length, so the beginning of the line is always
633  // in the first block, while its end can reach into the second block, but never exeed it.
634  // The third block is for the async reading.
635  buffer_ = new char[ 3 * BlockLength ];
636 
637  try {
638  // Set source name.
639  source_name_ = input_source->source_name();
640 
641  // Read up to two blocks.
642  data_pos_ = 0;
643  data_end_ = input_source->read( buffer_, 2 * BlockLength );
644 
645  // Skip UTF-8 BOM, if found.
646  if( data_end_ >= 3 &&
647  buffer_[0] == '\xEF' &&
648  buffer_[1] == '\xBB' &&
649  buffer_[2] == '\xBF'
650  ) {
651  data_pos_ = 3;
652  }
653 
654  // If there was no data, set to "empty" values.
655  if( data_pos_ == data_end_ ) {
656  reset_();
657 
658  // If there is data, set char value.
659  } else {
660  set_current_char_();
661  }
662 
663  // If there is more data after the two blocks that we just read, start the
664  // reading process (possibly async, if pthreads is available).
665  if( data_end_ == 2 * BlockLength ) {
666 
667  // Create the reader. We need to do this explictily,
668  // as we use a unique ptr to make this class movable.
669  input_reader_ = utils::make_unique<InputReader>();
670 
671  input_reader_->init( input_source );
672  input_reader_->start_reading( buffer_ + 2 * BlockLength, BlockLength );
673  }
674 
675  } catch( ... ) {
676  delete[] buffer_;
677  throw;
678  }
679  }
680 
681  // -------------------------------------------------------------
682  // Data Members
683  // -------------------------------------------------------------
684 
685 private:
686 
687  // Input data comes from here...
688  // (we use a unique ptr to make the class movable)
689  std::unique_ptr<InputReader> input_reader_ = nullptr;
690  std::string source_name_;
691 
692  // ...and is buffered here.
693  char* buffer_;
694  size_t data_pos_;
695  size_t data_end_;
696 
697  // Also, keep track of the current char and position in the data.
698  char current_;
699  size_t line_;
700  size_t column_;
701 };
702 
703 } // namespace utils
704 } // namespace genesis
705 
706 #endif // include guard
size_t column() const
Return the current column of the input stream.
self_type & advance()
Move to the next char in the stream and advance the counters.
char current() const
Return the current char, with some checks.
InputStream(std::shared_ptr< BaseInputSource > input_source)
Container namespace for all symbols of genesis in order to keep them separate when used as a library...
InputStream(self_type &&other)
Provides some valuable additions to STD.
std::string at() const
Return a textual representation of the current input position in the form "line:column".
bool eof() const
Return true iff the input reached its end.
char operator*() const
Dereference operator. Return the current char.
bool good() const
Return true iff the input is good (not end of data) and can be read from.
char get_char()
Extract a single char from the input.
static const size_t BlockLength
Block length for internal buffering.
size_t line() const
Return the current line of the input stream.
std::shared_ptr< BaseOutputTarget > to_string(std::string &target_string)
Obtain an output target for writing to a string.
self_type & operator=(self_type const &)=delete
void get_line(std::string &target)
Read the current line, append it to the target, and move to the beginning of the next line...
std::string get_line()
Read the current line and move to the beginning of the next.
std::string source_name() const
Get the input source name where this stream reads from.
self_type & operator++()
Move to the next char in the stream. Shortcut for advance().
Stream interface for reading data from an InputSource, that keeps track of line and column counters...