/*
    Genesis - A toolkit for working with phylogenetic data.
    Copyright (C) 2014-2024 Lucas Czech

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.

    Contact:
    Lucas Czech <lczech@carnegiescience.edu>
    Department of Plant Biology, Carnegie Institution For Science
    260 Panama Street, Stanford, CA 94305, USA
*/
#include "genesis/utils/io/gzip_block_ostream.hpp"

#include "genesis/utils/core/options.hpp"

#include <cassert>
#include <fstream>
#include <future>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#ifdef GENESIS_ZLIB

#    include "zlib.h"

#    if defined(MSDOS) || defined(OS2) || defined(WIN32) || defined(__CYGWIN__)
#        include <fcntl.h>
#        include <io.h>
#    endif

#endif // GENESIS_ZLIB

namespace genesis {
namespace utils {

// We only include all the class definitions if we actually use zlib.
// If not, we later also provide dummy implementations that throw if instantiated.
#ifdef GENESIS_ZLIB

// ================================================================================================
//     Gzip Block
// ================================================================================================

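/**
 * @brief Compress blocks of data of a given size, using internal input and output buffers.
 *
 * The class wraps the zlib deflate functions: it owns an input buffer of the given block size,
 * and an output buffer of twice that size. Each call to compress() turns the given number of
 * bytes of the input buffer into one complete gzip stream in the output buffer, and resets the
 * internal state, so that the instance can be re-used for the next block.
 */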
class GzipBlockCompressor
{
public:

    // -------------------------------------------------------------
    //     Constructors and Rule of Five
    // -------------------------------------------------------------

    GzipBlockCompressor(
        std::size_t block_size,
        int compression_level = Z_DEFAULT_COMPRESSION
    )
        : in_len_( block_size )
        , out_len_( 2 * block_size )
    {
        // Check compression level validity
        if(
            compression_level < static_cast<int>( GzipCompressionLevel::kDefaultCompression ) ||
            compression_level > static_cast<int>( GzipCompressionLevel::kBestCompression )
        ) {
            throw std::invalid_argument(
                "Compression level " + std::to_string( static_cast<int>( compression_level )) +
                " is invalid for usage in gzip output stream. Valid range is [ -1, 9 ]."
            );
        }

        // Prepare z_stream object. The window bits value 15+16 tells zlib to write
        // a gzip header and trailer instead of a raw zlib stream.
        zstream_ = new z_stream;
        zstream_->next_in = Z_NULL;
        zstream_->zalloc = Z_NULL;
        zstream_->zfree = Z_NULL;
        zstream_->opaque = Z_NULL;
        int ret = deflateInit2(
            zstream_, compression_level, Z_DEFLATED, 15+16, 8, Z_DEFAULT_STRATEGY
        );
        if( ret != Z_OK ) {
            throw GzipError( zstream_->msg, ret );
        }

        // Prepare buffers. We use two times the block size for the output, in the hope that this
        // always suffices for compressing one block of data. If not, we throw some cryptic message...
        in_buff_ = new char [in_len_];
        out_buff_ = new char [out_len_];
    }

    // Note: The defaulted move operations copy the raw pointers, so that a moved-from instance
    // still points to the same buffers as the target. We rely on the ring buffer below being
    // fully reserved up front, so that no moves occur in practice.
    GzipBlockCompressor( GzipBlockCompressor const& ) = delete;
    GzipBlockCompressor( GzipBlockCompressor && ) = default;
    GzipBlockCompressor& operator = ( GzipBlockCompressor const& ) = delete;
    GzipBlockCompressor& operator = ( GzipBlockCompressor && ) = default;

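    /**
     * @brief Destructor that ends the zlib stream and frees all buffers.
     */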
    ~GzipBlockCompressor()
    {
        deflateEnd( zstream_ );
        delete [] in_buff_;
        delete [] out_buff_;
        delete zstream_;
    }

    // -------------------------------------------------------------
    //     Worker Functions
    // -------------------------------------------------------------

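    /**
     * @brief Get the input buffer where data to be compressed can be written,
     * and its size in bytes.
     */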
    std::pair<char*, size_t> get_input_buffer() const
    {
        return { in_buff_, in_len_ };
    }

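    /**
     * @brief Get the output buffer, and the number of bytes of compressed data
     * that the last compress() call wrote to it.
     */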
    std::pair<char*, size_t> get_output_buffer() const
    {
        return { out_buff_, out_pos_ };
    }

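    /**
     * @brief Compress the first @p avail_in bytes of the input buffer into the output buffer,
     * producing one complete gzip stream, and reset the internal state for re-use.
     */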
    void compress( size_t avail_in )
    {
        // Start writing to the beginning of the output buffer. Only set that here, where we begin
        // a new block compression. This is then updated automatically from within the deflation
        // loop. We do set it here, so that it is 0 even if we do not compress any data (see next
        // condition).
        out_pos_ = 0;

        // If there is no input, do not write anything, in order to avoid compressing an empty
        // string by accident, which would result in unnecessary gzip headers without content.
        if( avail_in == 0 ) {
            return;
        }

        // Check that we are not asked to compress more data than the input buffer can hold.
        // This is an assertion, because we only use this class and function ourselves locally,
        // so we know what we are doing. If ever moved to the outside, make this an exception.
        assert( avail_in <= in_len_ );

        // Set zstream input buffer pointers. We only process as many bytes as given.
        // This is because the compress function might be called before the full in_len_ buffer
        // is filled, so we only compress what we are told to from the outside.
        zstream_->next_in = reinterpret_cast<decltype( zstream_->next_in )>( in_buff_ );
        zstream_->avail_in = avail_in;

        // Loop until all input is processed
        while( zstream_->avail_in > 0 ) {
            deflate_loop_( Z_NO_FLUSH );
        }

        // All data is done by now.
        assert( zstream_->avail_in == 0 );

        // Then, call deflate again, asking it to finish the gzip stream of this block.
        zstream_->next_in = nullptr;
        zstream_->avail_in = 0;
        deflate_loop_( Z_FINISH );

        // Now reset everything, so that the block can be used again.
        deflateReset( zstream_ );
    }

    // -------------------------------------------------------------
    //     Internal Members
    // -------------------------------------------------------------

private:

    void deflate_loop_( int flush )
    {
        while( true ) {
            // When we get here, out_pos_ is already set from the caller to either be 0 for the
            // start of the compression, or left at the current output position from some earlier
            // deflate loop. So, no need to change it.

            // Set zstream output buffer. It has twice the size of the input, so it should fit,
            // but we later still check and throw if not. Ugly, but everything else is just too
            // complicated for now.
            assert( out_len_ >= out_pos_ );
            zstream_->next_out = reinterpret_cast<decltype( zstream_->next_out )>( out_buff_ + out_pos_ );
            zstream_->avail_out = out_len_ - out_pos_;

            // Run the deflate algorithm, and check the result
            int ret = deflate( zstream_, flush );
            if( ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR ) {
                throw GzipError( zstream_->msg, ret );
            }

            // Store the resulting end position in the output buffer after the deflation.
            // If this was too much, throw. Also, we check if nothing was written to the buffer;
            // in that case, we are done.
            auto const old_pos = out_pos_;
            out_pos_ = reinterpret_cast<decltype( out_buff_ )>( zstream_->next_out ) - out_buff_;
            if( out_pos_ >= out_len_ ) {
                throw GzipError( "Block compression ran out of buffer.", Z_MEM_ERROR );
            }

            // If we are done with the input, get out of here. The Z_BUF_ERROR error is not fatal,
            // but indicates that we are done with the input, and can continue.
            if( ret == Z_STREAM_END || ret == Z_BUF_ERROR || old_pos == out_pos_ ) {
                break;
            }
        }
    }

private:

    // Compression object
    z_stream* zstream_;

    // Store the input, and how many bytes are reserved for it.
    char* in_buff_;
    size_t in_len_;

    // Store the compressed output, how many bytes are reserved,
    // and how many were used by the compression.
    char* out_buff_;
    size_t out_len_;
    size_t out_pos_ = 0;
};
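
// A minimal sketch of how the above class is used (the GzipBlockOStreambuf below does exactly
// this with a ring of compressors and a thread pool); `data`, `size`, and `block_size` are
// hypothetical here, and the snippet would need <algorithm> and <cstring>:
//
//     GzipBlockCompressor compressor( block_size );
//     auto const in = compressor.get_input_buffer();
//     auto const n = std::min( size, in.second );
//     std::memcpy( in.first, data, n );
//     compressor.compress( n );
//     auto const out = compressor.get_output_buffer();
//     // out.first now points to out.second bytes of gzip-compressed data.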

// ================================================================================================
//     Gzip Output Stream Buffer
// ================================================================================================

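/**
 * @brief Output stream buffer that compresses blocks of data in parallel, and writes them
 * in order to an underlying sink stream buffer.
 *
 * The buffer keeps a ring of GzipBlockCompressor instances. The put area of the stream is always
 * the input buffer of the current block; once it is full (or when the stream is synced), the
 * block is handed to a thread pool for compression, and the next block in the ring becomes the
 * new put area, waiting for and writing out its previous contents first if the ring is full.
 */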
class GzipBlockOStreambuf
    : public std::streambuf
{

    // -------------------------------------------------------------
    //     Structs and Enums
    // -------------------------------------------------------------

private:

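    /**
     * @brief Element of the ring buffer: one compression block, and the future with which
     * a worker thread signals that the compression of the block has finished.
     */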
    struct BlockTask
    {
        BlockTask( std::size_t block_size, int compression_level )
            : block( block_size, compression_level )
        {}

        GzipBlockCompressor block;
        ProactiveFuture<void> future;
    };

    // -------------------------------------------------------------
    //     Constructors and Rule of Five
    // -------------------------------------------------------------

public:

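    /**
     * @brief Construct the stream buffer, wrapping the given sink stream buffer @p sbuf_p,
     * and setting up the ring of compression blocks and the thread pool for compressing them.
     */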
    GzipBlockOStreambuf(
        std::streambuf* sbuf_p,
        std::size_t block_size = GzipBlockOStream::GZIP_DEFAULT_BLOCK_SIZE,
        int compression_level = Z_DEFAULT_COMPRESSION,
        std::shared_ptr<ThreadPool> thread_pool = nullptr,
        size_t num_blocks = 0
    )
        : sbuf_p_( sbuf_p )
        , thread_pool_( thread_pool ? thread_pool : Options::get().global_thread_pool() )
    {
        // Basic setup. We use the thread pool as provided, and if given a number of blocks,
        // also use that. If not, we aim to use twice as many blocks as threads, so that there is
        // enough buffer to keep all worker threads busy. We want at least 2 blocks, so that we
        // have one for the current writing operations of the stream, and one that can be
        // compressed at the same time.
        assert( sbuf_p_ );
        assert( thread_pool_ );

        if( num_blocks == 0 ) {
            num_blocks = 2 * thread_pool_->size();
        }
        if( num_blocks < 2 ) {
            num_blocks = 2;
        }
        assert( num_blocks >= 2 );

        // Create as many empty working blocks as needed.
        block_queue_.reserve( num_blocks );
        for( size_t i = 0; i < num_blocks; ++i ) {
            block_queue_.emplace_back( block_size, compression_level );
        }
        assert( block_queue_.size() > 0 );
        assert( block_queue_.size() == num_blocks );
        assert( current_block_ == 0 );

        // Use the first worker block as the current stream target buffer.
        auto block_in = block_queue_[ current_block_ ].block.get_input_buffer();
        setp( block_in.first, block_in.first + block_in.second );
    }

    GzipBlockOStreambuf( GzipBlockOStreambuf const& ) = delete;
    GzipBlockOStreambuf( GzipBlockOStreambuf && ) = delete;
    GzipBlockOStreambuf& operator = ( GzipBlockOStreambuf const& ) = delete;
    GzipBlockOStreambuf& operator = ( GzipBlockOStreambuf && ) = delete;

    virtual ~GzipBlockOStreambuf()
    {
        // Flush the stream
        //
        // NOTE: Errors here (sync() return value not 0) are ignored, because we
        // cannot throw in a destructor. This mirrors the behaviour of
        // std::basic_filebuf::~basic_filebuf(). To see an exception on error,
        // close the ofstream with an explicit call to close(), and do not rely
        // on the implicit call in the destructor.
        sync();
    }

    // -------------------------------------------------------------
    //     Internal and Virtual Functions
    // -------------------------------------------------------------

    virtual std::streambuf::int_type overflow( std::streambuf::int_type c = traits_type::eof() ) override
    {
        // As far as I understand the usage of the overflow() function, it is only called from the
        // std::streambuf functions (that we inherit from) when there is no more room in the buffer
        // to put the next byte to the stream. As we use blocks in the ring buffer as our (ever
        // changing) output buffer, we should only get here if such a block is fully used.
        // Assert this. If this assertion fails, our assumption is wrong that overflow() is
        // only called from std::streambuf when there is an actual overflow. In that case, we need
        // to investigate what other std::streambuf functions call overflow, and why.
        // The assertion checks that the difference between the current write pointer of the stream
        // buffer and the beginning of the buffer is the same as the total length of the buffer.
        assert( pptr() >= pbase() );
        assert(
            static_cast<size_t>( pptr() - pbase() ) ==
            block_queue_[ current_block_ % block_queue_.size() ].block.get_input_buffer().second
        );

        // Also, assert that the buffer pointers are correct. In particular, the current
        // write pointer pptr needs to be at the same position as the buffer end pointer epptr.
        // This is a variation of the check above.
        // At the same time, the buffer start pointer pbase should still be at the start of the block.
        assert( pptr() == epptr() );
        assert(
            pbase() ==
            block_queue_[ current_block_ % block_queue_.size() ].block.get_input_buffer().first
        );

        // We have an overflow, so the buffer of the current block is full. We can send it to
        // a worker thread for compression, and move on to the next block in the ring, which we
        // then use as the new buffer for storing our input data.
        // If the ring is full, we wait for the next block to finish being compressed,
        // and then write it to the underlying stream.
        // All of this is done in the function call here.
        auto ret = compress_current_block_and_move_to_next_block_();
        if( ret != 0 ) {
            setp( nullptr, nullptr );
            return traits_type::eof();
        }

        return traits_type::eq_int_type( c, traits_type::eof() ) ? traits_type::eof() : sputc( c );
    }

    virtual int sync() override
    {
        // If there is no put area to work with, we cannot flush. That is an error.
        if( !pptr() ) {
            return -1;
        }

        // First, send all remaining buffered input of the current block to a compression worker.
        // Return early if there was an issue writing any previously processed compressed blocks
        // to the output sink stream.
        auto ret = compress_current_block_and_move_to_next_block_();
        if( ret != 0 ) {
            return ret;
        }

        // Then, write all blocks that are still in the queue. We need to do a full round, because
        // otherwise we have no way of knowing which blocks were used so far - for very short files,
        // we will not even have filled the queue completely yet.
        size_t cur = current_block_ % block_queue_.size();
        do {

            // Write the compressed block, if it has data, potentially waiting for its compression
            // to be finished by some worker thread first.
            if( block_queue_[ cur ].future.valid() ) {
                ret = write_compressed_block_( cur );

                // Return early if there was an issue writing to the output sink stream.
                if( ret != 0 ) {
                    return ret;
                }
            }

            // Process the next block, wrapping around the ring.
            ++cur;
            cur %= block_queue_.size();
        } while( cur != current_block_ % block_queue_.size() );
        assert( cur == current_block_ % block_queue_.size() );

        // Assert that we flushed all blocks, that is, we waited for all their compression to be
        // done and all their data to be written to our underlying output sink stream.
        // In that case, none of them should have a valid future, which we check here via a lambda
        // that we immediately call.
        assert(
            [this](){
                for( auto const& block : block_queue_ ) {
                    if( block.future.valid() ) {
                        return false;
                    }
                }
                return true;
            }()
        );

        // If we got here, all previous checks of `ret` were okay. So it still should be okay now.
        assert( ret == 0 );
        return ret;
    }

private:

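    /**
     * @brief Send the current block to a worker thread for compression, and make the next block
     * in the ring the new put area of the stream.
     *
     * If the ring is full, wait for the next block to finish its compression first, and write
     * its output to the sink stream. Returns 0 on success, and -1 if writing to the sink failed.
     */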
    int compress_current_block_and_move_to_next_block_()
    {
        // Get the current block. We were busy filling it with new input data, so it cannot have
        // been compressed already, meaning it cannot have a valid future.
        auto& cur_block = block_queue_[ current_block_ % block_queue_.size() ];
        assert( ! cur_block.future.valid() );

        // Assert that all pointers are where they should be.
        assert( pbase() == cur_block.block.get_input_buffer().first );
        assert(
            epptr() ==
            cur_block.block.get_input_buffer().first + cur_block.block.get_input_buffer().second
        );

        // Send the block to a compression worker thread, using all bytes that have been written
        // to it. The thread pool will pick up the task once a thread is available.
        assert( thread_pool_ );
        auto const avail_in = pptr() - pbase();
        cur_block.future = thread_pool_->enqueue_and_retrieve(
            [&]( size_t av_in ){
                cur_block.block.compress( av_in );
            },
            avail_in
        );

        // Move to the next block in the ring buffer queue.
        ++current_block_;
        auto& next_block = block_queue_[ current_block_ % block_queue_.size() ];

        // If the block has a future, that means that we sent it to compression before.
        // Because we use a ring buffer, that hence means that the ring is full. There are
        // currently only full blocks that are either already compressed or under compression
        // by some worker thread, or waiting to be compressed, but no block that we can use
        // as our next input buffer for writing data to.
        // Hence, we have to wait for the block to finish being compressed and then write it to our
        // underlying sink stream, before we can finally re-use the block as our new target buffer
        // for the incoming data.
        int ret = 0;
        if( next_block.future.valid() ) {

            // If we are here, the ring buffer queue is full. In that case, all blocks have been
            // added to the thread pool for being compressed.
            // Assert that indeed all blocks contain valid futures, that is, they all have been
            // sent to be compressed at some point before, using a lambda that we immediately call.
            assert(
                [this](){
                    for( auto const& block : block_queue_ ) {
                        if( ! block.future.valid() ) {
                            return false;
                        }
                    }
                    return true;
                }()
            );

            // Write the compressed block to the underlying stream,
            // potentially waiting until its compression is finished.
            ret = write_compressed_block_( current_block_ % block_queue_.size() );
        }

        // Now, the block is written, and we can re-use it as the new stream buffer.
        auto block_in = next_block.block.get_input_buffer();
        setp( block_in.first, block_in.first + block_in.second );

        // Assert that all pointers are where they should be.
        assert( pbase() == block_in.first );
        assert( pptr() == block_in.first );
        assert( epptr() == block_in.first + block_in.second );

        // Return value: whether the writing of the previously compressed blocks was successful.
        // If not, there was an error somewhere.
        return ret;
    }

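    /**
     * @brief Wait for the compression of the given block to finish, and write its output buffer
     * to the underlying sink stream. Returns 0 on success, and -1 if the sink did not accept
     * all bytes.
     */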
    int write_compressed_block_( size_t block_num )
    {
        // Get the block to write. It has to have a future, as we only call this function
        // when the block was previously sent to a worker to be compressed.
        assert( block_num < block_queue_.size() );
        auto& block = block_queue_[ block_num ];
        assert( block.future.valid() );

        // Make sure that the block compression thread is finished.
        block.future.get();

        // Get the block output begin and end, and write it to the underlying stream.
        auto const block_out = block.block.get_output_buffer();
        std::streamsize sz = sbuf_p_->sputn( block_out.first, block_out.second );

        // Check if there was an error in the sink stream.
        if( sz != static_cast<decltype(sz)>( block_out.second )) {
            return -1;
        }
        return 0;
    }

    // -------------------------------------------------------------
    //     Members
    // -------------------------------------------------------------

private:

    // Target sink stream to write compressed blocks to
    std::streambuf* sbuf_p_;

    // Pool of workers that do the compression of each block
    std::shared_ptr<ThreadPool> thread_pool_;

    // Ring-buffer-like usage of compression blocks: we rotate, and wait if the compression is not
    // yet done for the next block to be re-used in the ring. The current_block_ number only ever
    // increases (that is, it counts the total number of blocks that have been processed so far).
    // This is meant as a helper for future extensions that might want to keep track of byte
    // offsets of output blocks (not yet implemented).
    size_t current_block_ = 0;
    std::vector<BlockTask> block_queue_;

};

// ================================================================================================
//     Gzip Output Stream
// ================================================================================================

// We have all the implementation here, so that we do not need to expose the stream buffer.

GzipBlockOStream::GzipBlockOStream(
    std::ostream& os,
    std::size_t block_size,
    GzipCompressionLevel compression_level,
    std::shared_ptr<ThreadPool> thread_pool
)
    : GzipBlockOStream( os.rdbuf(), block_size, compression_level, thread_pool )
{
    // Nothing to do
}

GzipBlockOStream::GzipBlockOStream(
    std::streambuf* sbuf_p,
    std::size_t block_size,
    GzipCompressionLevel compression_level,
    std::shared_ptr<ThreadPool> thread_pool
)
    : std::ostream( new GzipBlockOStreambuf(
        sbuf_p,
        block_size,
        static_cast<int>( compression_level ),
        thread_pool
    ))
{
    exceptions( std::ios_base::badbit );
}

GzipBlockOStream::~GzipBlockOStream()
{
    delete rdbuf();
}
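
// A minimal usage sketch, assuming a hypothetical output file "data.txt.gz": each block is
// written as a complete gzip stream, and concatenated gzip streams are valid, so standard
// tools such as `zcat` decompress the result as a single file:
//
//     std::ofstream file( "data.txt.gz", std::ios_base::binary );
//     GzipBlockOStream out( file );
//     out << "some data\n";
//     // Blocks are compressed in parallel as the stream fills up;
//     // everything is flushed when `out` goes out of scope.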

// Up until here, we defined all classes needed for gzip block output streaming.
// However, if genesis is compiled without zlib support, we instead use dummy implementations
// which throw exceptions when being used.
#else // GENESIS_ZLIB

// ================================================================================================
//     Gzip Output Stream
// ================================================================================================

GzipBlockOStream::GzipBlockOStream(
    std::ostream&, std::size_t, GzipCompressionLevel, std::shared_ptr<ThreadPool>
) {
    throw std::runtime_error( "zlib: Genesis was not compiled with zlib support." );
}

GzipBlockOStream::GzipBlockOStream(
    std::streambuf*, std::size_t, GzipCompressionLevel, std::shared_ptr<ThreadPool>
) {
    throw std::runtime_error( "zlib: Genesis was not compiled with zlib support." );
}

GzipBlockOStream::~GzipBlockOStream()
{}

#endif // GENESIS_ZLIB

} // namespace utils
} // namespace genesis