A toolkit for working with phylogenetic data.
v0.22.1
input_buffer.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_IO_INPUT_BUFFER_H_
2 #define GENESIS_UTILS_IO_INPUT_BUFFER_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2019 Lucas Czech and HITS gGmbH
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@h-its.org>
23  Exelixis Lab, Heidelberg Institute for Theoretical Studies
24  Schloss-Wolfsbrunnenweg 35, D-69118 Heidelberg, Germany
25 */
26 
35 
36 #include <cassert>
37 #include <memory>
38 #include <stdexcept>
39 #include <string>
40 #include <utility>
41 
42 namespace genesis {
43 namespace utils {
44 
45 // =================================================================================================
46 // Input Buffer
47 // =================================================================================================
48 
50 {
51 public:
52 
53  // -------------------------------------------------------------
54  // Member Types
55  // -------------------------------------------------------------
56 
62  static const size_t BlockLength = 1 << 22;
63 
65  using value_type = char;
66 
67  // -------------------------------------------------------------
68  // Constructors and Rule of Five
69  // -------------------------------------------------------------
70 
72  : source_name_( "invalid source" )
73  , buffer_( nullptr )
74  , data_pos_( 0 )
75  , data_end_( 0 )
76  {}
77 
78  explicit InputBuffer( std::shared_ptr<BaseInputSource> input_source )
79  {
80  init_( input_source );
81  }
82 
84  {
85  delete[] buffer_;
86  buffer_ = nullptr;
87  }
88 
89  InputBuffer(self_type const&) = delete;
90 
92  : buffer_( nullptr )
93  {
94  *this = std::move( other );
95  }
96 
97  self_type& operator= (self_type const&) = delete;
98 
100  {
101  if( this == &other ) {
102  return *this;
103  }
104 
105  input_reader_ = std::move( other.input_reader_ );
106  source_name_ = std::move( other.source_name_ );
107 
108  // Need to free our current buffer.
109  if( buffer_ ) {
110  delete[] buffer_;
111  }
112 
113  // Move the data.
114  buffer_ = other.buffer_;
115  data_pos_ = other.data_pos_;
116  data_end_ = other.data_end_;
117 
118  // Set the other in a valid but empty state and avoid double freeing of the buffer.
119  other.buffer_ = nullptr;
120  other.data_pos_ = 0;
121  other.data_end_ = 0;
122 
123  return *this;
124  }
125 
126  // -------------------------------------------------------------
127  // Reading
128  // -------------------------------------------------------------
129 
130  operator bool() const
131  {
132  return data_pos_ < data_end_;
133  }
134 
135  char peek( size_t ahead = 1 )
136  {
137  if( ahead > BlockLength ) {
138  throw std::runtime_error(
139  "Cannot peek ahead more than one block length of the Input Buffer."
140  );
141  }
142 
143  // Common use case: we can securely peek ahead.
144  // Make sure we do not peek at data behind the end.
145  if( data_pos_ + ahead < data_end_ ) {
146  return buffer_[ data_pos_ + ahead ];
147  } else {
148 
149  // Read blocks if necessary. Now we are surely in the first block.
150  update_blocks_();
151  assert( data_pos_ < BlockLength );
152 
153  // Try again. If we still cannot peek ahead, we are at the end of the stream.
154  if( data_pos_ + ahead < data_end_ ) {
155  return buffer_[ data_pos_ + ahead ];
156  } else {
157  return '\0';
158  }
159  }
160  }
161 
162  size_t read( char* target, size_t size )
163  {
164  // Shortcut for most common use case: We are in the first block, and have enough buffer
165  // to return the whole amount of requested data.
166  if( data_pos_ < BlockLength && size < data_end_ - data_pos_ ) {
167  std::memcpy( target, buffer_ + data_pos_, size );
168  data_pos_ += size;
169  return size;
170  }
171 
172  // Keep track of what we still need to do and what we already did.
173  size_t done_reading = 0;
174  size_t yet_to_read = size;
175 
176  // Read blocks if necessary. Now we are surely in the first block.
177  update_blocks_();
178  assert( data_pos_ < BlockLength );
179 
180  // Read data that is too big for one block, as long as there are more blocks.
181  while( yet_to_read > BlockLength && data_end_ == 2 * BlockLength ) {
182 
183  // Read one block.
184  std::memcpy( target + done_reading, buffer_ + data_pos_, BlockLength );
185  data_pos_ += BlockLength;
186 
187  // Update our track keeping.
188  done_reading += BlockLength;
189  yet_to_read -= BlockLength;
190 
191  // Update the blocks.
192  update_blocks_();
193  assert( data_pos_ < BlockLength );
194  assert( data_pos_ < data_end_ );
195  }
196 
197  // Get how much data is still buffered.
198  size_t const buffered = data_end_ - data_pos_;
199 
200  // Don't read more than there is.
201  if( yet_to_read > buffered ) {
202  yet_to_read = buffered;
203  }
204 
205  // Saftey. Never read more than there is.
206  assert( yet_to_read <= buffered );
207  assert( yet_to_read <= BlockLength );
208 
209  // Read rest.
210  std::memcpy( target + done_reading, buffer_ + data_pos_, yet_to_read );
211  data_pos_ += yet_to_read;
212 
213  // Update our track keeping.
214  done_reading += yet_to_read;
215 
216  return done_reading;
217  }
218 
219  // -------------------------------------------------------------
220  // Internal Members
221  // -------------------------------------------------------------
222 
223 private:
224 
228  void update_blocks_()
229  {
230  // Nothing to do. We are already at the end of the input.
231  if( data_pos_ == data_end_ ) {
232  return;
233  }
234  assert( data_pos_ < data_end_ );
235 
236  // If this assertion breaks, someone tempered with our internal invariants.
237  assert( data_end_ <= BlockLength * 2 );
238 
239  // If we are past the first block, we need to load more data into the blocks.
240  if( data_pos_ >= BlockLength ) {
241 
242  // Move the second to the first block.
243  std::memcpy( buffer_, buffer_ + BlockLength, BlockLength );
244  data_pos_ -= BlockLength;
245  data_end_ -= BlockLength;
246 
247  // If we are not yet at the end of the data, start the reader again:
248  // Copy the third block to the second, and read into the third one.
249  if( input_reader_ && input_reader_->valid() ) {
250  data_end_ += input_reader_->finish_reading();
251  std::memcpy( buffer_ + BlockLength, buffer_ + 2 * BlockLength, BlockLength );
252  input_reader_->start_reading( buffer_ + 2 * BlockLength, BlockLength );
253  }
254  }
255 
256  // After the update, the current position needs to be within the first block.
257  assert( data_pos_ < BlockLength );
258  }
259 
263  void init_( std::shared_ptr<BaseInputSource> input_source )
264  {
265  // Set to empty defaults if there is no input.
266  if( input_source == nullptr ) {
267  source_name_ = "invalid source";
268 
269  buffer_ = nullptr;
270  data_pos_ = 0;
271  data_end_ = 0;
272 
273  return;
274  }
275  // We use three buffer blocks:
276  // The first two for the current blocks, and the third for the async reading.
277  buffer_ = new char[ 3 * BlockLength ];
278 
279  try {
280  // Set source name.
281  source_name_ = input_source->source_name();
282 
283  // Read up to two blocks.
284  data_pos_ = 0;
285  data_end_ = input_source->read( buffer_, 2 * BlockLength );
286 
287  // If there is more data after the two blocks that we just read, start the
288  // reading process (possibly async, if pthreads is available), into the third block.
289  if( data_end_ == 2 * BlockLength ) {
290 
291  // Create the reader. We need to do this explictily,
292  // as we use a unique ptr to make this class movable.
293  input_reader_ = utils::make_unique<InputReader>();
294 
295  input_reader_->init( input_source );
296  input_reader_->start_reading( buffer_ + 2 * BlockLength, BlockLength );
297  }
298 
299  } catch( ... ) {
300  delete[] buffer_;
301  throw;
302  }
303  }
304 
305  // -------------------------------------------------------------
306  // Data Members
307  // -------------------------------------------------------------
308 
309 private:
310 
311  // Input data comes from here...
312  // (we use a unique ptr to make the class movable)
313  std::unique_ptr<InputReader> input_reader_ = nullptr;
314  std::string source_name_;
315 
316  // ...and is buffered here.
317  char* buffer_;
318 
319  // Current position in the buffer. It mostly is in the first block. Once we move into the
320  // second block when advancing this position, the next call of update_blocks_() will move
321  // it back to the first block.
322  size_t data_pos_;
323 
324  // Past-the-end position in the blocks. While reading longer input, it stays at the first byte
325  // of the third buffer, i.e., one past the end of the second block. Only when approaching the
326  // end of the input, it moves closer to zero.
327  size_t data_end_;
328 
329 };
330 
331 } // namespace utils
332 } // namespace genesis
333 
334 #endif // include guard
InputBuffer(std::shared_ptr< BaseInputSource > input_source)
Container namespace for all symbols of genesis in order to keep them separate when used as a library...
InputBuffer(self_type &&other)
self_type & operator=(self_type const &)=delete
static const size_t BlockLength
Block length for internal buffering.
char peek(size_t ahead=1)
size_t read(char *target, size_t size)