A toolkit for working with phylogenetic data.
v0.18.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
input_reader.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_IO_INPUT_READER_H_
2 #define GENESIS_UTILS_IO_INPUT_READER_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2017 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@h-its.org>
23  Exelixis Lab, Heidelberg Institute for Theoretical Studies
24  Schloss-Wolfsbrunnenweg 35, D-69118 Heidelberg, Germany
25 */
26 
35 
36 #include <memory>
37 #include <string>
38 #include <utility>
39 
40 #ifdef GENESIS_PTHREADS
41 # include <condition_variable>
42 # include <mutex>
43 # include <thread>
44 #endif
45 
46 namespace genesis {
47 namespace utils {
48 
49 // =================================================================================================
50 // Input Reader
51 // =================================================================================================
52 
53 #ifdef GENESIS_PTHREADS
54 
55  class AsynchronousReader;
56 
69  using InputReader = AsynchronousReader;
70 
71 #else
72 
73  class SynchronousReader;
74 
88 
89 #endif
90 
91 // =================================================================================================
92 // Asynchronous Reader
93 // =================================================================================================
94 
95 #ifdef GENESIS_PTHREADS
96 
113 class AsynchronousReader
114 {
115 public:
116 
117  // -------------------------------------------------------------
118  // Constructors and Rule of Five
119  // -------------------------------------------------------------
120 
121  AsynchronousReader() = default;
122 
123  AsynchronousReader( AsynchronousReader const& ) = delete;
124  AsynchronousReader( AsynchronousReader&& ) = delete;
125 
126  // Not sure if this move constructor and the move assignment are correct.
127  // We don't need them right now, so better disable them.
128  // Inspired by http://stackoverflow.com/questions/29986208/how-should-i-deal-with-mutexes-in-movable-types-in-c
129 
130  // AsynchronousReader( AsynchronousReader&& other )
131  // {
132  // std::unique_lock< std::mutex > guard( lock_ );
133  //
134  // input_source_ = std::move( other.input_source_ );
135  //
136  // target_buffer_ = std::move( other.target_buffer_ );
137  // target_size_ = std::move( other.target_size_ );
138  // achieved_size_ = std::move( other.achieved_size_ );
139  //
140  // worker_ = std::move( other.worker_ );
141  // destructor_called_ = std::move( other.destructor_called_ );
142  // read_except_ptr_ = std::move( other.read_except_ptr_ );
143  // }
144 
145  AsynchronousReader& operator= ( AsynchronousReader const& ) = delete;
146  AsynchronousReader& operator= ( AsynchronousReader&& ) = delete;
147 
148  // AsynchronousReader& operator= ( AsynchronousReader&& other )
149  // {
150  // if( this == &other ) {
151  // return *this;
152  // }
153  //
154  // std::unique_lock< std::mutex > guard_lhs( lock_, std::defer_lock );
155  // std::unique_lock< std::mutex > guard_rhs( other.lock_, std::defer_lock );
156  // std::lock( guard_lhs, guard_rhs);
157  //
158  // input_source_ = std::move( other.input_source_ );
159  //
160  // target_buffer_ = std::move( other.target_buffer_ );
161  // target_size_ = std::move( other.target_size_ );
162  // achieved_size_ = std::move( other.achieved_size_ );
163  //
164  // worker_ = std::move( other.worker_ );
165  // destructor_called_ = std::move( other.destructor_called_ );
166  // read_except_ptr_ = std::move( other.read_except_ptr_ );
167  //
168  // return *this;
169  // }
170 
171  ~AsynchronousReader()
172  {
173  if( input_source_ == nullptr ) {
174  return;
175  }
176 
177  // Terminate the reading process, in case it is still running.
178  { // Scoped lock.
179  std::unique_lock< std::mutex > guard( lock_ );
180  destructor_called_ = true;
181  }
182 
183  cond_read_requested_.notify_one();
184  worker_.join();
185  }
186 
187  // -------------------------------------------------------------
188  // Init and General Members
189  // -------------------------------------------------------------
190 
191  void init( std::unique_ptr< BaseInputSource > input_source )
192  {
193  // Get a lock.
194  std::unique_lock< std::mutex > guard( lock_ );
195 
196  // Prepare input variables.
197  input_source_ = std::move( input_source );
198  target_size_ = -1;
199  destructor_called_ = false;
200 
201  // Prepare worker thread.
202  worker_ = std::thread( [&] {
203  std::unique_lock< std::mutex > guard( lock_ );
204  try {
205 
206  // Read until termination requested.
207  while( true ) {
208 
209  // Condition: wait until the master wants the worker to read.
210  cond_read_requested_.wait(
211  guard,
212  [&] () {
213  return ( target_size_ != -1 ) || destructor_called_;
214  }
215  );
216 
217  // If we are about to destroy the object, we can stop here.
218  if( destructor_called_ ) {
219  return;
220  }
221 
222  // Read.
223  achieved_size_ = input_source_->read( target_buffer_, target_size_ );
224  target_size_ = -1;
225 
226  // If we did not get any data, we are done with the input source.
227  if( achieved_size_ == 0 ) {
228  break;
229  }
230 
231  cond_read_finished_.notify_one();
232  }
233 
234  // Store any exception, so that we can re-throw from main thread.
235  } catch( ... ) {
236  read_except_ptr_ = std::current_exception();
237  }
238 
239  cond_read_finished_.notify_one();
240  });
241  }
242 
243  bool valid() const
244  {
245  return input_source_ != nullptr;
246  }
247 
248  BaseInputSource const* input_source() const
249  {
250  return input_source_.get();
251  }
252 
253  std::string class_name() const
254  {
255  return "AsynchronousReader";
256  }
257 
258  // -------------------------------------------------------------
259  // Reading
260  // -------------------------------------------------------------
261 
262  void start_reading( char* target_buffer, int target_size )
263  {
264  // Set the target variables and start the worker.
265  std::unique_lock< std::mutex > guard( lock_ );
266  target_buffer_ = target_buffer;
267  target_size_ = target_size;
268  achieved_size_ = -1;
269  cond_read_requested_.notify_one();
270  }
271 
272  int finish_reading()
273  {
274  // Wait until the worker is done reading.
275  std::unique_lock< std::mutex > guard(lock_);
276  cond_read_finished_.wait(
277  guard,
278  [&]{
279  return achieved_size_ != -1 || read_except_ptr_;
280  }
281  );
282 
283  // If there was an exception, re-throw. Otherwise, return number of read bytes.
284  if( read_except_ptr_ ) {
285  std::rethrow_exception( read_except_ptr_ );
286  } else {
287  return achieved_size_;
288  }
289  }
290 
291  // -------------------------------------------------------------
292  // Internal Members
293  // -------------------------------------------------------------
294 
295 private:
296 
297  std::unique_ptr<BaseInputSource> input_source_;
298 
299  char* target_buffer_;
300  int target_size_;
301  int achieved_size_;
302 
303  std::thread worker_;
304  bool destructor_called_;
305  std::exception_ptr read_except_ptr_;
306 
307  std::mutex lock_;
308  std::condition_variable cond_read_requested_;
309  std::condition_variable cond_read_finished_;
310 };
311 
312 #endif
313 
314 // =================================================================================================
315 // Synchronous Reader
316 // =================================================================================================
317 
333 {
334 public:
335 
336  // -------------------------------------------------------------
337  // Constructors and Rule of Five
338  // -------------------------------------------------------------
339 
340  SynchronousReader() = default;
341  ~SynchronousReader() = default;
342 
343  SynchronousReader( SynchronousReader const& ) = delete;
344  SynchronousReader( SynchronousReader&& ) = default;
345 
346  SynchronousReader& operator= ( SynchronousReader const& ) = delete;
348 
349  // -------------------------------------------------------------
350  // Init and General Members
351  // -------------------------------------------------------------
352 
353  void init( std::unique_ptr<BaseInputSource> input_source )
354  {
355  input_source_ = std::move( input_source );
356  }
357 
358  bool valid() const
359  {
360  return input_source_ != nullptr;
361  }
362 
364  {
365  return input_source_.get();
366  }
367 
368  std::string class_name() const
369  {
370  return "SynchronousReader";
371  }
372 
373  // -------------------------------------------------------------
374  // Reading
375  // -------------------------------------------------------------
376 
377  void start_reading( char* target_buffer, int target_size )
378  {
379  target_buffer_ = target_buffer;
380  target_size_ = target_size;
381  }
382 
384  {
385  return input_source_->read( target_buffer_, target_size_ );
386  }
387 
388  // -------------------------------------------------------------
389  // Internal Members
390  // -------------------------------------------------------------
391 
392 private:
393 
394  std::unique_ptr<BaseInputSource> input_source_;
395 
396  char* target_buffer_;
397  int target_size_;
398 };
399 
400 } // namespace utils
401 } // namespace genesis
402 
403 #endif // include guard
BaseInputSource const * input_source() const
SynchronousReader InputReader
Alias for the either AsynchronousReader or SynchronousReader, depending on the threading setting...
Read bytes from an InputSource into a char buffer.
SynchronousReader & operator=(SynchronousReader const &)=delete
Abstract base class for reading byte data from input sources.
void start_reading(char *target_buffer, int target_size)
void init(std::unique_ptr< BaseInputSource > input_source)