A toolkit for working with phylogenetic data.
v0.24.0
input_buffer.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_IO_INPUT_BUFFER_H_
2 #define GENESIS_UTILS_IO_INPUT_BUFFER_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2020 Lucas Czech and HITS gGmbH
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@h-its.org>
23  Exelixis Lab, Heidelberg Institute for Theoretical Studies
24  Schloss-Wolfsbrunnenweg 35, D-69118 Heidelberg, Germany
25 */
26 
35 
36 #include <cassert>
37 #include <memory>
38 #include <stdexcept>
39 #include <string>
40 #include <utility>
41 
42 namespace genesis {
43 namespace utils {
44 
45 // =================================================================================================
46 // Input Buffer
47 // =================================================================================================
48 
50 {
51 public:
52 
53  // -------------------------------------------------------------
54  // Member Types
55  // -------------------------------------------------------------
56 
62  static const size_t BlockLength = 1 << 22;
63 
65  using value_type = char;
66 
67  // -------------------------------------------------------------
68  // Constructors and Rule of Five
69  // -------------------------------------------------------------
70 
72  : source_name_( "invalid source" )
73  , buffer_( nullptr )
74  , data_pos_( 0 )
75  , data_end_( 0 )
76  {}
77 
78  explicit InputBuffer( std::shared_ptr<BaseInputSource> input_source )
79  {
80  init_( input_source );
81  }
82 
84  {
85  delete[] buffer_;
86  buffer_ = nullptr;
87  }
88 
89  InputBuffer(self_type const&) = delete;
90 
92  : buffer_( nullptr )
93  {
94  *this = std::move( other );
95  }
96 
97  self_type& operator= (self_type const&) = delete;
98 
100  {
101  if( this == &other ) {
102  return *this;
103  }
104 
105  input_reader_ = std::move( other.input_reader_ );
106  source_name_ = std::move( other.source_name_ );
107 
108  // Need to free our current buffer.
109  if( buffer_ ) {
110  delete[] buffer_;
111  }
112 
113  // Move the data.
114  buffer_ = other.buffer_;
115  data_pos_ = other.data_pos_;
116  data_end_ = other.data_end_;
117 
118  // Set the other in a valid but empty state and avoid double freeing of the buffer.
119  other.buffer_ = nullptr;
120  other.data_pos_ = 0;
121  other.data_end_ = 0;
122 
123  return *this;
124  }
125 
126  // -------------------------------------------------------------
127  // Reading
128  // -------------------------------------------------------------
129 
130  operator bool() const
131  {
132  return data_pos_ < data_end_;
133  }
134 
135  char peek( size_t ahead = 1 )
136  {
137  if( data_pos_ + ahead < data_end_ ) {
138 
139  // Common use case: we can securely peek ahead.
140  // Make sure we do not peek at data behind the end.
141  return buffer_[ data_pos_ + ahead ];
142 
143  } else {
144  // More complex use case where we peek ahead a bit further
145 
146  if( ahead > BlockLength ) {
147  throw std::runtime_error(
148  "Cannot peek ahead more than one block length of the Input Buffer."
149  );
150  }
151 
152  // Read blocks if necessary. Now we are surely in the first block.
153  update_blocks_();
154  assert( data_pos_ < BlockLength );
155 
156  // Try again. If we still cannot peek ahead, we are at the end of the stream.
157  if( data_pos_ + ahead < data_end_ ) {
158  return buffer_[ data_pos_ + ahead ];
159  } else {
160  return '\0';
161  }
162  }
163  }
164 
165  size_t read( char* target, size_t size )
166  {
167  // Shortcut for most common use case: We are in the first block, and have enough buffer
168  // to return the whole amount of requested data.
169  if( data_pos_ < BlockLength && size < data_end_ - data_pos_ ) {
170  std::memcpy( target, buffer_ + data_pos_, size );
171  data_pos_ += size;
172  return size;
173  }
174 
175  // Keep track of what we still need to do and what we already did.
176  size_t done_reading = 0;
177  size_t yet_to_read = size;
178 
179  // Read blocks if necessary. Now we are surely in the first block.
180  update_blocks_();
181  assert( data_pos_ < BlockLength );
182 
183  // Read data that is too big for one block, as long as there are more blocks.
184  while( yet_to_read > BlockLength && data_end_ == 2 * BlockLength ) {
185 
186  // Read one block.
187  std::memcpy( target + done_reading, buffer_ + data_pos_, BlockLength );
188  data_pos_ += BlockLength;
189 
190  // Update our track keeping.
191  done_reading += BlockLength;
192  yet_to_read -= BlockLength;
193 
194  // Update the blocks.
195  update_blocks_();
196  assert( data_pos_ < BlockLength );
197  assert( data_pos_ < data_end_ );
198  }
199 
200  // Get how much data is still buffered.
201  size_t const buffered = data_end_ - data_pos_;
202 
203  // Don't read more than there is.
204  if( yet_to_read > buffered ) {
205  yet_to_read = buffered;
206  }
207 
208  // Saftey. Never read more than there is.
209  assert( yet_to_read <= buffered );
210  assert( yet_to_read <= BlockLength );
211 
212  // Read rest.
213  std::memcpy( target + done_reading, buffer_ + data_pos_, yet_to_read );
214  data_pos_ += yet_to_read;
215 
216  // Update our track keeping.
217  done_reading += yet_to_read;
218 
219  return done_reading;
220  }
221 
222  // -------------------------------------------------------------
223  // Internal Members
224  // -------------------------------------------------------------
225 
226 private:
227 
231  void update_blocks_()
232  {
233  // Nothing to do. We are already at the end of the input.
234  if( data_pos_ == data_end_ ) {
235  return;
236  }
237  assert( data_pos_ < data_end_ );
238 
239  // If this assertion breaks, someone tempered with our internal invariants.
240  assert( data_end_ <= BlockLength * 2 );
241 
242  // If we are past the first block, we need to load more data into the blocks.
243  if( data_pos_ >= BlockLength ) {
244 
245  // Move the second to the first block.
246  std::memcpy( buffer_, buffer_ + BlockLength, BlockLength );
247  data_pos_ -= BlockLength;
248  data_end_ -= BlockLength;
249 
250  // If we are not yet at the end of the data, start the reader again:
251  // Copy the third block to the second, and read into the third one.
252  if( input_reader_ && input_reader_->valid() ) {
253  data_end_ += input_reader_->finish_reading();
254  std::memcpy( buffer_ + BlockLength, buffer_ + 2 * BlockLength, BlockLength );
255  input_reader_->start_reading( buffer_ + 2 * BlockLength, BlockLength );
256  }
257  }
258 
259  // After the update, the current position needs to be within the first block.
260  assert( data_pos_ < BlockLength );
261  }
262 
266  void init_( std::shared_ptr<BaseInputSource> input_source )
267  {
268  // Set to empty defaults if there is no input.
269  if( input_source == nullptr ) {
270  source_name_ = "invalid source";
271 
272  buffer_ = nullptr;
273  data_pos_ = 0;
274  data_end_ = 0;
275 
276  return;
277  }
278  // We use three buffer blocks:
279  // The first two for the current blocks, and the third for the async reading.
280  buffer_ = new char[ 3 * BlockLength ];
281 
282  try {
283  // Set source name.
284  source_name_ = input_source->source_name();
285 
286  // Read up to two blocks.
287  data_pos_ = 0;
288  data_end_ = input_source->read( buffer_, 2 * BlockLength );
289 
290  // If there is more data after the two blocks that we just read, start the
291  // reading process (possibly async, if pthreads is available), into the third block.
292  if( data_end_ == 2 * BlockLength ) {
293 
294  // Create the reader. We need to do this explictily,
295  // as we use a unique ptr to make this class movable.
296  input_reader_ = utils::make_unique<InputReader>();
297 
298  input_reader_->init( input_source );
299  input_reader_->start_reading( buffer_ + 2 * BlockLength, BlockLength );
300  }
301 
302  } catch( ... ) {
303  delete[] buffer_;
304  throw;
305  }
306  }
307 
308  // -------------------------------------------------------------
309  // Data Members
310  // -------------------------------------------------------------
311 
312 private:
313 
314  // Input data comes from here...
315  // (we use a unique ptr to make the class movable)
316  std::unique_ptr<InputReader> input_reader_ = nullptr;
317  std::string source_name_;
318 
319  // ...and is buffered here.
320  char* buffer_;
321 
322  // Current position in the buffer. It mostly is in the first block. Once we move into the
323  // second block when advancing this position, the next call of update_blocks_() will move
324  // it back to the first block.
325  size_t data_pos_;
326 
327  // Past-the-end position in the blocks. While reading longer input, it stays at the first byte
328  // of the third buffer, i.e., one past the end of the second block. Only when approaching the
329  // end of the input, it moves closer to zero.
330  size_t data_end_;
331 
332 };
333 
334 } // namespace utils
335 } // namespace genesis
336 
337 #endif // include guard
InputBuffer(std::shared_ptr< BaseInputSource > input_source)
Container namespace for all symbols of genesis in order to keep them separate when used as a library...
InputBuffer(self_type &&other)
self_type & operator=(self_type const &)=delete
static const size_t BlockLength
Block length for internal buffering.
char peek(size_t ahead=1)
size_t read(char *target, size_t size)