A toolkit for working with phylogenetic data.
v0.19.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
input_buffer.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_IO_INPUT_BUFFER_H_
2 #define GENESIS_UTILS_IO_INPUT_BUFFER_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2017 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@h-its.org>
23  Exelixis Lab, Heidelberg Institute for Theoretical Studies
24  Schloss-Wolfsbrunnenweg 35, D-69118 Heidelberg, Germany
25 */
26 
35 
#include <cassert>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
41 
42 namespace genesis {
43 namespace utils {
44 
45 // =================================================================================================
46 // Input Buffer
47 // =================================================================================================
48 
50 {
51 public:
52 
53  // -------------------------------------------------------------
54  // Member Types
55  // -------------------------------------------------------------
56 
// Block length for internal buffering, in bytes: 1 << 22 = 4 MiB. The buffer allocates three
// such blocks (two for current data, one for the next chunk), and peek() refuses to look
// further ahead than one block.
62  static const size_t BlockLength = 1 << 22;
63 
// Element type handed out by this buffer (peek() returns it, read() copies it).
65  using value_type = char;
66 
67  // -------------------------------------------------------------
68  // Constructors and Rule of Five
69  // -------------------------------------------------------------
70 
// NOTE(review): the constructor signature belonging to this initializer list is missing from
// this listing; presumably it is the default constructor.
// Empty state: no source, no allocated buffer, and data_pos_ == data_end_, so operator bool()
// reports that there is no data.
72  : source_name_( "invalid source" )
73  , buffer_( nullptr )
74  , data_pos_( 0 )
75  , data_end_( 0 )
76  {}
77 
78  explicit InputBuffer( std::unique_ptr<BaseInputSource> input_source )
79  {
80  init_( std::move( input_source ));
81  }
82 
// NOTE(review): the destructor signature is missing from this listing (presumably ~InputBuffer()).
// Releases the buffer allocated with new[] in init_(); delete[] on a null pointer (the
// no-source state) is a no-op.
84  {
85  delete[] buffer_;
86  buffer_ = nullptr;
87  }
88 
89  InputBuffer(self_type const&) = delete;
90  InputBuffer(self_type&&) = delete;
91 
92  self_type& operator= (self_type const&) = delete;
93  self_type& operator= (self_type&&) = delete;
94 
95  // -------------------------------------------------------------
96  // Reading
97  // -------------------------------------------------------------
98 
99  operator bool() const
100  {
101  return data_pos_ < data_end_;
102  }
103 
104  char peek( size_t ahead = 1 )
105  {
106  if( ahead > BlockLength ) {
107  throw std::runtime_error(
108  "Cannot peek ahead more than one block length of the Input Buffer."
109  );
110  }
111 
112  // Common use case: we can securely peek ahead.
113  // Make sure we do not peek at data behind the end.
114  if( data_pos_ + ahead < data_end_ ) {
115  return buffer_[ data_pos_ + ahead ];
116  } else {
117 
118  // Read blocks if necessary. Now we are surely in the first block.
119  update_blocks_();
120  assert( data_pos_ < BlockLength );
121 
122  // Try again. If we still cannot peek ahead, we are at the end of the stream.
123  if( data_pos_ + ahead < data_end_ ) {
124  return buffer_[ data_pos_ + ahead ];
125  } else {
126  return '\0';
127  }
128  }
129  }
130 
131  size_t read( char* target, size_t size )
132  {
133  // Shortcut for most common use case: We are in the first block, and have enough buffer
134  // to return the whole amount of requested data.
135  if( data_pos_ < BlockLength && size < data_end_ - data_pos_ ) {
136  std::memcpy( target, buffer_ + data_pos_, size );
137  data_pos_ += size;
138  return size;
139  }
140 
141  // Keep track of what we still need to do and what we already did.
142  size_t done_reading = 0;
143  size_t yet_to_read = size;
144 
145  // Read blocks if necessary. Now we are surely in the first block.
146  update_blocks_();
147  assert( data_pos_ < BlockLength );
148 
149  // Read data that is too big for one block, as long as there are more blocks.
150  while( yet_to_read > BlockLength && data_end_ == 2 * BlockLength ) {
151 
152  // Read one block.
153  std::memcpy( target + done_reading, buffer_ + data_pos_, BlockLength );
154  data_pos_ += BlockLength;
155 
156  // Update our track keeping.
157  done_reading += BlockLength;
158  yet_to_read -= BlockLength;
159 
160  // Update the blocks.
161  update_blocks_();
162  assert( data_pos_ < BlockLength );
163  assert( data_pos_ < data_end_ );
164  }
165 
166  // Get how much data is still buffered.
167  size_t const buffered = data_end_ - data_pos_;
168 
169  // Don't read more than there is.
170  if( yet_to_read > buffered ) {
171  yet_to_read = buffered;
172  }
173 
174  // Saftey. Never read more than there is.
175  assert( yet_to_read <= buffered );
176  assert( yet_to_read <= BlockLength );
177 
178  // Read rest.
179  std::memcpy( target + done_reading, buffer_ + data_pos_, yet_to_read );
180  data_pos_ += yet_to_read;
181 
182  // Update our track keeping.
183  done_reading += yet_to_read;
184 
185  return done_reading;
186  }
187 
188  // -------------------------------------------------------------
189  // Internal Members
190  // -------------------------------------------------------------
191 
192 private:
193 
197  void update_blocks_()
198  {
199  // Nothing to do. We are already at the end of the input.
200  if( data_pos_ == data_end_ ) {
201  return;
202  }
203  assert( data_pos_ < data_end_ );
204 
205  // If this assertion breaks, someone tempered with our internal invariants.
206  assert( data_end_ <= BlockLength * 2 );
207 
208  // If we are past the first block, we need to load more data into the blocks.
209  if( data_pos_ >= BlockLength ) {
210 
211  // Move the second to the first block.
212  std::memcpy( buffer_, buffer_ + BlockLength, BlockLength );
213  data_pos_ -= BlockLength;
214  data_end_ -= BlockLength;
215 
216  // If we are not yet at the end of the data, start the reader again:
217  // Copy the third block to the second, and read into the third one.
218  if( input_reader_.valid() ) {
219  data_end_ += input_reader_.finish_reading();
220  std::memcpy( buffer_ + BlockLength, buffer_ + 2 * BlockLength, BlockLength );
221  input_reader_.start_reading( buffer_ + 2 * BlockLength, BlockLength );
222  }
223  }
224 
225  // After the update, the current position needs to be within the first block.
226  assert( data_pos_ < BlockLength );
227  }
228 
232  void init_( std::unique_ptr<BaseInputSource> input_source )
233  {
234  // Set to empty defaults if there is no input.
235  if( input_source == nullptr ) {
236  source_name_ = "invalid source";
237 
238  buffer_ = nullptr;
239  data_pos_ = 0;
240  data_end_ = 0;
241 
242  return;
243  }
244 
245  // We use three buffer blocks:
246  // The first two for the current blocks, and the third for the async reading.
247  buffer_ = new char[ 3 * BlockLength ];
248 
249  try {
250  // Set source name.
251  source_name_ = input_source->source_name();
252 
253  // Read up to two blocks.
254  data_pos_ = 0;
255  data_end_ = input_source->read( buffer_, 2 * BlockLength );
256 
257  // If there is more data after the two blocks that we just read, start the
258  // reading process (possibly async, if pthreads is available), into the third block.
259  if( data_end_ == 2 * BlockLength ) {
260  input_reader_.init( std::move( input_source ));
261  input_reader_.start_reading( buffer_ + 2 * BlockLength, BlockLength );
262  }
263 
264  } catch( ... ) {
265  delete[] buffer_;
266  throw;
267  }
268  }
269 
270  // -------------------------------------------------------------
271  // Data Members
272  // -------------------------------------------------------------
273 
274 private:
275 
276  // Input data comes from here...
// (InputReader is an alias for AsynchronousReader or SynchronousReader, depending on the
// threading setting.)
277  InputReader input_reader_;
// Name of the input source, or "invalid source" when there is none.
278  std::string source_name_;
279 
280  // ...and is buffered here.
// Owned array of 3 * BlockLength chars: allocated with new[] in init_() and released with
// delete[] in the destructor; null when there is no input source.
281  char* buffer_;
282 
283  // Current position in the buffer. It is mostly in the first block. Once we move into the
284  // second block when advancing this position, the next call of update_blocks_() will move
285  // it back to the first block.
286  size_t data_pos_;
287 
288  // Past-the-end position in the blocks. While reading longer input, it stays at the first byte
289  // of the third block, i.e., one past the end of the second block. Only when approaching the
290  // end of the input does it move closer to zero.
291  size_t data_end_;
292 
293 };
294 
295 } // namespace utils
296 } // namespace genesis
297 
298 #endif // include guard
SynchronousReader InputReader
Alias for either AsynchronousReader or SynchronousReader, depending on the threading setting...
self_type & operator=(self_type const &)=delete
static const size_t BlockLength
Block length for internal buffering.
char peek(size_t ahead=1)
InputBuffer(std::unique_ptr< BaseInputSource > input_source)
void start_reading(char *target_buffer, int target_size)
void init(std::unique_ptr< BaseInputSource > input_source)
size_t read(char *target, size_t size)