1 #ifndef GENESIS_UTILS_IO_INPUT_READER_H_
2 #define GENESIS_UTILS_IO_INPUT_READER_H_
41 #ifdef GENESIS_PTHREADS
42 # include <condition_variable>
54 #ifdef GENESIS_PTHREADS
56 class AsynchronousReader;
96 #ifdef GENESIS_PTHREADS
174 if( input_source_ ==
nullptr ) {
180 std::unique_lock< std::mutex > guard( lock_ );
181 destructor_called_ =
true;
184 cond_read_requested_.notify_one();
195 std::unique_lock< std::mutex > init_guard( lock_ );
200 destructor_called_ =
false;
203 worker_ = std::thread( [&] {
204 std::unique_lock< std::mutex > worker_guard( lock_ );
211 cond_read_requested_.wait(
214 return ( target_size_ != -1 ) || destructor_called_;
219 if( destructor_called_ ) {
224 assert( target_size_ >= 0 );
225 achieved_size_ = input_source_->read(
227 static_cast<size_t>( target_size_ )
232 if( achieved_size_ == 0 ) {
236 cond_read_finished_.notify_one();
241 read_except_ptr_ = std::current_exception();
244 cond_read_finished_.notify_one();
250 return input_source_ !=
nullptr;
255 return input_source_.get();
260 return "AsynchronousReader";
270 std::unique_lock< std::mutex > guard( lock_ );
271 target_buffer_ = target_buffer;
272 target_size_ = target_size;
274 cond_read_requested_.notify_one();
280 std::unique_lock< std::mutex > guard(lock_);
281 cond_read_finished_.wait(
284 return achieved_size_ != -1 || read_except_ptr_;
289 if( read_except_ptr_ ) {
290 std::rethrow_exception( read_except_ptr_ );
292 return achieved_size_;
302 std::shared_ptr<BaseInputSource> input_source_;
304 char* target_buffer_;
309 bool destructor_called_;
310 std::exception_ptr read_except_ptr_;
313 std::condition_variable cond_read_requested_;
314 std::condition_variable cond_read_finished_;
365 return input_source_ !=
nullptr;
370 return input_source_.get();
375 return "SynchronousReader";
384 target_buffer_ = target_buffer;
385 target_size_ = target_size;
390 return input_source_->read( target_buffer_, target_size_ );
399 std::shared_ptr<BaseInputSource> input_source_;
401 char* target_buffer_;
408 #endif // include guard