A library for working with phylogenetic and population genetic data.
v0.32.0
input_reader.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_IO_INPUT_READER_H_
2 #define GENESIS_UTILS_IO_INPUT_READER_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
37 
38 #include <cassert>
39 #include <memory>
40 #include <string>
41 #include <stdexcept>
42 #include <utility>
43 
44 namespace genesis {
45 namespace utils {
46 
47 // =================================================================================================
48 // Input Reader
49 // =================================================================================================
50 
51 class AsynchronousReader;
52 class SynchronousReader;
53 
62 
63 // =================================================================================================
64 // Asynchronous Reader
65 // =================================================================================================
66 
94 {
95 public:
96 
97  // -------------------------------------------------------------
98  // Constructors and Rule of Five
99  // -------------------------------------------------------------
100 
102  std::shared_ptr<BaseInputSource> input_source,
103  std::shared_ptr<ThreadPool> thread_pool = nullptr
104  ) {
105  if( !input_source ) {
106  throw std::runtime_error( "Cannot read from BaseInputSource nullptr" );
107  }
108 
109  // Depending on the async policy (see below), we might want to use a thread pool for the
110  // reading - either the given one, or if none is given, the global one. We use a lambda
111  // for that decision to avoid code duplication below, but also to be able to ignore the
112  // global thread pool if we do not need it. This is relevant when we do not want to use
113  // it, and init_global_thread_pool() has not been called.
114  auto get_thread_pool_ = [&]()
115  {
116  if( thread_pool ) {
117  return thread_pool;
118  }
119  return Options::get().global_thread_pool();
120  };
121 
122  // Set the threads according to the global options policy for file reading.
123  // By default, for trivial input such as reading a simple file, use our own thread.
124  // This makes sure that simple file I/O does not need to wait in the thread pool
125  // to be executed, but gets spawned in its own thread that can operate independently
126  // of the pool, and just wait for the I/O to be done, without blocking any other
127  // threads while waiting.
128  // For complex sources, we use the given or global thread pool instead, for instance
129  // when decompressing gzipped files. That means that internally, for such sources,
130  // the actual I/O for getting the data that needs to be decompressed is blocking
131  // that thread until I/O is completed. We could add another layer of data fetching
132  // to avoid that, and load the gzipped data async into some other buffer first,
133  // but well, decompression probably takes up enough time for that to not matter too much.
134  switch( Options::get().input_reading_thread_policy() ) {
136  thread_pool_ = get_thread_pool_();
137  break;
138  }
140  if( input_source->is_trivial() ) {
141  thread_pool_ = std::make_shared<utils::ThreadPool>( 1 );
142  } else {
143  thread_pool_ = get_thread_pool_();
144  }
145  break;
146  }
148  thread_pool_ = std::make_shared<utils::ThreadPool>( 1 );
149  break;
150  }
151  }
152  input_source_ = input_source;
153  }
154 
155  AsynchronousReader( AsynchronousReader const& ) = delete;
157 
160 
161  ~AsynchronousReader() = default;
162 
163  // -------------------------------------------------------------
164  // Init and General Members
165  // -------------------------------------------------------------
166 
167  bool valid() const
168  {
169  return input_source_ != nullptr;
170  }
171 
172  std::shared_ptr<BaseInputSource> input_source() const
173  {
174  return input_source_;
175  }
176 
177  std::string class_name() const
178  {
179  return "AsynchronousReader";
180  }
181 
182  // -------------------------------------------------------------
183  // Reading
184  // -------------------------------------------------------------
185 
186  void start_reading( char* target_buffer, size_t target_size )
187  {
188  // We only assert the validity of the buffer here, as it is a user error to not provide
189  // a large enough buffer here.
190  assert( target_buffer );
191 
192  // The function shall only ever be called once in a row, followed by finish_reading().
193  // Not doing that is a user error, so we here just assert that here.
194  assert( ! future_.valid() );
195 
196  // We need a local copy of the input source here, as C++11 does not allow to capture
197  // class member variables by value...
198  auto input_source = input_source_;
199 
200  // We capture the target by value, meaning that the caller has to stay alive until the
201  // task is finished, so that we don't get a memory access violation for the buffer.
202  future_ = thread_pool_->enqueue_and_retrieve(
203  [=](){
204  return input_source->read( target_buffer, target_size );
205  }
206  );
207  }
208 
209  size_t finish_reading()
210  {
211  // Same as above for start_reading(), we here only assert the correct order of execution.
212  assert( future_.valid() );
213 
214  // Now get the future, which blocks until the data is actually there.
215  // This also re-throws any errors that might have occurred during executing,
216  // see https://stackoverflow.com/q/14222899/4184258
217  return future_.get();
218  }
219 
220  // -------------------------------------------------------------
221  // Internal Members
222  // -------------------------------------------------------------
223 
224 private:
225 
226  // Where to read from.
227  std::shared_ptr<BaseInputSource> input_source_;
228 
229  // Thread pool to run the reading in the background.
230  std::shared_ptr<ThreadPool> thread_pool_;
231 
232  // Future that stores the number of bytes that were actually read.
233  // If we ever want to make this class moveable, this probably needs to live in a shared_ptr.
234  ProactiveFuture<size_t> future_;
235 
236 };
237 
238 // =================================================================================================
239 // Synchronous Reader
240 // =================================================================================================
241 
253 {
254 public:
255 
256  // -------------------------------------------------------------
257  // Constructors and Rule of Five
258  // -------------------------------------------------------------
259 
261  std::shared_ptr<BaseInputSource> input_source
262  )
263  : input_source_( input_source )
264  {}
265 
266  ~SynchronousReader() = default;
267 
268  SynchronousReader( SynchronousReader const& ) = delete;
269  SynchronousReader( SynchronousReader&& ) = default;
270 
271  SynchronousReader& operator= ( SynchronousReader const& ) = delete;
273 
274  // -------------------------------------------------------------
275  // Init and General Members
276  // -------------------------------------------------------------
277 
278  bool valid() const
279  {
280  return input_source_ != nullptr;
281  }
282 
283  std::shared_ptr<BaseInputSource> input_source() const
284  {
285  return input_source_;
286  }
287 
288  std::string class_name() const
289  {
290  return "SynchronousReader";
291  }
292 
293  // -------------------------------------------------------------
294  // Reading
295  // -------------------------------------------------------------
296 
297  void start_reading( char* target_buffer, size_t target_size )
298  {
299  target_buffer_ = target_buffer;
300  target_size_ = target_size;
301  }
302 
303  size_t finish_reading()
304  {
305  return input_source_->read( target_buffer_, target_size_ );
306  }
307 
308  // -------------------------------------------------------------
309  // Internal Members
310  // -------------------------------------------------------------
311 
312 private:
313 
314  std::shared_ptr<BaseInputSource> input_source_;
315 
316  char* target_buffer_;
317  size_t target_size_;
318 };
319 
320 } // namespace utils
321 } // namespace genesis
322 
323 #endif // include guard
genesis::utils::AsynchronousReader::input_source
std::shared_ptr< BaseInputSource > input_source() const
Definition: input_reader.hpp:172
genesis::utils::SynchronousReader::finish_reading
size_t finish_reading()
Definition: input_reader.hpp:303
genesis::utils::AsynchronousReader
Read bytes from an InputSource into a char buffer.
Definition: input_reader.hpp:93
genesis::utils::Options::global_thread_pool
std::shared_ptr< ThreadPool > global_thread_pool() const
Return a global thread pool to be used for parallel computations.
Definition: options.hpp:268
genesis::utils::AsynchronousReader::class_name
std::string class_name() const
Definition: input_reader.hpp:177
genesis::utils::ProactiveFuture::get
T get()
Return the result, after calling wait().
Definition: thread_pool.hpp:122
genesis::utils::SynchronousReader::start_reading
void start_reading(char *target_buffer, size_t target_size)
Definition: input_reader.hpp:297
genesis::utils::SynchronousReader::input_source
std::shared_ptr< BaseInputSource > input_source() const
Definition: input_reader.hpp:283
genesis::utils::AsynchronousReader::operator=
AsynchronousReader & operator=(AsynchronousReader const &)=delete
genesis::utils::SynchronousReader::class_name
std::string class_name() const
Definition: input_reader.hpp:288
genesis::utils::AsynchronousReader::start_reading
void start_reading(char *target_buffer, size_t target_size)
Definition: input_reader.hpp:186
input_source.hpp
genesis::utils::SynchronousReader
Read bytes from an InputSource into a char buffer.
Definition: input_reader.hpp:252
genesis::utils::SynchronousReader::valid
bool valid() const
Definition: input_reader.hpp:278
genesis::utils::SynchronousReader::operator=
SynchronousReader & operator=(SynchronousReader const &)=delete
genesis::utils::SynchronousReader::SynchronousReader
SynchronousReader(std::shared_ptr< BaseInputSource > input_source)
Definition: input_reader.hpp:260
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::SynchronousReader::~SynchronousReader
~SynchronousReader()=default
genesis::utils::Options::InputReadingThreadPolicy::kAllAsync
@ kAllAsync
All reading from input sources gets their own thread.
genesis::utils::AsynchronousReader::AsynchronousReader
AsynchronousReader(std::shared_ptr< BaseInputSource > input_source, std::shared_ptr< ThreadPool > thread_pool=nullptr)
Definition: input_reader.hpp:101
options.hpp
genesis::utils::Options::InputReadingThreadPolicy::kTrivialAsync
@ kTrivialAsync
Use async threads for trivial input sources, such as simple files, but use the global thread pool for...
genesis::utils::Options::InputReadingThreadPolicy::kStrict
@ kStrict
All input reading uses the global thread pool.
genesis::utils::ProactiveFuture< size_t >
genesis::utils::ProactiveFuture::valid
bool valid() const noexcept
Check if the future has a shared state.
Definition: thread_pool.hpp:163
genesis::utils::Options::get
static Options & get()
Returns a single instance of this class.
Definition: options.hpp:68
genesis::utils::AsynchronousReader::~AsynchronousReader
~AsynchronousReader()=default
thread_pool.hpp
genesis::utils::AsynchronousReader::finish_reading
size_t finish_reading()
Definition: input_reader.hpp:209
genesis::utils::AsynchronousReader::valid
bool valid() const
Definition: input_reader.hpp:167