A library for working with phylogenetic data.
v0.25.0
gzip_block_ostream.cpp
Go to the documentation of this file.
1 /*
2  Genesis - A toolkit for working with phylogenetic data.
3  Copyright (C) 2014-2020 Lucas Czech
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  Contact:
19  Lucas Czech <lucas.czech@h-its.org>
20  Exelixis Lab, Heidelberg Institute for Theoretical Studies
21  Schloss-Wolfsbrunnenweg 35, D-69118 Heidelberg, Germany
22 */
23 
32 
34 
35 #include <cassert>
36 #include <fstream>
37 #include <future>
38 #include <memory>
39 #include <sstream>
40 #include <stdexcept>
41 #include <string>
42 #include <utility>
43 #include <vector>
44 
45 #ifdef GENESIS_ZLIB
46 
47 # include "zlib.h"
48 
49 # if defined(MSDOS) || defined(OS2) || defined(WIN32) || defined(__CYGWIN__)
50 # include <fcntl.h>
51 # include <io.h>
52 # endif
53 
54 #endif // GENESIS_ZLIB
55 
56 #ifdef GENESIS_OPENMP
57 # include <omp.h>
58 #endif // GENESIS_OPENMP
59 
60 namespace genesis {
61 namespace utils {
62 
63 // We only include all the class definitions if we actually use zlib.
64 // If not, we later also provide dummy implementations that throw if instanciated.
65 #ifdef GENESIS_ZLIB
66 
67 // ================================================================================================
68 // Gzip Block
69 // ================================================================================================
70 
84 class GzipBlockCompressor
85 {
86 public:
87 
88  // -------------------------------------------------------------
89  // Constructors and Rule of Five
90  // -------------------------------------------------------------
91 
97  GzipBlockCompressor(
98  std::size_t block_size,
99  int compression_level = Z_DEFAULT_COMPRESSION
100  )
101  : in_len_( block_size )
102  , out_len_( 2 * block_size )
103  {
104 
105  // Check compression level validity
106  if(
107  compression_level < static_cast<int>( GzipCompressionLevel::kDefaultCompression ) ||
108  compression_level > static_cast<int>( GzipCompressionLevel::kBestCompression )
109  ) {
110  throw std::invalid_argument(
111  "Compression level " + std::to_string( static_cast<int>( compression_level )) +
112  " is invalid for usage in gzip output stream. Valid range is [ -1, 9 ]."
113  );
114  }
115 
116  // Prepare z_stream object
117  zstream_ = new z_stream;
118  zstream_->next_in = Z_NULL;
119  zstream_->zalloc = Z_NULL;
120  zstream_->zfree = Z_NULL;
121  zstream_->opaque = Z_NULL;
122  int ret = deflateInit2(
123  zstream_, compression_level, Z_DEFLATED, 15+16, 8, Z_DEFAULT_STRATEGY
124  );
125  if( ret != Z_OK ) {
126  throw except::GzipError( zstream_->msg, ret );
127  }
128 
129  // Prepare buffers. We use two times the block size for the output, in the hope that this
130  // always suffices for compressing one block of data. If not, we throw some cryptic message...
131  in_buff_ = new char [in_len_];
132  out_buff_ = new char [out_len_];
133  }
134 
135  GzipBlockCompressor( GzipBlockCompressor const& ) = delete;
136  GzipBlockCompressor( GzipBlockCompressor && ) = default;
137  GzipBlockCompressor& operator = ( GzipBlockCompressor const& ) = delete;
138  GzipBlockCompressor& operator = ( GzipBlockCompressor && ) = default;
139 
143  ~GzipBlockCompressor()
144  {
145  deflateEnd( zstream_ );
146  delete [] in_buff_;
147  delete [] out_buff_;
148  delete zstream_;
149  }
150 
151  // -------------------------------------------------------------
152  // Worker Functions
153  // -------------------------------------------------------------
154 
159  std::pair<char*, size_t> get_input_buffer() const
160  {
161  return { in_buff_, in_len_ };
162  }
163 
167  std::pair<char*, size_t> get_output_buffer() const
168  {
169  return { out_buff_, out_pos_ };
170  }
171 
176  void compress( size_t avail_in )
177  {
178  // Start writing to the beginning of the output buffer. Only set that here, where we begin
179  // a new block compression. This is then updated automatically from within the deflation loop.
180  // We do set it here, so that it is 0 even if we do not compress any data (see next
181  // condition).
182  out_pos_ = 0;
183 
184  // If there is no input, do not write anything, in order to avoid compressing an empty
185  // string by accident, which would result in unneccesary gzip headers without content.
186  if( avail_in == 0 ) {
187  return;
188  }
189 
190  // Check that we are not asked to compress more data than the input buffer can hold.
191  // This is an assertion, because we only use that class and function ourselves locally,
192  // so we know what we are doing. If ever moved to the outside, make this an exception.
193  assert( avail_in <= in_len_ );
194 
195  // Set zstream input buffer pointers. We only process as many bytes as given.
196  // This is because the compress function might be called before the full in_len_ buffer
197  // is filled, so we only compress what we are told to from the outside.
198  zstream_->next_in = reinterpret_cast<decltype( zstream_->next_in )>( in_buff_ );
199  zstream_->avail_in = avail_in;
200 
201  // Loop until all input is processed
202  while( zstream_->avail_in > 0 ) {
203  deflate_loop_( Z_NO_FLUSH );
204  }
205 
206  // All data is done by now.
207  assert( zstream_->avail_in == 0 );
208 
209  // Then, call deflate again asking to finish the zlib stream
210  zstream_->next_in = nullptr;
211  zstream_->avail_in = 0;
212  deflate_loop_( Z_FINISH );
213 
214  // Now reset everything, so that the block can be used again
215  deflateReset( zstream_ );
216  }
217 
218  // -------------------------------------------------------------
219  // Internal Members
220  // -------------------------------------------------------------
221 
222 private:
223 
224  void deflate_loop_( int flush )
225  {
226  while( true ) {
227  // When we get here, out_pos_ is already set from the caller to either be 0 for the
228  // start of the compression, or left at the current output postion from some earlier
229  // deflate loop. So, no need to change it.
230 
231  // Set zstream output buffer. It has twice the size, so should fit, but we later still
232  // check and throw if not. Ugly, but everything else is just too complicated for now.
233  assert( out_len_ >= out_pos_ );
234  zstream_->next_out = reinterpret_cast<decltype( zstream_->next_out )>( out_buff_ + out_pos_ );
235  zstream_->avail_out = out_len_ - out_pos_;
236 
237  // Run the deflate algorithm, and check the result
238  int ret = deflate( zstream_, flush );
239  if( ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR ) {
240  throw except::GzipError( zstream_->msg, ret );
241  }
242 
243  // Store the resulting end position in the output buffer after the deflation.
244  // If this was too much, throw. Also, we check if nothing was written to the buffer;
245  // in that case, we are done.
246  auto const old_pos = out_pos_;
247  out_pos_ = reinterpret_cast<decltype( out_buff_ )>( zstream_->next_out ) - out_buff_;
248  if( out_pos_ >= out_len_ ) {
249  throw except::GzipError( "Block compression ran out of buffer.", Z_MEM_ERROR );
250  }
251 
252  // If we are done with the input, get out of here. The Z_BUF_ERROR error is not fatal,
253  // but indicates that we are done with the input, and can continue.
254  if( ret == Z_STREAM_END || ret == Z_BUF_ERROR || old_pos == out_pos_ ) {
255  break;
256  }
257  }
258  }
259 
260 private:
261 
262  // Compression object
263  z_stream* zstream_;
264 
265  // Store the input, and how many bytes are reserved for it.
266  char* in_buff_;
267  size_t in_len_;
268 
269  // Store the compressed output, how many bytes are reserved,
270  // and how many were used by the compression.
271  char* out_buff_;
272  size_t out_len_;
273  size_t out_pos_ = 0;
274 };
275 
276 // ================================================================================================
277 // Gzip Output Stream Buffer
278 // ================================================================================================
279 
305 class GzipBlockOStreambuf
306  : public std::streambuf
307 {
308 
309  // -------------------------------------------------------------
310  // Structs and Enums
311  // -------------------------------------------------------------
312 
313 private:
314 
326  struct BlockTask
327  {
328  BlockTask( std::size_t block_size, int compression_level )
329  : block( block_size, compression_level )
330  {}
331 
332  GzipBlockCompressor block;
333  std::future<void> future;
334  };
335 
336  // -------------------------------------------------------------
337  // Constructors and Rule of Five
338  // -------------------------------------------------------------
339 
340 public:
341 
357  GzipBlockOStreambuf(
358  std::streambuf* sbuf_p,
359  std::size_t block_size = GzipBlockOStream::GZIP_DEFAULT_BLOCK_SIZE,
360  int compression_level = Z_DEFAULT_COMPRESSION,
361  size_t num_threads = 1,
362  size_t num_blocks = 0
363  )
364  : sbuf_p_( sbuf_p )
365  , thread_pool_( num_threads )
366  {
367  // Basic setup. We take the number of threads as provided, and if given a number of blocks,
368  // also use that. If not, we aim to use twice as many blocks as threads, so that there is
369  // enough buffer keeping all worker threads busy. We want at least 2 blocks, so that we
370  // have one for current writing operations of the stream, and one that can be compressed
371  // at the same time.
372  assert( sbuf_p_ );
373  if( num_threads == 0 ) {
374  throw std::invalid_argument(
375  "Cannot create Gzip Block Output Stream with 0 worker threads."
376  );
377  }
378  if( num_blocks == 0 ) {
379  num_blocks = 2 * num_threads;
380  }
381  if( num_blocks < 2 ) {
382  num_blocks = 2;
383  }
384  assert( num_threads >= 1 );
385  assert( num_blocks >= 2 );
386 
387  // Create as many empty working blocks as needed.
388  block_queue_.reserve( num_blocks );
389  for( size_t i = 0; i < num_blocks; ++i ) {
390  block_queue_.emplace_back( block_size, compression_level );
391  }
392  assert( block_queue_.size() > 0 );
393  assert( block_queue_.size() == num_blocks );
394  assert( current_block_ == 0 );
395 
396  // Use the first worker block as the current stream target buffer.
397  auto block_in = block_queue_[ current_block_ ].block.get_input_buffer();
398  setp( block_in.first, block_in.first + block_in.second );
399  }
400 
401  GzipBlockOStreambuf( GzipBlockOStreambuf const& ) = delete;
402  GzipBlockOStreambuf( GzipBlockOStreambuf &&) = delete;
403  GzipBlockOStreambuf& operator = ( GzipBlockOStreambuf const& ) = delete;
404  GzipBlockOStreambuf& operator = ( GzipBlockOStreambuf &&) = delete;
405 
410  virtual ~GzipBlockOStreambuf()
411  {
412  // Flush the stream
413  //
414  // NOTE: Errors here (sync() return value not 0) are ignored, because we
415  // cannot throw in a destructor. This mirrors the behaviour of
416  // std::basic_filebuf::~basic_filebuf(). To see an exception on error,
417  // close the ofstream with an explicit call to close(), and do not rely
418  // on the implicit call in the destructor.
419  sync();
420  }
421 
422  // -------------------------------------------------------------
423  // Internal and Virtual Functions
424  // -------------------------------------------------------------
425 
426  virtual std::streambuf::int_type overflow(std::streambuf::int_type c = traits_type::eof()) override
427  {
428  // As fas as I understand the usage of the overflow() function, it is only called from the
429  // std::streambuf functions (that we inherit from) when there is no more room in the buffer
430  // to put the next byte to the stream. As we use blocks in the ring buffer as our (ever
431  // changing) output buffer, we should only get here if such a block is fully used.
432  // Assert this. If this assertion fails, our assumption is wrong that the overflow() is
433  // only called from std::streambuf when there is an actual overflow. In that case, we need
434  // to investigate what other std::streambuf functions call overflow, and why.
435  // The assertion checks that the difference between the current write pointer of the stream
436  // buffer and the beginning of the buffer is the same as the total length of the buffer.
437  assert( pptr() >= pbase() );
438  assert(
439  static_cast<size_t>( pptr() - pbase() ) ==
440  block_queue_[ current_block_ % block_queue_.size() ].block.get_input_buffer().second
441  );
442 
443  // Also, assert that the buffer pointers are correct. In particular, the current
444  // write pointer pptr needs to be at the same position as the buffer end pointer epptr.
445  // This is a variation of the check above.
446  // At the same time, the buffer start pointer pbase shoudl still be at the start of the block.
447  assert( pptr() == epptr() );
448  assert(
449  pbase() ==
450  block_queue_[ current_block_ % block_queue_.size() ].block.get_input_buffer().first
451  );
452 
453  // We have an overflow, so the buffer of the current block is full. We can send it to
454  // a worker thread for compression, and move on to the next block in the ring, which we
455  // then use as the new buffer for storing our input data.
456  // If the ring is full, we wait for the next block in order to finish being
457  // compressed, and then write it to the underlying stream.
458  // All of this is done in the function call here.
459  auto ret = compress_current_block_and_move_to_next_block_();
460  if( ret != 0 ) {
461  setp( nullptr, nullptr );
462  return traits_type::eof();
463  }
464 
465  return traits_type::eq_int_type(c, traits_type::eof()) ? traits_type::eof() : sputc(c);
466  }
467 
468  virtual int sync() override
469  {
470  // No pointer to be used. That is an error.
471  if( !pptr() ) {
472  return -1;
473  }
474 
475  // First, send all remaining buffered input of the current block to a compression worker.
476  // Return early if there was an issue writing any previously processed compressed blocks
477  // to the output sink stream.
478  auto ret = compress_current_block_and_move_to_next_block_();
479  if( ret != 0 ) {
480  return ret;
481  }
482 
483  // Then, write all blocks that are still in the queue. We need to do a full round, because
484  // otherwise we have no way of knowing which blocks were used so far - for very short files,
485  // we will not even yet have filled the queue completely.
486  size_t cur = current_block_ % block_queue_.size();
487  do {
488 
489  // Write the compressed block, if it has data, potentially waiting for its compression
490  // to be finished by some worker thread first.
491  if( block_queue_[ cur ].future.valid() ) {
492  ret = write_compressed_block_( cur );
493 
494  // Return early if there was an issue writing to the output sink stream.
495  if( ret != 0 ) {
496  return ret;
497  }
498  }
499 
500  // Process next, wrapping around the ring.
501  ++cur;
502  cur %= block_queue_.size();
503  } while( cur != current_block_ % block_queue_.size() );
504  assert( cur == current_block_ % block_queue_.size() );
505 
506  // Assert that we flushed all blocks, that is, we waited for all their compression to be
507  // done and all their data to be written to our underlying output sink stream.
508  // In that case, none of them should have a valid future, which we check here via a lambda
509  // that we immediately call.
510  assert(
511  [this](){
512  for( auto const& block : block_queue_ ) {
513  if( block.future.valid() ) {
514  return false;
515  }
516  }
517  return true;
518  }()
519  );
520 
521  // Also, the queue of the thread pool must be empty, because we just waited
522  // for all jobs to finish.
523  assert( thread_pool_.load() == 0 );
524 
525  // If we got here, all previous checks of `ret` were okay. So it still should be okay now.
526  assert( ret == 0 );
527  return ret;
528  }
529 
530 private:
531 
551  int compress_current_block_and_move_to_next_block_()
552  {
553  // Get th current block. We were busy filling it with new input data, so it cannot have
554  // been compressed already, meaning it cannot have a valid future.
555  auto& cur_block = block_queue_[ current_block_ % block_queue_.size() ];
556  assert( ! cur_block.future.valid() );
557 
558  // Assert that all pointers are where they should be
559  assert( pbase() == cur_block.block.get_input_buffer().first );
560  assert( epptr() == cur_block.block.get_input_buffer().first + cur_block.block.get_input_buffer().second );
561 
562  // Send block to a compression worker thread, using all bytes that have been written to it.
563  // The thread pool will pick up the task once a thread is available.
564  auto const avail_in = pptr() - pbase();
565  cur_block.future = thread_pool_.enqueue(
566  [&]( size_t av_in ){
567  cur_block.block.compress( av_in );
568  },
569  avail_in
570  );
571 
572  // Move to next block in the ring buffer queue
573  ++current_block_;
574  auto& next_block = block_queue_[ current_block_ % block_queue_.size() ];
575 
576  // If the block has a future, that means that we sent it to compression before.
577  // Because we use a ring buffer, that hence means that the ring is full. There are
578  // currently only full blocks that are either already compressed or under compression
579  // by some worker thread, or waiting to be compressed, but no block that we can use
580  // as our next input buffer for writing data to.
581  // Hence, we have to wait for the block to finish being compressed and then write it to our
582  // underlying sink stream, before we can finally re-use the block as our new target buffer
583  // for the incoming data.
584  int ret = 0;
585  if( next_block.future.valid() ) {
586 
587  // If we are here, the ring buffer queue is full. In that case, all blocks have been
588  // added to the thread pool for being compressed.
589  // Assert that indeed all bocks contain valid futures, that is, they all have been
590  // send to be compressed at some point before. We use a lambda that executes itself.
591  assert(
592  [this](){
593  for( auto const& block : block_queue_ ) {
594  if( ! block.future.valid() ) {
595  return false;
596  }
597  }
598  return true;
599  }()
600  );
601 
602  // Write the compressed block to the underlying stream,
603  // potentially waiting until its compression is finished.
604  ret = write_compressed_block_( current_block_ % block_queue_.size() );
605  }
606 
607  // Now, the block is written, and we can re-use it as the new stream buffer.
608  auto block_in = next_block.block.get_input_buffer();
609  setp( block_in.first, block_in.first + block_in.second );
610 
611  // Assert that all pointers are where they should be
612  assert( pbase() == block_in.first );
613  assert( pptr() == block_in.first );
614  assert( epptr() == block_in.first + block_in.second );
615 
616  // Return value: was the writing of the previously compressed blocks successful.
617  // If not, there was an error somewhere.
618  return ret;
619  }
620 
631  int write_compressed_block_( size_t block_num )
632  {
633  // Get the block to write. It has to have a future, as we only call this function
634  // when the block was previously sent to a worker to be compressed.
635  assert( block_num < block_queue_.size() );
636  auto& block = block_queue_[ block_num ];
637  assert( block.future.valid() );
638 
639  // Make sure that the block compression thread is finished
640  block.future.get();
641 
642  // Get the block output begin and end, and write it to the underlying stream
643  auto const block_out = block.block.get_output_buffer();
644  std::streamsize sz = sbuf_p_->sputn( block_out.first, block_out.second );
645 
646  // Check if there was an error in the sink stream
647  if( sz != static_cast<decltype(sz)>( block_out.second )) {
648  return -1;
649  }
650  return 0;
651  }
652 
653  // -------------------------------------------------------------
654  // Members
655  // -------------------------------------------------------------
656 
657 private:
658 
659  // Target sink stream to write compressed blocks to
660  std::streambuf * sbuf_p_;
661 
662  // Get a pool of workers that will do the compression of each block
663  ThreadPool thread_pool_;
664 
665  // Ring-buffer-like usage of compression blocks: we rotate, and wait if the compression is not
666  // yet done for the next block to be re-used in the ring. The current_block_ number only ever
667  // increases (that is, it counts the total number of blocks that have been processed so far).
668  // This is meant as a helper for future extensions that might want to keep track of byte
669  // offsets of output blocks (not yet implemented).
670  size_t current_block_ = 0;
671  std::vector<BlockTask> block_queue_;
672 
673 };
674 
675 // ================================================================================================
676 // Gzip Output Stream
677 // ================================================================================================
678 
684 {
685  #ifdef GENESIS_OPENMP
686  size_t threads = 0;
687 
688  // Start a parallel region. OpenMP should be configured in a way that nested regions
689  // only get as many threads as there are free ones, so this will not overestimate the number
690  // of available threads.
691  #pragma omp parallel
692  {
693  threads = omp_get_num_threads();
694  }
695  if( threads == 0 ) {
696  threads = 1;
697  }
698  return threads;
699  #else
700  return 1;
701  #endif
702 }
703 
704 // We have all the implementation here, so that we do not need to expose the stream buffer.
705 
707  std::ostream& os,
708  std::size_t block_size,
709  GzipCompressionLevel compression_level,
710  std::size_t num_threads
711 )
712  : GzipBlockOStream( os.rdbuf(), block_size, compression_level, num_threads )
713 {
714  // Nothing to do
715 }
716 
718  std::streambuf* sbuf_p,
719  std::size_t block_size,
720  GzipCompressionLevel compression_level,
721  std::size_t num_threads
722 )
723  : std::ostream( new GzipBlockOStreambuf(
724  sbuf_p,
725  block_size,
726  static_cast<int>(compression_level),
727 
728  // Trickery: if 0 threads are set, we attempt automatic estimation:
729  // use the number of active OpenMP threads in a parallel region
730  num_threads == 0
732  : num_threads
733  ))
734 {
735  exceptions(std::ios_base::badbit);
736 }
737 
739 {
740  delete rdbuf();
741 }
742 
743 // Up until here, we defined all classes needed for gzip block output streaming.
744 // However, if genesis is compiled without zlib support, we instead use dummy implementations
745 // which throw exceptions when being used.
746 #else // GENESIS_ZLIB
747 
748 // ================================================================================================
749 // Gzip Output Stream
750 // ================================================================================================
751 
753  std::ostream&, std::size_t, GzipCompressionLevel, std::size_t
754 ) {
755  throw std::runtime_error( "zlib: Genesis was not compiled with zlib support." );
756 }
757 
759  std::streambuf*, std::size_t, GzipCompressionLevel, std::size_t
760 ) {
761  throw std::runtime_error( "zlib: Genesis was not compiled with zlib support." );
762 }
763 
765 {}
766 
767 #endif // GENESIS_ZLIB
768 
769 } // namespace utils
770 } // namespace genesis
genesis::utils::GzipBlockOStream::~GzipBlockOStream
virtual ~GzipBlockOStream()
Definition: gzip_block_ostream.cpp:738
genesis::utils::GzipBlockOStream::GZIP_DEFAULT_BLOCK_SIZE
static const std::size_t GZIP_DEFAULT_BLOCK_SIZE
Definition: gzip_block_ostream.hpp:111
genesis::utils::GzipCompressionLevel
GzipCompressionLevel
List of possible compression levels used for GzipOStream.
Definition: gzip_stream.hpp:100
genesis::utils::GzipCompressionLevel::kBestCompression
@ kBestCompression
genesis::utils::GzipBlockOStream
Output stream that writes blocks of gzip-compressed data to an underlying wrapped stream,...
Definition: gzip_block_ostream.hpp:90
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
gzip_block_ostream.hpp
genesis::utils::GzipCompressionLevel::kDefaultCompression
@ kDefaultCompression
genesis::population::to_string
std::string to_string(GenomeRegion const &region)
Definition: functions/genome_region.cpp:55
thread_pool.hpp
genesis::utils::gzip_block_get_num_threads_
size_t gzip_block_get_num_threads_()
Local helper function to estimate the number of threads that can be used for gzip block compression.
Definition: gzip_block_ostream.cpp:683
genesis::utils::GzipBlockOStream::GzipBlockOStream
GzipBlockOStream(std::ostream &os, std::size_t block_size=GZIP_DEFAULT_BLOCK_SIZE, GzipCompressionLevel compression_level=GzipCompressionLevel::kDefaultCompression, std::size_t num_threads=0)
Definition: gzip_block_ostream.cpp:706