49 # if defined(MSDOS) || defined(OS2) || defined(WIN32) || defined(__CYGWIN__)
54 #endif // GENESIS_ZLIB
80 class GzipBlockCompressor
94 std::size_t block_size,
95 int compression_level = Z_DEFAULT_COMPRESSION
97 : in_len_( block_size )
98 , out_len_( 2 * block_size )
106 throw std::invalid_argument(
107 "Compression level " +
std::to_string(
static_cast<int>( compression_level )) +
108 " is invalid for usage in gzip output stream. Valid range is [ -1, 9 ]."
113 zstream_ =
new z_stream;
114 zstream_->next_in = Z_NULL;
115 zstream_->zalloc = Z_NULL;
116 zstream_->zfree = Z_NULL;
117 zstream_->opaque = Z_NULL;
118 int ret = deflateInit2(
119 zstream_, compression_level, Z_DEFLATED, 15+16, 8, Z_DEFAULT_STRATEGY
122 throw GzipError( zstream_->msg, ret );
127 in_buff_ =
new char [in_len_];
128 out_buff_ =
new char [out_len_];
131 GzipBlockCompressor( GzipBlockCompressor
const& ) =
delete;
132 GzipBlockCompressor( GzipBlockCompressor && ) =
default;
133 GzipBlockCompressor& operator = ( GzipBlockCompressor
const& ) =
delete;
134 GzipBlockCompressor& operator = ( GzipBlockCompressor && ) =
default;
139 ~GzipBlockCompressor()
141 deflateEnd( zstream_ );
155 std::pair<char*, size_t> get_input_buffer()
const
157 return { in_buff_, in_len_ };
163 std::pair<char*, size_t> get_output_buffer()
const
165 return { out_buff_, out_pos_ };
172 void compress(
size_t avail_in )
182 if( avail_in == 0 ) {
189 assert( avail_in <= in_len_ );
194 zstream_->next_in =
reinterpret_cast<decltype( zstream_-
>next_in )>( in_buff_ );
195 zstream_->avail_in = avail_in;
198 while( zstream_->avail_in > 0 ) {
199 deflate_loop_( Z_NO_FLUSH );
203 assert( zstream_->avail_in == 0 );
206 zstream_->next_in =
nullptr;
207 zstream_->avail_in = 0;
208 deflate_loop_( Z_FINISH );
211 deflateReset( zstream_ );
220 void deflate_loop_(
int flush )
229 assert( out_len_ >= out_pos_ );
230 zstream_->next_out =
reinterpret_cast<decltype( zstream_-
>next_out )>( out_buff_ + out_pos_ );
231 zstream_->avail_out = out_len_ - out_pos_;
234 int ret = deflate( zstream_, flush );
235 if( ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR ) {
236 throw GzipError( zstream_->msg, ret );
242 auto const old_pos = out_pos_;
243 out_pos_ =
reinterpret_cast<decltype( out_buff_ )
>( zstream_->next_out ) - out_buff_;
244 if( out_pos_ >= out_len_ ) {
245 throw GzipError(
"Block compression ran out of buffer.", Z_MEM_ERROR );
250 if( ret == Z_STREAM_END || ret == Z_BUF_ERROR || old_pos == out_pos_ ) {
301 class GzipBlockOStreambuf
302 :
public std::streambuf
324 BlockTask( std::size_t block_size,
int compression_level )
325 : block( block_size, compression_level )
328 GzipBlockCompressor block;
329 ProactiveFuture<void> future;
354 std::streambuf* sbuf_p,
356 int compression_level = Z_DEFAULT_COMPRESSION,
357 std::shared_ptr<ThreadPool> thread_pool =
nullptr,
358 size_t num_blocks = 0
361 , thread_pool_( thread_pool ? thread_pool : Options::get().global_thread_pool() )
369 assert( thread_pool_ );
371 if( num_blocks == 0 ) {
372 num_blocks = 2 * thread_pool_->size();
374 if( num_blocks < 2 ) {
377 assert( num_blocks >= 2 );
380 block_queue_.reserve( num_blocks );
381 for(
size_t i = 0; i < num_blocks; ++i ) {
382 block_queue_.emplace_back( block_size, compression_level );
384 assert( block_queue_.size() > 0 );
385 assert( block_queue_.size() == num_blocks );
386 assert( current_block_ == 0 );
389 auto block_in = block_queue_[ current_block_ ].block.get_input_buffer();
390 setp( block_in.first, block_in.first + block_in.second );
393 GzipBlockOStreambuf( GzipBlockOStreambuf
const& ) =
delete;
394 GzipBlockOStreambuf( GzipBlockOStreambuf &&) =
delete;
395 GzipBlockOStreambuf& operator = ( GzipBlockOStreambuf
const& ) =
delete;
396 GzipBlockOStreambuf& operator = ( GzipBlockOStreambuf &&) =
delete;
402 virtual ~GzipBlockOStreambuf()
418 virtual std::streambuf::int_type overflow(std::streambuf::int_type c = traits_type::eof())
override
429 assert( pptr() >= pbase() );
431 static_cast<size_t>( pptr() - pbase() ) ==
432 block_queue_[ current_block_ % block_queue_.size() ].block.get_input_buffer().second
439 assert( pptr() == epptr() );
442 block_queue_[ current_block_ % block_queue_.size() ].block.get_input_buffer().first
451 auto ret = compress_current_block_and_move_to_next_block_();
453 setp(
nullptr,
nullptr );
454 return traits_type::eof();
457 return traits_type::eq_int_type(c, traits_type::eof()) ? traits_type::eof() : sputc(c);
460 virtual int sync()
override
470 auto ret = compress_current_block_and_move_to_next_block_();
478 size_t cur = current_block_ % block_queue_.size();
483 if( block_queue_[ cur ].future.valid() ) {
484 ret = write_compressed_block_( cur );
494 cur %= block_queue_.size();
495 }
while( cur != current_block_ % block_queue_.size() );
496 assert( cur == current_block_ % block_queue_.size() );
504 for(
auto const& block : block_queue_ ) {
505 if( block.future.valid() ) {
539 int compress_current_block_and_move_to_next_block_()
543 auto& cur_block = block_queue_[ current_block_ % block_queue_.size() ];
544 assert( ! cur_block.future.valid() );
547 assert( pbase() == cur_block.block.get_input_buffer().first );
550 cur_block.block.get_input_buffer().first + cur_block.block.get_input_buffer().second
555 assert( thread_pool_ );
556 auto const avail_in = pptr() - pbase();
557 cur_block.future = thread_pool_->enqueue_and_retrieve(
559 cur_block.block.compress( av_in );
566 auto& next_block = block_queue_[ current_block_ % block_queue_.size() ];
577 if( next_block.future.valid() ) {
585 for(
auto const& block : block_queue_ ) {
586 if( ! block.future.valid() ) {
596 ret = write_compressed_block_( current_block_ % block_queue_.size() );
600 auto block_in = next_block.block.get_input_buffer();
601 setp( block_in.first, block_in.first + block_in.second );
604 assert( pbase() == block_in.first );
605 assert( pptr() == block_in.first );
606 assert( epptr() == block_in.first + block_in.second );
623 int write_compressed_block_(
size_t block_num )
627 assert( block_num < block_queue_.size() );
628 auto& block = block_queue_[ block_num ];
629 assert( block.future.valid() );
635 auto const block_out = block.block.get_output_buffer();
636 std::streamsize sz = sbuf_p_->sputn( block_out.first, block_out.second );
639 if( sz !=
static_cast<decltype(sz)
>( block_out.second )) {
652 std::streambuf * sbuf_p_;
655 std::shared_ptr<ThreadPool> thread_pool_;
662 size_t current_block_ = 0;
663 std::vector<BlockTask> block_queue_;
675 std::size_t block_size,
677 std::shared_ptr<ThreadPool> thread_pool
679 :
GzipBlockOStream( os.rdbuf(), block_size, compression_level, thread_pool )
685 std::streambuf* sbuf_p,
686 std::size_t block_size,
688 std::shared_ptr<ThreadPool> thread_pool
690 : std::ostream( new GzipBlockOStreambuf(
693 static_cast<int>(compression_level),
697 exceptions(std::ios_base::badbit);
708 #else // GENESIS_ZLIB
717 throw std::runtime_error(
"zlib: Genesis was not compiled with zlib support." );
723 throw std::runtime_error(
"zlib: Genesis was not compiled with zlib support." );
729 #endif // GENESIS_ZLIB