49 # if defined(MSDOS) || defined(OS2) || defined(WIN32) || defined(__CYGWIN__)
54 #endif // GENESIS_ZLIB
58 #endif // GENESIS_OPENMP
84 class GzipBlockCompressor
98 std::size_t block_size,
99 int compression_level = Z_DEFAULT_COMPRESSION
101 : in_len_( block_size )
102 , out_len_( 2 * block_size )
110 throw std::invalid_argument(
111 "Compression level " +
std::to_string(
static_cast<int>( compression_level )) +
112 " is invalid for usage in gzip output stream. Valid range is [ -1, 9 ]."
117 zstream_ =
new z_stream;
118 zstream_->next_in = Z_NULL;
119 zstream_->zalloc = Z_NULL;
120 zstream_->zfree = Z_NULL;
121 zstream_->opaque = Z_NULL;
122 int ret = deflateInit2(
123 zstream_, compression_level, Z_DEFLATED, 15+16, 8, Z_DEFAULT_STRATEGY
126 throw except::GzipError( zstream_->msg, ret );
131 in_buff_ =
new char [in_len_];
132 out_buff_ =
new char [out_len_];
135 GzipBlockCompressor( GzipBlockCompressor
const& ) =
delete;
136 GzipBlockCompressor( GzipBlockCompressor && ) =
default;
137 GzipBlockCompressor& operator = ( GzipBlockCompressor
const& ) =
delete;
138 GzipBlockCompressor& operator = ( GzipBlockCompressor && ) =
default;
143 ~GzipBlockCompressor()
145 deflateEnd( zstream_ );
159 std::pair<char*, size_t> get_input_buffer()
const
161 return { in_buff_, in_len_ };
167 std::pair<char*, size_t> get_output_buffer()
const
169 return { out_buff_, out_pos_ };
176 void compress(
size_t avail_in )
186 if( avail_in == 0 ) {
193 assert( avail_in <= in_len_ );
198 zstream_->next_in =
reinterpret_cast<decltype( zstream_-
>next_in )>( in_buff_ );
199 zstream_->avail_in = avail_in;
202 while( zstream_->avail_in > 0 ) {
203 deflate_loop_( Z_NO_FLUSH );
207 assert( zstream_->avail_in == 0 );
210 zstream_->next_in =
nullptr;
211 zstream_->avail_in = 0;
212 deflate_loop_( Z_FINISH );
215 deflateReset( zstream_ );
224 void deflate_loop_(
int flush )
233 assert( out_len_ >= out_pos_ );
234 zstream_->next_out =
reinterpret_cast<decltype( zstream_-
>next_out )>( out_buff_ + out_pos_ );
235 zstream_->avail_out = out_len_ - out_pos_;
238 int ret = deflate( zstream_, flush );
239 if( ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR ) {
240 throw except::GzipError( zstream_->msg, ret );
246 auto const old_pos = out_pos_;
247 out_pos_ =
reinterpret_cast<decltype( out_buff_ )
>( zstream_->next_out ) - out_buff_;
248 if( out_pos_ >= out_len_ ) {
249 throw except::GzipError(
"Block compression ran out of buffer.", Z_MEM_ERROR );
254 if( ret == Z_STREAM_END || ret == Z_BUF_ERROR || old_pos == out_pos_ ) {
305 class GzipBlockOStreambuf
306 :
public std::streambuf
328 BlockTask( std::size_t block_size,
int compression_level )
329 : block( block_size, compression_level )
332 GzipBlockCompressor block;
333 std::future<void> future;
358 std::streambuf* sbuf_p,
360 int compression_level = Z_DEFAULT_COMPRESSION,
361 size_t num_threads = 1,
362 size_t num_blocks = 0
365 , thread_pool_( num_threads )
373 if( num_threads == 0 ) {
374 throw std::invalid_argument(
375 "Cannot create Gzip Block Output Stream with 0 worker threads."
378 if( num_blocks == 0 ) {
379 num_blocks = 2 * num_threads;
381 if( num_blocks < 2 ) {
384 assert( num_threads >= 1 );
385 assert( num_blocks >= 2 );
388 block_queue_.reserve( num_blocks );
389 for(
size_t i = 0; i < num_blocks; ++i ) {
390 block_queue_.emplace_back( block_size, compression_level );
392 assert( block_queue_.size() > 0 );
393 assert( block_queue_.size() == num_blocks );
394 assert( current_block_ == 0 );
397 auto block_in = block_queue_[ current_block_ ].block.get_input_buffer();
398 setp( block_in.first, block_in.first + block_in.second );
401 GzipBlockOStreambuf( GzipBlockOStreambuf
const& ) =
delete;
402 GzipBlockOStreambuf( GzipBlockOStreambuf &&) =
delete;
403 GzipBlockOStreambuf& operator = ( GzipBlockOStreambuf
const& ) =
delete;
404 GzipBlockOStreambuf& operator = ( GzipBlockOStreambuf &&) =
delete;
410 virtual ~GzipBlockOStreambuf()
426 virtual std::streambuf::int_type overflow(std::streambuf::int_type c = traits_type::eof())
override
437 assert( pptr() >= pbase() );
439 static_cast<size_t>( pptr() - pbase() ) ==
440 block_queue_[ current_block_ % block_queue_.size() ].block.get_input_buffer().second
447 assert( pptr() == epptr() );
450 block_queue_[ current_block_ % block_queue_.size() ].block.get_input_buffer().first
459 auto ret = compress_current_block_and_move_to_next_block_();
461 setp(
nullptr,
nullptr );
462 return traits_type::eof();
465 return traits_type::eq_int_type(c, traits_type::eof()) ? traits_type::eof() : sputc(c);
468 virtual int sync()
override
478 auto ret = compress_current_block_and_move_to_next_block_();
486 size_t cur = current_block_ % block_queue_.size();
491 if( block_queue_[ cur ].future.valid() ) {
492 ret = write_compressed_block_( cur );
502 cur %= block_queue_.size();
503 }
while( cur != current_block_ % block_queue_.size() );
504 assert( cur == current_block_ % block_queue_.size() );
512 for(
auto const& block : block_queue_ ) {
513 if( block.future.valid() ) {
523 assert( thread_pool_.load() == 0 );
551 int compress_current_block_and_move_to_next_block_()
555 auto& cur_block = block_queue_[ current_block_ % block_queue_.size() ];
556 assert( ! cur_block.future.valid() );
559 assert( pbase() == cur_block.block.get_input_buffer().first );
560 assert( epptr() == cur_block.block.get_input_buffer().first + cur_block.block.get_input_buffer().second );
564 auto const avail_in = pptr() - pbase();
565 cur_block.future = thread_pool_.enqueue(
567 cur_block.block.compress( av_in );
574 auto& next_block = block_queue_[ current_block_ % block_queue_.size() ];
585 if( next_block.future.valid() ) {
593 for(
auto const& block : block_queue_ ) {
594 if( ! block.future.valid() ) {
604 ret = write_compressed_block_( current_block_ % block_queue_.size() );
608 auto block_in = next_block.block.get_input_buffer();
609 setp( block_in.first, block_in.first + block_in.second );
612 assert( pbase() == block_in.first );
613 assert( pptr() == block_in.first );
614 assert( epptr() == block_in.first + block_in.second );
631 int write_compressed_block_(
size_t block_num )
635 assert( block_num < block_queue_.size() );
636 auto& block = block_queue_[ block_num ];
637 assert( block.future.valid() );
643 auto const block_out = block.block.get_output_buffer();
644 std::streamsize sz = sbuf_p_->sputn( block_out.first, block_out.second );
647 if( sz !=
static_cast<decltype(sz)
>( block_out.second )) {
660 std::streambuf * sbuf_p_;
663 ThreadPool thread_pool_;
670 size_t current_block_ = 0;
671 std::vector<BlockTask> block_queue_;
685 #ifdef GENESIS_OPENMP
693 threads = omp_get_num_threads();
708 std::size_t block_size,
710 std::size_t num_threads
712 :
GzipBlockOStream( os.rdbuf(), block_size, compression_level, num_threads )
718 std::streambuf* sbuf_p,
719 std::size_t block_size,
721 std::size_t num_threads
723 : std::ostream( new GzipBlockOStreambuf(
726 static_cast<int>(compression_level),
735 exceptions(std::ios_base::badbit);
746 #else // GENESIS_ZLIB
755 throw std::runtime_error(
"zlib: Genesis was not compiled with zlib support." );
761 throw std::runtime_error(
"zlib: Genesis was not compiled with zlib support." );
767 #endif // GENESIS_ZLIB