A toolkit for working with phylogenetic data.
v0.24.0
input_reader.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_IO_INPUT_READER_H_
2 #define GENESIS_UTILS_IO_INPUT_READER_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2019 Lucas Czech and HITS gGmbH
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@h-its.org>
23  Exelixis Lab, Heidelberg Institute for Theoretical Studies
24  Schloss-Wolfsbrunnenweg 35, D-69118 Heidelberg, Germany
25 */
26 
35 
36 #include <cassert>
37 #include <memory>
38 #include <string>
39 #include <utility>
40 
41 #ifdef GENESIS_PTHREADS
42 # include <condition_variable>
43 # include <mutex>
44 # include <thread>
45 #endif
46 
47 namespace genesis {
48 namespace utils {
49 
50 // =================================================================================================
51 // Input Reader
52 // =================================================================================================
53 
54 #ifdef GENESIS_PTHREADS
55 
56  class AsynchronousReader;
57 
71 
72 #else
73 
74  class SynchronousReader;
75 
89 
90 #endif
91 
92 // =================================================================================================
93 // Asynchronous Reader
94 // =================================================================================================
95 
96 #ifdef GENESIS_PTHREADS
97 
115 {
116 public:
117 
118  // -------------------------------------------------------------
119  // Constructors and Rule of Five
120  // -------------------------------------------------------------
121 
122  AsynchronousReader() = default;
123 
124  AsynchronousReader( AsynchronousReader const& ) = delete;
126 
127  // It is unclear whether the move constructor and move assignment drafted below are correct
128  // (e.g., the constructor draft locks its own lock_ rather than other.lock_). We don't need them right now, so they stay disabled.
129  // Inspired by http://stackoverflow.com/questions/29986208/how-should-i-deal-with-mutexes-in-movable-types-in-c
130 
131  // AsynchronousReader( AsynchronousReader&& other )
132  // {
133  // std::unique_lock< std::mutex > guard( lock_ );
134  //
135  // input_source_ = std::move( other.input_source_ );
136  //
137  // target_buffer_ = std::move( other.target_buffer_ );
138  // target_size_ = std::move( other.target_size_ );
139  // achieved_size_ = std::move( other.achieved_size_ );
140  //
141  // worker_ = std::move( other.worker_ );
142  // destructor_called_ = std::move( other.destructor_called_ );
143  // read_except_ptr_ = std::move( other.read_except_ptr_ );
144  // }
145 
148 
149  // AsynchronousReader& operator= ( AsynchronousReader&& other )
150  // {
151  // if( this == &other ) {
152  // return *this;
153  // }
154  //
155  // std::unique_lock< std::mutex > guard_lhs( lock_, std::defer_lock );
156  // std::unique_lock< std::mutex > guard_rhs( other.lock_, std::defer_lock );
157  // std::lock( guard_lhs, guard_rhs);
158  //
159  // input_source_ = std::move( other.input_source_ );
160  //
161  // target_buffer_ = std::move( other.target_buffer_ );
162  // target_size_ = std::move( other.target_size_ );
163  // achieved_size_ = std::move( other.achieved_size_ );
164  //
165  // worker_ = std::move( other.worker_ );
166  // destructor_called_ = std::move( other.destructor_called_ );
167  // read_except_ptr_ = std::move( other.read_except_ptr_ );
168  //
169  // return *this;
170  // }
171 
173  {
174  if( input_source_ == nullptr ) {
175  return;
176  }
177 
178  // Terminate the reading process, in case it is still running.
179  { // Scoped lock.
180  std::unique_lock< std::mutex > guard( lock_ );
181  destructor_called_ = true;
182  }
183 
184  cond_read_requested_.notify_one();
185  worker_.join();
186  }
187 
188  // -------------------------------------------------------------
189  // Init and General Members
190  // -------------------------------------------------------------
191 
192  void init( std::shared_ptr< BaseInputSource > input_source )
193  {
194  // Get a lock.
195  std::unique_lock< std::mutex > init_guard( lock_ );
196 
197  // Prepare input variables.
198  input_source_ = input_source;
199  target_size_ = -1;
200  destructor_called_ = false;
201 
202  // Prepare worker thread.
203  worker_ = std::thread( [&] {
204  std::unique_lock< std::mutex > worker_guard( lock_ );
205  try {
206 
207  // Read until termination requested.
208  while( true ) {
209 
210  // Condition: wait until the master wants the worker to read.
211  cond_read_requested_.wait(
212  worker_guard,
213  [&] () {
214  return ( target_size_ != -1 ) || destructor_called_;
215  }
216  );
217 
218  // If we are about to destroy the object, we can stop here.
219  if( destructor_called_ ) {
220  return;
221  }
222 
223  // Read.
224  assert( target_size_ >= 0 );
225  achieved_size_ = input_source_->read(
226  target_buffer_,
227  static_cast<size_t>( target_size_ )
228  );
229  target_size_ = -1;
230 
231  // If we did not get any data, we are done with the input source.
232  if( achieved_size_ == 0 ) {
233  break;
234  }
235 
236  cond_read_finished_.notify_one();
237  }
238 
239  // Store any exception, so that we can re-throw from main thread.
240  } catch( ... ) {
241  read_except_ptr_ = std::current_exception();
242  }
243 
244  cond_read_finished_.notify_one();
245  });
246  }
247 
248  bool valid() const
249  {
250  return input_source_ != nullptr;
251  }
252 
254  {
255  return input_source_.get();
256  }
257 
258  std::string class_name() const
259  {
260  return "AsynchronousReader";
261  }
262 
263  // -------------------------------------------------------------
264  // Reading
265  // -------------------------------------------------------------
266 
267  void start_reading( char* target_buffer, long target_size )
268  {
269  // Set the target variables and start the worker.
270  std::unique_lock< std::mutex > guard( lock_ );
271  target_buffer_ = target_buffer;
272  target_size_ = target_size;
273  achieved_size_ = -1;
274  cond_read_requested_.notify_one();
275  }
276 
278  {
279  // Wait until the worker is done reading.
280  std::unique_lock< std::mutex > guard(lock_);
281  cond_read_finished_.wait(
282  guard,
283  [&]{
284  return achieved_size_ != -1 || read_except_ptr_;
285  }
286  );
287 
288  // If there was an exception, re-throw. Otherwise, return number of read bytes.
289  if( read_except_ptr_ ) {
290  std::rethrow_exception( read_except_ptr_ );
291  } else {
292  return achieved_size_;
293  }
294  }
295 
296  // -------------------------------------------------------------
297  // Internal Members
298  // -------------------------------------------------------------
299 
300 private:
301 
302  std::shared_ptr<BaseInputSource> input_source_;
303 
304  char* target_buffer_;
305  long target_size_;
306  long achieved_size_;
307 
308  std::thread worker_;
309  bool destructor_called_;
310  std::exception_ptr read_except_ptr_;
311 
312  std::mutex lock_;
313  std::condition_variable cond_read_requested_;
314  std::condition_variable cond_read_finished_;
315 };
316 
317 #endif
318 
319 // =================================================================================================
320 // Synchronous Reader
321 // =================================================================================================
322 
338 {
339 public:
340 
341  // -------------------------------------------------------------
342  // Constructors and Rule of Five
343  // -------------------------------------------------------------
344 
345  SynchronousReader() = default;
346  ~SynchronousReader() = default;
347 
348  SynchronousReader( SynchronousReader const& ) = delete;
349  SynchronousReader( SynchronousReader&& ) = default;
350 
351  SynchronousReader& operator= ( SynchronousReader const& ) = delete;
353 
354  // -------------------------------------------------------------
355  // Init and General Members
356  // -------------------------------------------------------------
357 
358  void init( std::shared_ptr<BaseInputSource> input_source )
359  {
360  input_source_ = input_source;
361  }
362 
363  bool valid() const
364  {
365  return input_source_ != nullptr;
366  }
367 
369  {
370  return input_source_.get();
371  }
372 
373  std::string class_name() const
374  {
375  return "SynchronousReader";
376  }
377 
378  // -------------------------------------------------------------
379  // Reading
380  // -------------------------------------------------------------
381 
382  void start_reading( char* target_buffer, long target_size )
383  {
384  target_buffer_ = target_buffer;
385  target_size_ = target_size;
386  }
387 
389  {
390  return input_source_->read( target_buffer_, target_size_ );
391  }
392 
393  // -------------------------------------------------------------
394  // Internal Members
395  // -------------------------------------------------------------
396 
397 private:
398 
399  std::shared_ptr<BaseInputSource> input_source_;
400 
401  char* target_buffer_;
402  long target_size_;
403 };
404 
405 } // namespace utils
406 } // namespace genesis
407 
408 #endif // include guard
BaseInputSource const * input_source() const
Container namespace for all symbols of genesis in order to keep them separate when used as a library...
void start_reading(char *target_buffer, long target_size)
void init(std::shared_ptr< BaseInputSource > input_source)
Read bytes from an InputSource into a char buffer.
AsynchronousReader & operator=(AsynchronousReader const &)=delete
BaseInputSource const * input_source() const
Abstract base class for reading byte data from input sources.
void init(std::shared_ptr< BaseInputSource > input_source)
Read bytes from an InputSource into a char buffer.
void start_reading(char *target_buffer, long target_size)