From 7f3bf219920bfb12c6e1c0b744be13d719672158 Mon Sep 17 00:00:00 2001 From: Martin Radev Date: Thu, 15 Aug 2019 01:18:47 +0200 Subject: [PATCH 1/5] ARROW-6216: [C++] Expose codec compression level to user In some situations the user might benefit by using a compression level other than the default one in Arrow. This patch adds a method to the ColumnProperties builder to allow the user to select a compression level. --- cpp/src/arrow/util/compression.cc | 41 ++++++++++++-- cpp/src/arrow/util/compression.h | 5 ++ cpp/src/arrow/util/compression_brotli.cc | 21 +++++-- cpp/src/arrow/util/compression_brotli.h | 5 ++ cpp/src/arrow/util/compression_bz2.cc | 18 ++++-- cpp/src/arrow/util/compression_bz2.h | 5 ++ cpp/src/arrow/util/compression_snappy.cc | 1 - cpp/src/arrow/util/compression_test.cc | 60 ++++++++++++++++---- cpp/src/arrow/util/compression_zlib.cc | 41 +++++++++----- cpp/src/arrow/util/compression_zlib.h | 1 + cpp/src/arrow/util/compression_zstd.cc | 29 +++++++--- cpp/src/arrow/util/compression_zstd.h | 7 +++ cpp/src/parquet/column_io_benchmark.cc | 3 +- cpp/src/parquet/column_writer.cc | 19 ++++--- cpp/src/parquet/column_writer.h | 2 +- cpp/src/parquet/column_writer_test.cc | 42 ++++++++++---- cpp/src/parquet/file_writer.cc | 17 +++--- cpp/src/parquet/properties.h | 71 +++++++++++++++++++++++- cpp/src/parquet/types.cc | 23 ++++++-- cpp/src/parquet/types.h | 4 ++ 20 files changed, 328 insertions(+), 87 deletions(-) diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index 4924c14c599..9e58180a2a2 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -17,6 +17,7 @@ #include "arrow/util/compression.h" +#include #include #ifdef ARROW_WITH_BROTLI @@ -48,6 +49,8 @@ namespace arrow { namespace util { +int GetHintValueForDefaultCompressionLevel() { return std::numeric_limits::min(); } + Compressor::~Compressor() {} Decompressor::~Decompressor() {} @@ -55,19 +58,32 @@ Decompressor::~Decompressor() {} Codec::~Codec() {} Status Codec::Create(Compression::type codec_type, std::unique_ptr* result) { + const int compression_level = GetHintValueForDefaultCompressionLevel(); + return Codec::Create(codec_type, compression_level, result); +} + +Status Codec::Create(Compression::type codec_type, int compression_level, + std::unique_ptr* result) { + const bool use_default_compression_level = + (compression_level == GetHintValueForDefaultCompressionLevel()); + Codec* codec = nullptr; switch (codec_type) { case Compression::UNCOMPRESSED: break; case Compression::SNAPPY: #ifdef ARROW_WITH_SNAPPY - result->reset(new SnappyCodec()); + codec = new SnappyCodec(); break; #else return Status::NotImplemented("Snappy codec support not built"); #endif case Compression::GZIP: #ifdef ARROW_WITH_ZLIB - result->reset(new GZipCodec()); + if (use_default_compression_level) { + codec = new GZipCodec(); + } else { + codec = new GZipCodec(compression_level); + } break; #else return Status::NotImplemented("Gzip codec support not built"); @@ -76,28 +92,40 @@ Status Codec::Create(Compression::type codec_type, std::unique_ptr* resul return Status::NotImplemented("LZO codec not implemented"); case Compression::BROTLI: #ifdef ARROW_WITH_BROTLI - result->reset(new BrotliCodec()); + if (use_default_compression_level) { + codec = new BrotliCodec(); + } else { + codec = new BrotliCodec(compression_level); + } break; #else return Status::NotImplemented("Brotli codec support not built"); #endif case Compression::LZ4: #ifdef ARROW_WITH_LZ4 - result->reset(new Lz4Codec()); 
+ codec = new Lz4Codec(); break; #else return Status::NotImplemented("LZ4 codec support not built"); #endif case Compression::ZSTD: #ifdef ARROW_WITH_ZSTD - result->reset(new ZSTDCodec()); + if (use_default_compression_level) { + codec = new ZSTDCodec(); + } else { + codec = new ZSTDCodec(compression_level); + } break; #else return Status::NotImplemented("ZSTD codec support not built"); #endif case Compression::BZ2: #ifdef ARROW_WITH_BZ2 - result->reset(new BZ2Codec()); + if (use_default_compression_level) { + codec = new BZ2Codec(); + } else { + codec = new BZ2Codec(compression_level); + } break; #else return Status::NotImplemented("BZ2 codec support not built"); @@ -105,6 +133,7 @@ Status Codec::Create(Compression::type codec_type, std::unique_ptr* resul default: return Status::Invalid("Unrecognized codec"); } + result->reset(codec); return Status::OK(); } diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 8665374bed5..7d164f8a5a8 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -33,6 +33,9 @@ struct Compression { namespace util { +ARROW_EXPORT +int GetHintValueForDefaultCompressionLevel(); + /// \brief Streaming compressor interface /// class ARROW_EXPORT Compressor { @@ -98,6 +101,8 @@ class ARROW_EXPORT Codec { virtual ~Codec(); static Status Create(Compression::type codec, std::unique_ptr* out); + static Status Create(Compression::type codec, int compression_level, + std::unique_ptr* out); /// \brief One-shot decompression function /// diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index 5f47db014df..92a051e5009 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -31,10 +31,14 @@ namespace arrow { namespace util { +namespace { + // Brotli compression quality is max (11) by default, which is slow. // We use 8 as a default as it is the best trade-off for Parquet workload. 
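A caller-side sketch (not part of the patch) of the new two-argument Create() overload, assuming only the declarations added to compression.h above; the helper name and the quality value 10 are illustrative, and Brotli qualities run 0 through 11:

    #include <memory>
    #include "arrow/status.h"
    #include "arrow/util/compression.h"

    // Request an explicit Brotli quality instead of the default of 8 used above.
    // The one-argument Create() overload (or passing the hint value from
    // GetHintValueForDefaultCompressionLevel()) keeps the codec's default.
    arrow::Status MakeTunedBrotliCodec(std::unique_ptr<arrow::util::Codec>* out) {
      return arrow::util::Codec::Create(arrow::Compression::BROTLI,
                                        /*compression_level=*/10, out);
    }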
constexpr int kBrotliDefaultCompressionLevel = 8; +} // namespace + // ---------------------------------------------------------------------- // Brotli decompressor implementation @@ -98,7 +102,8 @@ class BrotliDecompressor : public Decompressor { class BrotliCompressor : public Compressor { public: - BrotliCompressor() {} + explicit BrotliCompressor(int compression_level) + : compression_level_(compression_level) {} ~BrotliCompressor() override { if (state_ != nullptr) { @@ -111,8 +116,7 @@ class BrotliCompressor : public Compressor { if (state_ == nullptr) { return BrotliError("Brotli init failed"); } - if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, - kBrotliDefaultCompressionLevel)) { + if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, compression_level_)) { return BrotliError("Brotli set compression level failed"); } return Status::OK(); @@ -131,6 +135,9 @@ class BrotliCompressor : public Compressor { Status BrotliError(const char* msg) { return Status::IOError(msg); } BrotliEncoderState* state_ = nullptr; + + private: + int compression_level_; }; Status BrotliCompressor::Compress(int64_t input_len, const uint8_t* input, @@ -188,8 +195,12 @@ Status BrotliCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes // ---------------------------------------------------------------------- // Brotli codec implementation +BrotliCodec::BrotliCodec(int compression_level) : compression_level_(compression_level) {} + +BrotliCodec::BrotliCodec() : BrotliCodec(kBrotliDefaultCompressionLevel) {} + Status BrotliCodec::MakeCompressor(std::shared_ptr* out) { - auto ptr = std::make_shared(); + auto ptr = std::make_shared(compression_level_); RETURN_NOT_OK(ptr->Init()); *out = ptr; return Status::OK(); @@ -235,7 +246,7 @@ Status BrotliCodec::Compress(int64_t input_len, const uint8_t* input, DCHECK_GE(input_len, 0); DCHECK_GE(output_buffer_len, 0); std::size_t output_size = static_cast(output_buffer_len); - if (BrotliEncoderCompress(kBrotliDefaultCompressionLevel, BROTLI_DEFAULT_WINDOW, + if (BrotliEncoderCompress(compression_level_, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, static_cast(input_len), input, &output_size, output_buffer) == BROTLI_FALSE) { return Status::IOError("Brotli compression failure."); diff --git a/cpp/src/arrow/util/compression_brotli.h b/cpp/src/arrow/util/compression_brotli.h index 59f97cda6b9..c01b9560987 100644 --- a/cpp/src/arrow/util/compression_brotli.h +++ b/cpp/src/arrow/util/compression_brotli.h @@ -31,6 +31,8 @@ namespace util { // Brotli codec. 
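For reference, a minimal one-shot roundtrip through the level-aware codec, written against the MaxCompressedLen/Compress/Decompress signatures visible in this patch; the quality and the buffer handling are illustrative, and errors surface through the returned Status:

    #include <cstdint>
    #include <vector>
    #include "arrow/status.h"
    #include "arrow/util/compression_brotli.h"

    arrow::Status RoundtripBrotli(const std::vector<uint8_t>& input) {
      arrow::util::BrotliCodec codec(/*compression_level=*/10);

      // One-shot compression into a worst-case sized buffer.
      std::vector<uint8_t> compressed(
          codec.MaxCompressedLen(static_cast<int64_t>(input.size()), input.data()));
      int64_t compressed_len = 0;
      ARROW_RETURN_NOT_OK(codec.Compress(static_cast<int64_t>(input.size()),
                                         input.data(),
                                         static_cast<int64_t>(compressed.size()),
                                         compressed.data(), &compressed_len));

      // One-shot decompression back into a buffer of the original size.
      std::vector<uint8_t> decompressed(input.size());
      return codec.Decompress(compressed_len, compressed.data(),
                              static_cast<int64_t>(decompressed.size()),
                              decompressed.data());
    }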
class ARROW_EXPORT BrotliCodec : public Codec { public: + explicit BrotliCodec(int compression_level); + BrotliCodec(); Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; @@ -47,6 +49,9 @@ class ARROW_EXPORT BrotliCodec : public Codec { Status MakeDecompressor(std::shared_ptr* out) override; const char* name() const override { return "brotli"; } + + private: + int compression_level_; }; } // namespace util diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc index 63f0308ca47..03bcbc6b3d1 100644 --- a/cpp/src/arrow/util/compression_bz2.cc +++ b/cpp/src/arrow/util/compression_bz2.cc @@ -37,12 +37,16 @@ namespace arrow { namespace util { +namespace { + constexpr int kBZ2DefaultCompressionLevel = 9; // Max number of bytes the bz2 APIs accept at a time -static constexpr auto kSizeLimit = +constexpr auto kSizeLimit = static_cast(std::numeric_limits::max()); +} // namespace + Status BZ2Error(const char* prefix_msg, int bz_result) { ARROW_CHECK(bz_result != BZ_OK && bz_result != BZ_RUN_OK && bz_result != BZ_FLUSH_OK && bz_result != BZ_FINISH_OK && bz_result != BZ_STREAM_END); @@ -150,7 +154,8 @@ class BZ2Decompressor : public Decompressor { class BZ2Compressor : public Compressor { public: - BZ2Compressor() : initialized_(false) {} + explicit BZ2Compressor(int compression_level) + : initialized_(false), compression_level_(compression_level) {} ~BZ2Compressor() override { if (initialized_) { @@ -162,7 +167,7 @@ class BZ2Compressor : public Compressor { DCHECK(!initialized_); memset(&stream_, 0, sizeof(stream_)); int ret; - ret = BZ2_bzCompressInit(&stream_, kBZ2DefaultCompressionLevel, 0, 0); + ret = BZ2_bzCompressInit(&stream_, compression_level_, 0, 0); if (ret != BZ_OK) { return BZ2Error("bz2 compressor init failed: ", ret); } @@ -227,13 +232,18 @@ class BZ2Compressor : public Compressor { protected: bz_stream stream_; bool initialized_; + int compression_level_; }; // ---------------------------------------------------------------------- // bz2 codec implementation +BZ2Codec::BZ2Codec(int compression_level) : compression_level_(compression_level) {} + +BZ2Codec::BZ2Codec() : BZ2Codec(kBZ2DefaultCompressionLevel) {} + Status BZ2Codec::MakeCompressor(std::shared_ptr* out) { - auto ptr = std::make_shared(); + auto ptr = std::make_shared(compression_level_); RETURN_NOT_OK(ptr->Init()); *out = ptr; return Status::OK(); diff --git a/cpp/src/arrow/util/compression_bz2.h b/cpp/src/arrow/util/compression_bz2.h index 21461588255..eac044e49b5 100644 --- a/cpp/src/arrow/util/compression_bz2.h +++ b/cpp/src/arrow/util/compression_bz2.h @@ -31,6 +31,8 @@ namespace util { // BZ2 codec. 
class ARROW_EXPORT BZ2Codec : public Codec { public: + explicit BZ2Codec(int compression_level); + BZ2Codec(); Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; @@ -47,6 +49,9 @@ class ARROW_EXPORT BZ2Codec : public Codec { Status MakeDecompressor(std::shared_ptr* out) override; const char* name() const override { return "bz2"; } + + private: + int compression_level_; }; } // namespace util diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc index 963de698cb5..ae21f27b41a 100644 --- a/cpp/src/arrow/util/compression_snappy.cc +++ b/cpp/src/arrow/util/compression_snappy.cc @@ -87,6 +87,5 @@ Status SnappyCodec::Compress(int64_t input_len, const uint8_t* input, *output_len = static_cast(output_size); return Status::OK(); } - } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 20916d6b2e9..401a7ab804f 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -53,14 +53,8 @@ std::vector MakeCompressibleData(int data_size) { } // Check roundtrip of one-shot compression and decompression functions. - -void CheckCodecRoundtrip(Compression::type ctype, const std::vector& data) { - // create multiple compressors to try to break them - std::unique_ptr c1, c2; - - ASSERT_OK(Codec::Create(ctype, &c1)); - ASSERT_OK(Codec::Create(ctype, &c2)); - +void CheckCodecRoundtrip(std::unique_ptr& c1, std::unique_ptr& c2, + const std::vector& data) { int max_compressed_len = static_cast(c1->MaxCompressedLen(data.size(), data.data())); std::vector compressed(max_compressed_len); @@ -341,19 +335,63 @@ class CodecTest : public ::testing::TestWithParam { }; TEST_P(CodecTest, CodecRoundtrip) { - if (GetCompression() == Compression::BZ2) { + const auto compression = GetCompression(); + if (compression == Compression::BZ2) { // SKIP: BZ2 doesn't support one-shot compression return; } int sizes[] = {0, 10000, 100000}; + + // create multiple compressors to try to break them + std::unique_ptr c1, c2; + ASSERT_OK(Codec::Create(compression, &c1)); + ASSERT_OK(Codec::Create(compression, &c2)); + for (int data_size : sizes) { std::vector data = MakeRandomData(data_size); - CheckCodecRoundtrip(GetCompression(), data); + CheckCodecRoundtrip(c1, c2, data); data = MakeCompressibleData(data_size); - CheckCodecRoundtrip(GetCompression(), data); + CheckCodecRoundtrip(c1, c2, data); + } +} + +TEST_P(CodecTest, SpecifyCompressionLevel) { + const auto compression = GetCompression(); + // The compression level is codec specific. + int compression_level; + switch (compression) { + case Compression::LZ4: + case Compression::LZO: + case Compression::UNCOMPRESSED: + case Compression::SNAPPY: + // Compression level cannot be specified for these + // compression types. 
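One practical wrinkle this test works around is that valid levels are codec specific. A hypothetical helper (not part of the patch), using the commonly documented ranges of roughly 1-9 for zlib and bzip2, 0-11 for Brotli, and 1-22 for ZSTD, could clamp a requested level:

    #include <algorithm>
    #include "arrow/util/compression.h"

    // Clamp a requested level into a plausible range for each codec; Snappy,
    // LZ4 and UNCOMPRESSED take no level, so the value is passed through.
    int ClampCompressionLevel(arrow::Compression::type codec, int level) {
      switch (codec) {
        case arrow::Compression::GZIP:
        case arrow::Compression::BZ2:
          return std::min(std::max(level, 1), 9);
        case arrow::Compression::BROTLI:
          return std::min(std::max(level, 0), 11);
        case arrow::Compression::ZSTD:
          return std::min(std::max(level, 1), 22);
        default:
          return level;
      }
    }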
+ return; + case Compression::GZIP: + compression_level = 2; + break; + case Compression::BZ2: + // SKIP: BZ2 doesn't support one-shot compression + return; + case Compression::ZSTD: + compression_level = 4; + break; + case Compression::BROTLI: + compression_level = 10; + break; + default: + FAIL() << "Unhandled compression type"; + return; } + + std::vector data = MakeRandomData(2000); + // create multiple compressors to try to break them + std::unique_ptr c1, c2; + ASSERT_OK(Codec::Create(compression, compression_level, &c1)); + ASSERT_OK(Codec::Create(compression, compression_level, &c2)); + CheckCodecRoundtrip(c1, c2, data); } TEST_P(CodecTest, OutputBufferIsSmall) { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index a44a8907b33..a7b5807faa8 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -34,8 +34,7 @@ namespace arrow { namespace util { -constexpr int kGZipDefaultCompressionLevel = 9; - +namespace { // ---------------------------------------------------------------------- // gzip implementation @@ -43,15 +42,17 @@ constexpr int kGZipDefaultCompressionLevel = 9; // there. // Maximum window size -static constexpr int WINDOW_BITS = 15; +constexpr int WINDOW_BITS = 15; // Output Gzip. -static constexpr int GZIP_CODEC = 16; +constexpr int GZIP_CODEC = 16; // Determine if this is libz or gzip from header. -static constexpr int DETECT_CODEC = 32; +constexpr int DETECT_CODEC = 32; + +constexpr int kGZipDefaultCompressionLevel = 9; -static int CompressionWindowBitsForFormat(GZipCodec::Format format) { +int CompressionWindowBitsForFormat(GZipCodec::Format format) { int window_bits = WINDOW_BITS; switch (format) { case GZipCodec::DEFLATE: @@ -66,7 +67,7 @@ static int CompressionWindowBitsForFormat(GZipCodec::Format format) { return window_bits; } -static int DecompressionWindowBitsForFormat(GZipCodec::Format format) { +int DecompressionWindowBitsForFormat(GZipCodec::Format format) { if (format == GZipCodec::DEFLATE) { return -WINDOW_BITS; } else { @@ -75,10 +76,12 @@ static int DecompressionWindowBitsForFormat(GZipCodec::Format format) { } } -static Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { +Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { return Status::IOError(prefix_msg, (msg) ? 
msg : "(unknown error)"); } +} // namespace + // ---------------------------------------------------------------------- // gzip decompressor implementation @@ -171,7 +174,8 @@ class GZipDecompressor : public Decompressor { class GZipCompressor : public Compressor { public: - GZipCompressor() : initialized_(false) {} + explicit GZipCompressor(int compression_level) + : initialized_(false), compression_level_(compression_level) {} ~GZipCompressor() override { if (initialized_) { @@ -187,7 +191,7 @@ class GZipCompressor : public Compressor { // Initialize to run specified format int window_bits = CompressionWindowBitsForFormat(format); if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, - kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) != Z_OK) { + compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { return ZlibError("zlib deflateInit failed: "); } else { initialized_ = true; @@ -211,6 +215,7 @@ class GZipCompressor : public Compressor { z_stream stream_; bool initialized_; + int compression_level_; }; Status GZipCompressor::Compress(int64_t input_len, const uint8_t* input, @@ -313,10 +318,11 @@ Status GZipCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_w class GZipCodec::GZipCodecImpl { public: - explicit GZipCodecImpl(GZipCodec::Format format) + explicit GZipCodecImpl(int compression_level, GZipCodec::Format format) : format_(format), compressor_initialized_(false), - decompressor_initialized_(false) {} + decompressor_initialized_(false), + compression_level_(compression_level) {} ~GZipCodecImpl() { EndCompressor(); @@ -324,7 +330,7 @@ class GZipCodec::GZipCodecImpl { } Status MakeCompressor(std::shared_ptr* out) { - auto ptr = std::make_shared(); + auto ptr = std::make_shared(compression_level_); RETURN_NOT_OK(ptr->Init(format_)); *out = ptr; return Status::OK(); @@ -345,7 +351,7 @@ class GZipCodec::GZipCodecImpl { // Initialize to run specified format int window_bits = CompressionWindowBitsForFormat(format_); if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, - kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) != Z_OK) { + compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { return ZlibErrorPrefix("zlib deflateInit failed: ", stream_.msg); } compressor_initialized_ = true; @@ -497,9 +503,14 @@ class GZipCodec::GZipCodecImpl { // perform the refactoring then bool compressor_initialized_; bool decompressor_initialized_; + int compression_level_; }; -GZipCodec::GZipCodec(Format format) { impl_.reset(new GZipCodecImpl(format)); } +GZipCodec::GZipCodec(int compression_level, Format format) { + impl_.reset(new GZipCodecImpl(compression_level, format)); +} + +GZipCodec::GZipCodec(Format format) : GZipCodec(kGZipDefaultCompressionLevel, format) {} GZipCodec::~GZipCodec() {} diff --git a/cpp/src/arrow/util/compression_zlib.h b/cpp/src/arrow/util/compression_zlib.h index 9a5feaa290c..4679835fe5a 100644 --- a/cpp/src/arrow/util/compression_zlib.h +++ b/cpp/src/arrow/util/compression_zlib.h @@ -38,6 +38,7 @@ class ARROW_EXPORT GZipCodec : public Codec { GZIP, }; + explicit GZipCodec(int compression_level, Format format = GZIP); explicit GZipCodec(Format format = GZIP); ~GZipCodec() override; diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc index 87faf006a47..08206ab75ab 100644 --- a/cpp/src/arrow/util/compression_zstd.cc +++ b/cpp/src/arrow/util/compression_zstd.cc @@ -31,13 +31,17 @@ using std::size_t; namespace arrow { namespace util { -// XXX level = 1 probably doesn't 
compress very much -constexpr int kZSTDDefaultCompressionLevel = 1; +namespace { -static Status ZSTDError(size_t ret, const char* prefix_msg) { +Status ZSTDError(size_t ret, const char* prefix_msg) { return Status::IOError(prefix_msg, ZSTD_getErrorName(ret)); } +// XXX level = 1 probably doesn't compress very much +constexpr int kZSTDDefaultCompressionLevel = 1; + +} // namespace + // ---------------------------------------------------------------------- // ZSTD decompressor implementation @@ -96,12 +100,13 @@ class ZSTDDecompressor : public Decompressor { class ZSTDCompressor : public Compressor { public: - ZSTDCompressor() : stream_(ZSTD_createCStream()) {} + explicit ZSTDCompressor(int compression_level) + : stream_(ZSTD_createCStream()), compression_level_(compression_level) {} ~ZSTDCompressor() override { ZSTD_freeCStream(stream_); } Status Init() { - size_t ret = ZSTD_initCStream(stream_, kZSTDDefaultCompressionLevel); + size_t ret = ZSTD_initCStream(stream_, compression_level_); if (ZSTD_isError(ret)) { return ZSTDError(ret, "ZSTD init failed: "); } else { @@ -120,6 +125,9 @@ class ZSTDCompressor : public Compressor { protected: ZSTD_CStream* stream_; + + private: + int compression_level_; }; Status ZSTDCompressor::Compress(int64_t input_len, const uint8_t* input, @@ -184,8 +192,12 @@ Status ZSTDCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_w // ---------------------------------------------------------------------- // ZSTD codec implementation +ZSTDCodec::ZSTDCodec(int compression_level) : compression_level_(compression_level) {} + +ZSTDCodec::ZSTDCodec() : ZSTDCodec(kZSTDDefaultCompressionLevel) {} + Status ZSTDCodec::MakeCompressor(std::shared_ptr* out) { - auto ptr = std::make_shared(); + auto ptr = std::make_shared(compression_level_); RETURN_NOT_OK(ptr->Init()); *out = ptr; return Status::OK(); @@ -237,9 +249,8 @@ int64_t ZSTDCodec::MaxCompressedLen(int64_t input_len, Status ZSTDCodec::Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_len) { - size_t ret = - ZSTD_compress(output_buffer, static_cast(output_buffer_len), input, - static_cast(input_len), kZSTDDefaultCompressionLevel); + size_t ret = ZSTD_compress(output_buffer, static_cast(output_buffer_len), input, + static_cast(input_len), compression_level_); if (ZSTD_isError(ret)) { return ZSTDError(ret, "ZSTD compression failed: "); } diff --git a/cpp/src/arrow/util/compression_zstd.h b/cpp/src/arrow/util/compression_zstd.h index 8b05d8c80a9..822827aaa44 100644 --- a/cpp/src/arrow/util/compression_zstd.h +++ b/cpp/src/arrow/util/compression_zstd.h @@ -31,6 +31,10 @@ namespace util { // ZSTD codec. 
class ARROW_EXPORT ZSTDCodec : public Codec { public: + explicit ZSTDCodec(int compression_level); + + ZSTDCodec(); + Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; @@ -47,6 +51,9 @@ class ARROW_EXPORT ZSTDCodec : public Codec { Status MakeDecompressor(std::shared_ptr* out) override; const char* name() const override { return "zstd"; } + + private: + int compression_level_; }; } // namespace util diff --git a/cpp/src/parquet/column_io_benchmark.cc b/cpp/src/parquet/column_io_benchmark.cc index 019b8d46ddc..1506e107e72 100644 --- a/cpp/src/parquet/column_io_benchmark.cc +++ b/cpp/src/parquet/column_io_benchmark.cc @@ -39,7 +39,8 @@ std::shared_ptr BuildWriter(int64_t output_size, ColumnDescriptor* schema, const WriterProperties* properties) { std::unique_ptr pager = - PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata); + PageWriter::Open(dst, Compression::UNCOMPRESSED, + ::arrow::util::GetHintValueForDefaultCompressionLevel(), metadata); std::shared_ptr writer = ColumnWriter::Make(metadata, std::move(pager), properties); return std::static_pointer_cast(writer); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index d9d37ae376d..dd1aa108d1f 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -136,7 +136,8 @@ int LevelEncoder::Encode(int batch_size, const int16_t* levels) { class SerializedPageWriter : public PageWriter { public: SerializedPageWriter(const std::shared_ptr& sink, - Compression::type codec, ColumnChunkMetaDataBuilder* metadata, + Compression::type codec, int compression_level, + ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool = arrow::default_memory_pool()) : sink_(sink), metadata_(metadata), @@ -146,7 +147,7 @@ class SerializedPageWriter : public PageWriter { data_page_offset_(0), total_uncompressed_size_(0), total_compressed_size_(0) { - compressor_ = GetCodecFromArrow(codec); + compressor_ = GetCodecFromArrow(codec, compression_level); thrift_serializer_.reset(new ThriftSerializer); } @@ -291,12 +292,13 @@ class SerializedPageWriter : public PageWriter { class BufferedPageWriter : public PageWriter { public: BufferedPageWriter(const std::shared_ptr& sink, - Compression::type codec, ColumnChunkMetaDataBuilder* metadata, + Compression::type codec, int compression_level, + ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool = arrow::default_memory_pool()) : final_sink_(sink), metadata_(metadata) { in_memory_sink_ = CreateOutputStream(pool); - pager_ = std::unique_ptr( - new SerializedPageWriter(in_memory_sink_, codec, metadata, pool)); + pager_ = std::unique_ptr(new SerializedPageWriter( + in_memory_sink_, codec, compression_level, metadata, pool)); } int64_t WriteDictionaryPage(const DictionaryPage& page) override { @@ -340,13 +342,14 @@ class BufferedPageWriter : public PageWriter { std::unique_ptr PageWriter::Open( const std::shared_ptr& sink, Compression::type codec, - ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool, bool buffered_row_group) { + int compression_level, ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool, + bool buffered_row_group) { if (buffered_row_group) { return std::unique_ptr( - new BufferedPageWriter(sink, codec, metadata, pool)); + new BufferedPageWriter(sink, codec, compression_level, metadata, pool)); } else { return std::unique_ptr( - new SerializedPageWriter(sink, codec, metadata, pool)); + new SerializedPageWriter(sink, codec, compression_level, metadata, pool)); } } diff --git 
a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index 27ca400eb46..609ee8d13c8 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -84,7 +84,7 @@ class PARQUET_EXPORT PageWriter { static std::unique_ptr Open( const std::shared_ptr& sink, Compression::type codec, - ColumnChunkMetaDataBuilder* metadata, + int compression_level, ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), bool buffered_row_group = false); diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index cee45c0c6f8..67c4fc16a51 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -107,8 +107,9 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { writer_properties_ = wp_builder.build(); metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); - std::unique_ptr pager = - PageWriter::Open(sink_, column_properties.compression(), metadata_.get()); + std::unique_ptr pager = PageWriter::Open( + sink_, column_properties.compression(), + arrow::util::GetHintValueForDefaultCompressionLevel(), metadata_.get()); std::shared_ptr writer = ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()); return std::static_pointer_cast>(writer); @@ -128,17 +129,18 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false); } - void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression, - bool enable_dictionary, bool enable_statistics, - int64_t num_rows = SMALL_SIZE) { + void TestRequiredWithSettings( + Encoding::type encoding, Compression::type compression, bool enable_dictionary, + bool enable_statistics, int64_t num_rows = SMALL_SIZE, + int compression_level = arrow::util::GetHintValueForDefaultCompressionLevel()) { this->GenerateData(num_rows); this->WriteRequiredWithSettings(encoding, compression, enable_dictionary, - enable_statistics, num_rows); + enable_statistics, compression_level, num_rows); ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows)); this->WriteRequiredWithSettingsSpaced(encoding, compression, enable_dictionary, - enable_statistics, num_rows); + enable_statistics, num_rows, compression_level); ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows)); } @@ -188,9 +190,10 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression, bool enable_dictionary, bool enable_statistics, - int64_t num_rows) { + int compression_level, int64_t num_rows) { ColumnProperties column_properties(encoding, compression, enable_dictionary, enable_statistics); + column_properties.set_compression_level(compression_level); std::shared_ptr> writer = this->BuildWriter(num_rows, column_properties); writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); @@ -202,11 +205,12 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { void WriteRequiredWithSettingsSpaced(Encoding::type encoding, Compression::type compression, bool enable_dictionary, bool enable_statistics, - int64_t num_rows) { + int64_t num_rows, int compression_level) { std::vector valid_bits( BitUtil::BytesForBits(static_cast(this->values_.size())) + 1, 255); ColumnProperties column_properties(encoding, compression, enable_dictionary, enable_statistics); + column_properties.set_compression_level(compression_level); 
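Outside the test harness, the set_compression_level() setter used here operates on a plain ColumnProperties value; a small standalone sketch, assuming the properties.h declarations in this patch and an arbitrary zlib level of 6:

    #include "parquet/properties.h"

    // Build column properties with GZIP and a codec-specific level (zlib 1-9).
    parquet::ColumnProperties MakeGzipColumnProperties() {
      parquet::ColumnProperties props(parquet::Encoding::PLAIN,
                                      parquet::Compression::GZIP,
                                      /*dictionary_enabled=*/false,
                                      /*statistics_enabled=*/false);
      props.set_compression_level(6);
      return props;
    }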
std::shared_ptr> writer = this->BuildWriter(num_rows, column_properties); writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, @@ -406,11 +410,21 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) { LARGE_SIZE); } +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompressionAndLevel) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false, + LARGE_SIZE, 10); +} + TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false, LARGE_SIZE); } +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompressionAndLevel) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false, + LARGE_SIZE, 10); +} + TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false, LARGE_SIZE); @@ -449,6 +463,11 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) { LARGE_SIZE); } +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompressionAndLevel) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false, + LARGE_SIZE, 6); +} + TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, true, LARGE_SIZE); @@ -676,8 +695,9 @@ TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { auto props = WriterProperties::Builder().build(); auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); - std::unique_ptr pager = - PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata.get()); + std::unique_ptr pager = PageWriter::Open( + sink, Compression::UNCOMPRESSED, + ::arrow::util::GetHintValueForDefaultCompressionLevel(), metadata.get()); std::shared_ptr writer = ColumnWriter::Make(metadata.get(), std::move(pager), props.get()); auto typed_writer = std::static_pointer_cast>(writer); diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index 22c75fa05fb..e2121648e97 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -27,6 +27,7 @@ #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/schema.h" +#include "parquet/types.h" using arrow::MemoryPool; @@ -124,10 +125,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents { ++next_column_index_; - const ColumnDescriptor* column_descr = col_meta->descr(); - std::unique_ptr pager = - PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta, - properties_->memory_pool()); + const auto& path = col_meta->descr()->path(); + std::unique_ptr pager = PageWriter::Open( + sink_, properties_->compression(path), properties_->compression_level(path), + col_meta, properties_->memory_pool()); column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_); return column_writers_[0].get(); } @@ -221,10 +222,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents { void InitColumns() { for (int i = 0; i < num_columns(); i++) { auto col_meta = metadata_->NextColumnChunk(); - const ColumnDescriptor* column_descr = col_meta->descr(); - std::unique_ptr pager = - PageWriter::Open(sink_, properties_->compression(column_descr->path()), - col_meta, properties_->memory_pool(), buffered_row_group_); + const auto& path = col_meta->descr()->path(); + std::unique_ptr pager = PageWriter::Open( + sink_, properties_->compression(path), 
properties_->compression_level(path), + col_meta, properties_->memory_pool(), buffered_row_group_); column_writers_.push_back( ColumnWriter::Make(col_meta, std::move(pager), properties_)); } diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 209969a0054..0e1e64ac21a 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -24,6 +24,7 @@ #include #include "arrow/type.h" +#include "arrow/util/compression.h" #include "parquet/exception.h" #include "parquet/parquet_version.h" @@ -95,7 +96,8 @@ class PARQUET_EXPORT ColumnProperties { codec_(codec), dictionary_enabled_(dictionary_enabled), statistics_enabled_(statistics_enabled), - max_stats_size_(max_stats_size) {} + max_stats_size_(max_stats_size), + compression_level_(::arrow::util::GetHintValueForDefaultCompressionLevel()) {} void set_encoding(Encoding::type encoding) { encoding_ = encoding; } @@ -113,6 +115,10 @@ class PARQUET_EXPORT ColumnProperties { max_stats_size_ = max_stats_size; } + void set_compression_level(int compression_level) { + compression_level_ = compression_level; + } + Encoding::type encoding() const { return encoding_; } Compression::type compression() const { return codec_; } @@ -123,12 +129,15 @@ class PARQUET_EXPORT ColumnProperties { size_t max_statistics_size() const { return max_stats_size_; } + int compression_level() const { return compression_level_; } + private: Encoding::type encoding_; Compression::type codec_; bool dictionary_enabled_; bool statistics_enabled_; size_t max_stats_size_; + int compression_level_; }; class PARQUET_EXPORT WriterProperties { @@ -271,6 +280,59 @@ class PARQUET_EXPORT WriterProperties { return this->compression(path->ToDotString(), codec); } + /** + * Specify the default compression level for the compressor in every column. + * In case a column does not have an explicitly specified compression level, + * the default one would be used. + * + * The provided compression level is compressor specific. The user would have + * to familiarize oneself with the available levels for the selected compressor. + * If the compressor does not allow for selecting different compression levels, + * calling this function would not have any effect. + * Parquet and Arrow do not validate the passed compression level. + * If no level is selected by the user or if the special + * std::numeric_limits::min() value is passed, then Arrow selects the compression + * level. + */ + Builder* compression_level(int compression_level) { + default_column_properties_.set_compression_level(compression_level); + return this; + } + + /** + * Specify a compression level for the compressor for the column described by path. + * + * The provided compression level is compressor specific. The user would have + * to familiarize oneself with the available levels for the selected compressor. + * If the compressor does not allow for selecting different compression levels, + * calling this function would not have any effect. + * Parquet and Arrow do not validate the passed compression level. + * If no level is selected by the user or if the special + * std::numeric_limits::min() value is passed, then Arrow selects the compression + * level. + */ + Builder* compression_level(const std::string& path, int compression_level) { + codecs_compression_level_[path] = compression_level; + return this; + } + + /** + * Specify a compression level for the compressor for the column described by path. + * + * The provided compression level is compressor specific. 
The user would have + * to familiarize oneself with the available levels for the selected compressor. + * If the compressor does not allow for selecting different compression levels, + * calling this function would not have any effect. + * Parquet and Arrow do not validate the passed compression level. + * If no level is selected by the user or if the special + * std::numeric_limits::min() value is passed, then Arrow selects the compression + * level. + */ + Builder* compression_level(const std::shared_ptr& path, + int compression_level) { + return this->compression_level(path->ToDotString(), compression_level); + } + Builder* enable_statistics() { default_column_properties_.set_statistics_enabled(true); return this; @@ -311,6 +373,8 @@ class PARQUET_EXPORT WriterProperties { for (const auto& item : encodings_) get(item.first).set_encoding(item.second); for (const auto& item : codecs_) get(item.first).set_compression(item.second); + for (const auto& item : codecs_compression_level_) + get(item.first).set_compression_level(item.second); for (const auto& item : dictionary_enabled_) get(item.first).set_dictionary_enabled(item.second); for (const auto& item : statistics_enabled_) @@ -335,6 +399,7 @@ class PARQUET_EXPORT WriterProperties { ColumnProperties default_column_properties_; std::unordered_map encodings_; std::unordered_map codecs_; + std::unordered_map codecs_compression_level_; std::unordered_map dictionary_enabled_; std::unordered_map statistics_enabled_; }; @@ -384,6 +449,10 @@ class PARQUET_EXPORT WriterProperties { return column_properties(path).compression(); } + int compression_level(const std::shared_ptr& path) const { + return column_properties(path).compression_level(); + } + bool dictionary_enabled(const std::shared_ptr& path) const { return column_properties(path).dictionary_enabled(); } diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index f7e0cf3e4c0..1499ebcbcc9 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -35,27 +35,38 @@ using arrow::util::Codec; namespace parquet { std::unique_ptr GetCodecFromArrow(Compression::type codec) { + return GetCodecFromArrow(codec, + ::arrow::util::GetHintValueForDefaultCompressionLevel()); +} + +std::unique_ptr GetCodecFromArrow(Compression::type codec, int compression_level) { std::unique_ptr result; switch (codec) { case Compression::UNCOMPRESSED: break; case Compression::SNAPPY: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::SNAPPY, &result)); + PARQUET_THROW_NOT_OK( + Codec::Create(::arrow::Compression::SNAPPY, compression_level, &result)); break; case Compression::GZIP: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::GZIP, &result)); + PARQUET_THROW_NOT_OK( + Codec::Create(::arrow::Compression::GZIP, compression_level, &result)); break; case Compression::LZO: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZO, &result)); + PARQUET_THROW_NOT_OK( + Codec::Create(::arrow::Compression::LZO, compression_level, &result)); break; case Compression::BROTLI: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::BROTLI, &result)); + PARQUET_THROW_NOT_OK( + Codec::Create(::arrow::Compression::BROTLI, compression_level, &result)); break; case Compression::LZ4: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZ4, &result)); + PARQUET_THROW_NOT_OK( + Codec::Create(::arrow::Compression::LZ4, compression_level, &result)); break; case Compression::ZSTD: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::ZSTD, &result)); + PARQUET_THROW_NOT_OK( + 
Codec::Create(::arrow::Compression::ZSTD, compression_level, &result)); break; default: break; diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 30395f37ec4..a4b55211e09 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -464,6 +464,10 @@ struct Compression { PARQUET_EXPORT std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec); +PARQUET_EXPORT +std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec, + int compression_level); + struct Encryption { enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 }; }; From 06d8a14f3ea86f1ab0e9480de93483085cc44f6a Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 25 Aug 2019 11:32:28 -0500 Subject: [PATCH 2/5] Use arrow::Compression in parquet:: namespace, address other stylistic nits --- cpp/src/arrow/util/compression.cc | 33 +++++++++-- cpp/src/arrow/util/compression.h | 16 ++--- cpp/src/arrow/util/compression_test.cc | 10 ++++ cpp/src/parquet/column_io_benchmark.cc | 5 +- cpp/src/parquet/column_reader.cc | 2 +- cpp/src/parquet/column_writer.cc | 2 +- cpp/src/parquet/column_writer_test.cc | 14 ++--- cpp/src/parquet/file_deserialize_test.cc | 2 +- cpp/src/parquet/platform.h | 3 + cpp/src/parquet/printer.cc | 4 +- cpp/src/parquet/properties.h | 74 +++++++++++------------- cpp/src/parquet/thrift.h | 46 +++++++++++++-- cpp/src/parquet/types.cc | 72 +++++++---------------- cpp/src/parquet/types.h | 14 ++--- cpp/src/parquet/types_test.cc | 10 ---- 15 files changed, 167 insertions(+), 140 deletions(-) diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index 9e58180a2a2..68842136429 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -19,6 +19,7 @@ #include #include +#include #ifdef ARROW_WITH_BROTLI #include "arrow/util/compression_brotli.h" @@ -49,23 +50,45 @@ namespace arrow { namespace util { -int GetHintValueForDefaultCompressionLevel() { return std::numeric_limits::min(); } - Compressor::~Compressor() {} Decompressor::~Decompressor() {} Codec::~Codec() {} +int Codec::UseDefaultCompressionLevel() { return std::numeric_limits::min(); } + +std::string Codec::GetCodecAsString(Compression::type t) { + switch (t) { + case Compression::UNCOMPRESSED: + return "UNCOMPRESSED"; + case Compression::SNAPPY: + return "SNAPPY"; + case Compression::GZIP: + return "GZIP"; + case Compression::LZO: + return "LZO"; + case Compression::BROTLI: + return "BROTLI"; + case Compression::LZ4: + return "LZ4"; + case Compression::ZSTD: + return "ZSTD"; + case Compression::BZ2: + return "BZ2"; + default: + return "UNKNOWN"; + } +} + Status Codec::Create(Compression::type codec_type, std::unique_ptr* result) { - const int compression_level = GetHintValueForDefaultCompressionLevel(); - return Codec::Create(codec_type, compression_level, result); + return Codec::Create(codec_type, Codec::UseDefaultCompressionLevel(), result); } Status Codec::Create(Compression::type codec_type, int compression_level, std::unique_ptr* result) { const bool use_default_compression_level = - (compression_level == GetHintValueForDefaultCompressionLevel()); + (compression_level == Codec::UseDefaultCompressionLevel()); Codec* codec = nullptr; switch (codec_type) { case Compression::UNCOMPRESSED: diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 7d164f8a5a8..064b20228b1 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -15,11 +15,11 @@ // specific language governing permissions and 
limitations // under the License. -#ifndef ARROW_UTIL_COMPRESSION_H -#define ARROW_UTIL_COMPRESSION_H +#pragma once #include #include +#include #include "arrow/util/visibility.h" @@ -33,9 +33,6 @@ struct Compression { namespace util { -ARROW_EXPORT -int GetHintValueForDefaultCompressionLevel(); - /// \brief Streaming compressor interface /// class ARROW_EXPORT Compressor { @@ -100,6 +97,13 @@ class ARROW_EXPORT Codec { public: virtual ~Codec(); + /// \brief Return special value to indicate that a codec implementation + /// should use its default compression level + static int UseDefaultCompressionLevel(); + + /// \brief Return a string name for compression type + static std::string GetCodecAsString(Compression::type t); + static Status Create(Compression::type codec, std::unique_ptr* out); static Status Create(Compression::type codec, int compression_level, std::unique_ptr* out); @@ -157,5 +161,3 @@ class ARROW_EXPORT Codec { } // namespace util } // namespace arrow - -#endif diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 401a7ab804f..a2594b26ebd 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -334,6 +334,16 @@ class CodecTest : public ::testing::TestWithParam { } }; +TEST(TestCodecMisc, GetCodecAsString) { + ASSERT_EQ("UNCOMPRESSED", Codec::GetCodecAsString(Compression::UNCOMPRESSED)); + ASSERT_EQ("SNAPPY", Codec::GetCodecAsString(Compression::SNAPPY)); + ASSERT_EQ("GZIP", Codec::GetCodecAsString(Compression::GZIP)); + ASSERT_EQ("LZO", Codec::GetCodecAsString(Compression::LZO)); + ASSERT_EQ("BROTLI", Codec::GetCodecAsString(Compression::BROTLI)); + ASSERT_EQ("LZ4", Codec::GetCodecAsString(Compression::LZ4)); + ASSERT_EQ("ZSTD", Codec::GetCodecAsString(Compression::ZSTD)); +} + TEST_P(CodecTest, CodecRoundtrip) { const auto compression = GetCompression(); if (compression == Compression::BZ2) { diff --git a/cpp/src/parquet/column_io_benchmark.cc b/cpp/src/parquet/column_io_benchmark.cc index 1506e107e72..9916310da94 100644 --- a/cpp/src/parquet/column_io_benchmark.cc +++ b/cpp/src/parquet/column_io_benchmark.cc @@ -38,9 +38,8 @@ std::shared_ptr BuildWriter(int64_t output_size, ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema, const WriterProperties* properties) { - std::unique_ptr pager = - PageWriter::Open(dst, Compression::UNCOMPRESSED, - ::arrow::util::GetHintValueForDefaultCompressionLevel(), metadata); + std::unique_ptr pager = PageWriter::Open( + dst, Compression::UNCOMPRESSED, Codec::UseDefaultCompressionLevel(), metadata); std::shared_ptr writer = ColumnWriter::Make(metadata, std::move(pager), properties); return std::static_pointer_cast(writer); diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 288e48fe96b..5c3e9831dd4 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -120,7 +120,7 @@ class SerializedPageReader : public PageReader { seen_num_rows_(0), total_num_rows_(total_num_rows) { max_page_header_size_ = kDefaultMaxPageHeaderSize; - decompressor_ = GetCodecFromArrow(codec); + decompressor_ = GetCodec(codec); } // Implement the PageReader interface diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index dd1aa108d1f..36120555cc1 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -147,7 +147,7 @@ class SerializedPageWriter : public PageWriter { data_page_offset_(0), total_uncompressed_size_(0), total_compressed_size_(0) { - 
compressor_ = GetCodecFromArrow(codec, compression_level); + compressor_ = GetCodec(codec, compression_level); thrift_serializer_.reset(new ThriftSerializer); } diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 67c4fc16a51..b5ef9486bc9 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -107,9 +107,9 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { writer_properties_ = wp_builder.build(); metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); - std::unique_ptr pager = PageWriter::Open( - sink_, column_properties.compression(), - arrow::util::GetHintValueForDefaultCompressionLevel(), metadata_.get()); + std::unique_ptr pager = + PageWriter::Open(sink_, column_properties.compression(), + Codec::UseDefaultCompressionLevel(), metadata_.get()); std::shared_ptr writer = ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()); return std::static_pointer_cast>(writer); @@ -132,7 +132,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { void TestRequiredWithSettings( Encoding::type encoding, Compression::type compression, bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE, - int compression_level = arrow::util::GetHintValueForDefaultCompressionLevel()) { + int compression_level = Codec::UseDefaultCompressionLevel()) { this->GenerateData(num_rows); this->WriteRequiredWithSettings(encoding, compression, enable_dictionary, @@ -695,9 +695,9 @@ TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { auto props = WriterProperties::Builder().build(); auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); - std::unique_ptr pager = PageWriter::Open( - sink, Compression::UNCOMPRESSED, - ::arrow::util::GetHintValueForDefaultCompressionLevel(), metadata.get()); + std::unique_ptr pager = + PageWriter::Open(sink, Compression::UNCOMPRESSED, + Codec::UseDefaultCompressionLevel(), metadata.get()); std::shared_ptr writer = ColumnWriter::Make(metadata.get(), std::move(pager), props.get()); auto typed_writer = std::static_pointer_cast>(writer); diff --git a/cpp/src/parquet/file_deserialize_test.cc b/cpp/src/parquet/file_deserialize_test.cc index b4df05cd07a..ef47da42772 100644 --- a/cpp/src/parquet/file_deserialize_test.cc +++ b/cpp/src/parquet/file_deserialize_test.cc @@ -247,7 +247,7 @@ TEST_F(TestPageSerde, Compression) { test::random_bytes(page_size, 0, &faux_data[i]); } for (auto codec_type : codec_types) { - auto codec = GetCodecFromArrow(codec_type); + auto codec = GetCodec(codec_type); std::vector buffer; for (int i = 0; i < num_pages; ++i) { diff --git a/cpp/src/parquet/platform.h b/cpp/src/parquet/platform.h index 096e813b86c..76fbd84345c 100644 --- a/cpp/src/parquet/platform.h +++ b/cpp/src/parquet/platform.h @@ -26,6 +26,7 @@ #include "arrow/memory_pool.h" // IWYU pragma: export #include "arrow/status.h" // IWYU pragma: export #include "arrow/util/bit_util.h" // IWYU pragma: export +#include "arrow/util/compression.h" // IWYU pragma: export #include "arrow/util/macros.h" // IWYU pragma: export #include "arrow/util/string_view.h" // IWYU pragma: export @@ -91,6 +92,8 @@ namespace parquet { namespace BitUtil = ::arrow::BitUtil; using Buffer = ::arrow::Buffer; +using Codec = ::arrow::util::Codec; +using Compression = ::arrow::Compression; using MemoryPool = ::arrow::MemoryPool; using MutableBuffer = ::arrow::MutableBuffer; using ResizableBuffer = ::arrow::ResizableBuffer; diff --git 
a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc index 367c0e30c1e..141e719ddcd 100644 --- a/cpp/src/parquet/printer.cc +++ b/cpp/src/parquet/printer.cc @@ -121,7 +121,7 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list selecte stream << " Statistics Not Set"; } stream << std::endl - << " Compression: " << CompressionToString(column_chunk->compression()) + << " Compression: " << Codec::GetCodecAsString(column_chunk->compression()) << ", Encodings:"; for (auto encoding : column_chunk->encodings()) { stream << " " << EncodingToString(encoding); @@ -255,7 +255,7 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list selected stream << "\"False\","; } stream << "\n \"Compression\": \"" - << CompressionToString(column_chunk->compression()) + << Codec::GetCodecAsString(column_chunk->compression()) << "\", \"Encodings\": \""; for (auto encoding : column_chunk->encodings()) { stream << EncodingToString(encoding) << " "; diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 0e1e64ac21a..a834be8b211 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -97,7 +97,7 @@ class PARQUET_EXPORT ColumnProperties { dictionary_enabled_(dictionary_enabled), statistics_enabled_(statistics_enabled), max_stats_size_(max_stats_size), - compression_level_(::arrow::util::GetHintValueForDefaultCompressionLevel()) {} + compression_level_(Codec::UseDefaultCompressionLevel()) {} void set_encoding(Encoding::type encoding) { encoding_ = encoding; } @@ -280,54 +280,50 @@ class PARQUET_EXPORT WriterProperties { return this->compression(path->ToDotString(), codec); } - /** - * Specify the default compression level for the compressor in every column. - * In case a column does not have an explicitly specified compression level, - * the default one would be used. - * - * The provided compression level is compressor specific. The user would have - * to familiarize oneself with the available levels for the selected compressor. - * If the compressor does not allow for selecting different compression levels, - * calling this function would not have any effect. - * Parquet and Arrow do not validate the passed compression level. - * If no level is selected by the user or if the special - * std::numeric_limits::min() value is passed, then Arrow selects the compression - * level. - */ + /// \brief Specify the default compression level for the compressor in + /// every column. In case a column does not have an explicitly specified + /// compression level, the default one would be used. + /// + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. Builder* compression_level(int compression_level) { default_column_properties_.set_compression_level(compression_level); return this; } - /** - * Specify a compression level for the compressor for the column described by path. - * - * The provided compression level is compressor specific. The user would have - * to familiarize oneself with the available levels for the selected compressor. 
- * If the compressor does not allow for selecting different compression levels, - * calling this function would not have any effect. - * Parquet and Arrow do not validate the passed compression level. - * If no level is selected by the user or if the special - * std::numeric_limits::min() value is passed, then Arrow selects the compression - * level. - */ + /// \brief Specify a compression level for the compressor for the column + /// described by path. + /// + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. Builder* compression_level(const std::string& path, int compression_level) { codecs_compression_level_[path] = compression_level; return this; } - /** - * Specify a compression level for the compressor for the column described by path. - * - * The provided compression level is compressor specific. The user would have - * to familiarize oneself with the available levels for the selected compressor. - * If the compressor does not allow for selecting different compression levels, - * calling this function would not have any effect. - * Parquet and Arrow do not validate the passed compression level. - * If no level is selected by the user or if the special - * std::numeric_limits::min() value is passed, then Arrow selects the compression - * level. - */ + /// \brief Specify a compression level for the compressor for the column + /// described by path. + /// + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. 
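From the user's side, the new builder methods documented above compose with the existing compression() setter; a sketch in which the column path "col.a" is made up for illustration and the levels are arbitrary ZSTD values:

    #include <memory>
    #include "parquet/properties.h"

    // ZSTD for every column, a file-wide default level, and a per-column
    // override for the (made-up) column path "col.a".
    std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
      return parquet::WriterProperties::Builder()
          .compression(parquet::Compression::ZSTD)
          ->compression_level(6)
          ->compression_level("col.a", 10)
          ->build();
    }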
Builder* compression_level(const std::shared_ptr& path, int compression_level) { return this->compression_level(path->ToDotString(), compression_level); diff --git a/cpp/src/parquet/thrift.h b/cpp/src/parquet/thrift.h index c7b62073df5..9886503216c 100644 --- a/cpp/src/parquet/thrift.h +++ b/cpp/src/parquet/thrift.h @@ -77,10 +77,6 @@ static inline Encoding::type FromThrift(format::Encoding::type type) { return static_cast(type); } -static inline Compression::type FromThrift(format::CompressionCodec::type type) { - return static_cast(type); -} - static inline format::Type::type ToThrift(Type::type type) { return static_cast(type); } @@ -99,8 +95,48 @@ static inline format::Encoding::type ToThrift(Encoding::type type) { return static_cast(type); } +static inline Compression::type FromThrift(format::CompressionCodec::type type) { + switch (type) { + case format::CompressionCodec::UNCOMPRESSED: + return Compression::UNCOMPRESSED; + case format::CompressionCodec::SNAPPY: + return Compression::SNAPPY; + case format::CompressionCodec::GZIP: + return Compression::GZIP; + case format::CompressionCodec::LZO: + return Compression::LZO; + case format::CompressionCodec::BROTLI: + return Compression::BROTLI; + case format::CompressionCodec::LZ4: + return Compression::LZ4; + case format::CompressionCodec::ZSTD: + return Compression::ZSTD; + default: + DCHECK(false) << "Cannot reach here"; + return Compression::UNCOMPRESSED; + } +} + static inline format::CompressionCodec::type ToThrift(Compression::type type) { - return static_cast(type); + switch (type) { + case Compression::UNCOMPRESSED: + return format::CompressionCodec::UNCOMPRESSED; + case Compression::SNAPPY: + return format::CompressionCodec::SNAPPY; + case Compression::GZIP: + return format::CompressionCodec::GZIP; + case Compression::LZO: + return format::CompressionCodec::LZO; + case Compression::BROTLI: + return format::CompressionCodec::BROTLI; + case Compression::LZ4: + return format::CompressionCodec::LZ4; + case Compression::ZSTD: + return format::CompressionCodec::ZSTD; + default: + DCHECK(false) << "Cannot reach here"; + return format::CompressionCodec::UNCOMPRESSED; + } } static inline format::Statistics ToThrift(const EncodedStatistics& stats) { diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index 1499ebcbcc9..0d6f1a15f83 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -34,42 +34,35 @@ using arrow::util::Codec; namespace parquet { -std::unique_ptr GetCodecFromArrow(Compression::type codec) { - return GetCodecFromArrow(codec, - ::arrow::util::GetHintValueForDefaultCompressionLevel()); -} - -std::unique_ptr GetCodecFromArrow(Compression::type codec, int compression_level) { - std::unique_ptr result; +bool IsCodecSupported(Compression::type codec) { switch (codec) { case Compression::UNCOMPRESSED: - break; case Compression::SNAPPY: - PARQUET_THROW_NOT_OK( - Codec::Create(::arrow::Compression::SNAPPY, compression_level, &result)); - break; case Compression::GZIP: - PARQUET_THROW_NOT_OK( - Codec::Create(::arrow::Compression::GZIP, compression_level, &result)); - break; - case Compression::LZO: - PARQUET_THROW_NOT_OK( - Codec::Create(::arrow::Compression::LZO, compression_level, &result)); - break; case Compression::BROTLI: - PARQUET_THROW_NOT_OK( - Codec::Create(::arrow::Compression::BROTLI, compression_level, &result)); - break; - case Compression::LZ4: - PARQUET_THROW_NOT_OK( - Codec::Create(::arrow::Compression::LZ4, compression_level, &result)); - break; case Compression::ZSTD: - 
PARQUET_THROW_NOT_OK( - Codec::Create(::arrow::Compression::ZSTD, compression_level, &result)); - break; + case Compression::LZ4: + return true; default: - break; + return false; + } +} + +std::unique_ptr GetCodec(Compression::type codec) { + return GetCodec(codec, Codec::UseDefaultCompressionLevel()); +} + +std::unique_ptr GetCodec(Compression::type codec, int compression_level) { + std::unique_ptr result; + if (!IsCodecSupported(codec)) { + std::stringstream ss; + ss << "Codec type " << Codec::GetCodecAsString(codec) + << " not supported in Parquet format"; + throw ParquetException(ss.str()); + } + + if (codec != Compression::UNCOMPRESSED) { + PARQUET_THROW_NOT_OK(Codec::Create(codec, compression_level, &result)); } return result; } @@ -171,27 +164,6 @@ std::string EncodingToString(Encoding::type t) { } } -std::string CompressionToString(Compression::type t) { - switch (t) { - case Compression::UNCOMPRESSED: - return "UNCOMPRESSED"; - case Compression::SNAPPY: - return "SNAPPY"; - case Compression::GZIP: - return "GZIP"; - case Compression::LZO: - return "LZO"; - case Compression::BROTLI: - return "BROTLI"; - case Compression::LZ4: - return "LZ4"; - case Compression::ZSTD: - return "ZSTD"; - default: - return "UNKNOWN"; - } -} - std::string TypeToString(Type::type t) { switch (t) { case Type::BOOLEAN: diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index a4b55211e09..0d3b3ba492c 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -456,17 +456,15 @@ struct Encoding { }; }; -// Compression, mirrors parquet::CompressionCodec -struct Compression { - enum type { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD }; -}; +/// \brief Return true if Parquet supports indicated compression type +PARQUET_EXPORT +bool IsCodecSupported(Compression::type codec); PARQUET_EXPORT -std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec); +std::unique_ptr GetCodec(Compression::type codec); PARQUET_EXPORT -std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec, - int compression_level); +std::unique_ptr GetCodec(Compression::type codec, int compression_level); struct Encryption { enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 }; @@ -661,8 +659,6 @@ inline std::string format_fwf(int width) { return ss.str(); } -PARQUET_EXPORT std::string CompressionToString(Compression::type t); - PARQUET_EXPORT std::string EncodingToString(Encoding::type t); PARQUET_EXPORT std::string ConvertedTypeToString(ConvertedType::type t); diff --git a/cpp/src/parquet/types_test.cc b/cpp/src/parquet/types_test.cc index 9da5642de1e..dfefe971583 100644 --- a/cpp/src/parquet/types_test.cc +++ b/cpp/src/parquet/types_test.cc @@ -63,16 +63,6 @@ TEST(TestConvertedTypeToString, ConvertedTypes) { ASSERT_STREQ("INTERVAL", ConvertedTypeToString(ConvertedType::INTERVAL).c_str()); } -TEST(TestCompressionToString, Compression) { - ASSERT_STREQ("UNCOMPRESSED", CompressionToString(Compression::UNCOMPRESSED).c_str()); - ASSERT_STREQ("SNAPPY", CompressionToString(Compression::SNAPPY).c_str()); - ASSERT_STREQ("GZIP", CompressionToString(Compression::GZIP).c_str()); - ASSERT_STREQ("LZO", CompressionToString(Compression::LZO).c_str()); - ASSERT_STREQ("BROTLI", CompressionToString(Compression::BROTLI).c_str()); - ASSERT_STREQ("LZ4", CompressionToString(Compression::LZ4).c_str()); - ASSERT_STREQ("ZSTD", CompressionToString(Compression::ZSTD).c_str()); -} - #ifdef __GNUC__ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" From 
1c905868c32694a772367e46d75cfac626eda883 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 25 Aug 2019 11:48:32 -0500 Subject: [PATCH 3/5] Move default compression levels to headers, remove nullary ctors --- cpp/src/arrow/util/compression.cc | 28 +++++------------------- cpp/src/arrow/util/compression.h | 3 +++ cpp/src/arrow/util/compression_brotli.cc | 11 ++++------ cpp/src/arrow/util/compression_brotli.h | 7 ++++-- cpp/src/arrow/util/compression_bz2.cc | 9 ++++---- cpp/src/arrow/util/compression_bz2.h | 5 +++-- cpp/src/arrow/util/compression_zlib.cc | 4 ++-- cpp/src/arrow/util/compression_zlib.h | 5 ++++- cpp/src/arrow/util/compression_zstd.cc | 10 ++++----- cpp/src/arrow/util/compression_zstd.h | 7 +++--- 10 files changed, 38 insertions(+), 51 deletions(-) diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index 68842136429..a4503f3e2a4 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -56,7 +56,7 @@ Decompressor::~Decompressor() {} Codec::~Codec() {} -int Codec::UseDefaultCompressionLevel() { return std::numeric_limits::min(); } +int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; } std::string Codec::GetCodecAsString(Compression::type t) { switch (t) { @@ -87,8 +87,6 @@ Status Codec::Create(Compression::type codec_type, std::unique_ptr* resul Status Codec::Create(Compression::type codec_type, int compression_level, std::unique_ptr* result) { - const bool use_default_compression_level = - (compression_level == Codec::UseDefaultCompressionLevel()); Codec* codec = nullptr; switch (codec_type) { case Compression::UNCOMPRESSED: @@ -102,11 +100,7 @@ Status Codec::Create(Compression::type codec_type, int compression_level, #endif case Compression::GZIP: #ifdef ARROW_WITH_ZLIB - if (use_default_compression_level) { - codec = new GZipCodec(); - } else { - codec = new GZipCodec(compression_level); - } + codec = new GZipCodec(compression_level); break; #else return Status::NotImplemented("Gzip codec support not built"); @@ -115,11 +109,7 @@ Status Codec::Create(Compression::type codec_type, int compression_level, return Status::NotImplemented("LZO codec not implemented"); case Compression::BROTLI: #ifdef ARROW_WITH_BROTLI - if (use_default_compression_level) { - codec = new BrotliCodec(); - } else { - codec = new BrotliCodec(compression_level); - } + codec = new BrotliCodec(compression_level); break; #else return Status::NotImplemented("Brotli codec support not built"); @@ -133,22 +123,14 @@ Status Codec::Create(Compression::type codec_type, int compression_level, #endif case Compression::ZSTD: #ifdef ARROW_WITH_ZSTD - if (use_default_compression_level) { - codec = new ZSTDCodec(); - } else { - codec = new ZSTDCodec(compression_level); - } + codec = new ZSTDCodec(compression_level); break; #else return Status::NotImplemented("ZSTD codec support not built"); #endif case Compression::BZ2: #ifdef ARROW_WITH_BZ2 - if (use_default_compression_level) { - codec = new BZ2Codec(); - } else { - codec = new BZ2Codec(compression_level); - } + codec = new BZ2Codec(compression_level); break; #else return Status::NotImplemented("BZ2 codec support not built"); diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 064b20228b1..ad6b816e5d4 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -33,6 +34,8 @@ struct Compression { namespace util { +constexpr int 
kUseDefaultCompressionLevel = std::numeric_limits::min(); + /// \brief Streaming compressor interface /// class ARROW_EXPORT Compressor { diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index 92a051e5009..6fc55157d36 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -33,10 +33,6 @@ namespace util { namespace { -// Brotli compression quality is max (11) by default, which is slow. -// We use 8 as a default as it is the best trade-off for Parquet workload. -constexpr int kBrotliDefaultCompressionLevel = 8; - } // namespace // ---------------------------------------------------------------------- @@ -195,9 +191,10 @@ Status BrotliCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes // ---------------------------------------------------------------------- // Brotli codec implementation -BrotliCodec::BrotliCodec(int compression_level) : compression_level_(compression_level) {} - -BrotliCodec::BrotliCodec() : BrotliCodec(kBrotliDefaultCompressionLevel) {} +BrotliCodec::BrotliCodec(int compression_level) { + compression_level_ = compression_level == kUseDefaultCompressionLevel ? + kBrotliDefaultCompressionLevel : compression_level; +} Status BrotliCodec::MakeCompressor(std::shared_ptr* out) { auto ptr = std::make_shared(compression_level_); diff --git a/cpp/src/arrow/util/compression_brotli.h b/cpp/src/arrow/util/compression_brotli.h index c01b9560987..02eb0ffb25e 100644 --- a/cpp/src/arrow/util/compression_brotli.h +++ b/cpp/src/arrow/util/compression_brotli.h @@ -28,11 +28,14 @@ namespace arrow { namespace util { +// Brotli compression quality is max (11) by default, which is slow. +// We use 8 as a default as it is the best trade-off for Parquet workload. +constexpr int kBrotliDefaultCompressionLevel = 8; + // Brotli codec. class ARROW_EXPORT BrotliCodec : public Codec { public: - explicit BrotliCodec(int compression_level); - BrotliCodec(); + explicit BrotliCodec(int compression_level = kBrotliDefaultCompressionLevel); Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc index 03bcbc6b3d1..7202165cd7a 100644 --- a/cpp/src/arrow/util/compression_bz2.cc +++ b/cpp/src/arrow/util/compression_bz2.cc @@ -39,8 +39,6 @@ namespace util { namespace { -constexpr int kBZ2DefaultCompressionLevel = 9; - // Max number of bytes the bz2 APIs accept at a time constexpr auto kSizeLimit = static_cast(std::numeric_limits::max()); @@ -238,9 +236,10 @@ class BZ2Compressor : public Compressor { // ---------------------------------------------------------------------- // bz2 codec implementation -BZ2Codec::BZ2Codec(int compression_level) : compression_level_(compression_level) {} - -BZ2Codec::BZ2Codec() : BZ2Codec(kBZ2DefaultCompressionLevel) {} +BZ2Codec::BZ2Codec(int compression_level) : compression_level_(compression_level) { + compression_level_ = compression_level == kUseDefaultCompressionLevel ? 
+ kBZ2DefaultCompressionLevel : compression_level; +} Status BZ2Codec::MakeCompressor(std::shared_ptr* out) { auto ptr = std::make_shared(compression_level_); diff --git a/cpp/src/arrow/util/compression_bz2.h b/cpp/src/arrow/util/compression_bz2.h index eac044e49b5..d6666e4c831 100644 --- a/cpp/src/arrow/util/compression_bz2.h +++ b/cpp/src/arrow/util/compression_bz2.h @@ -28,11 +28,12 @@ namespace arrow { namespace util { +constexpr int kBZ2DefaultCompressionLevel = 9; + // BZ2 codec. class ARROW_EXPORT BZ2Codec : public Codec { public: - explicit BZ2Codec(int compression_level); - BZ2Codec(); + explicit BZ2Codec(int compression_level = kBZ2DefaultCompressionLevel); Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index a7b5807faa8..048ba63147f 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -50,8 +50,6 @@ constexpr int GZIP_CODEC = 16; // Determine if this is libz or gzip from header. constexpr int DETECT_CODEC = 32; -constexpr int kGZipDefaultCompressionLevel = 9; - int CompressionWindowBitsForFormat(GZipCodec::Format format) { int window_bits = WINDOW_BITS; switch (format) { @@ -507,6 +505,8 @@ class GZipCodec::GZipCodecImpl { }; GZipCodec::GZipCodec(int compression_level, Format format) { + compression_level = compression_level == kUseDefaultCompressionLevel ? + kGZipDefaultCompressionLevel : compression_level; impl_.reset(new GZipCodecImpl(compression_level, format)); } diff --git a/cpp/src/arrow/util/compression_zlib.h b/cpp/src/arrow/util/compression_zlib.h index 4679835fe5a..05cee0643e1 100644 --- a/cpp/src/arrow/util/compression_zlib.h +++ b/cpp/src/arrow/util/compression_zlib.h @@ -28,6 +28,8 @@ namespace arrow { namespace util { +constexpr int kGZipDefaultCompressionLevel = 9; + // GZip codec. class ARROW_EXPORT GZipCodec : public Codec { public: @@ -38,7 +40,8 @@ class ARROW_EXPORT GZipCodec : public Codec { GZIP, }; - explicit GZipCodec(int compression_level, Format format = GZIP); + explicit GZipCodec(int compression_level = kGZipDefaultCompressionLevel, + Format format = GZIP); explicit GZipCodec(Format format = GZIP); ~GZipCodec() override; diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc index 08206ab75ab..48fd21118d9 100644 --- a/cpp/src/arrow/util/compression_zstd.cc +++ b/cpp/src/arrow/util/compression_zstd.cc @@ -37,9 +37,6 @@ Status ZSTDError(size_t ret, const char* prefix_msg) { return Status::IOError(prefix_msg, ZSTD_getErrorName(ret)); } -// XXX level = 1 probably doesn't compress very much -constexpr int kZSTDDefaultCompressionLevel = 1; - } // namespace // ---------------------------------------------------------------------- @@ -192,9 +189,10 @@ Status ZSTDCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_w // ---------------------------------------------------------------------- // ZSTD codec implementation -ZSTDCodec::ZSTDCodec(int compression_level) : compression_level_(compression_level) {} - -ZSTDCodec::ZSTDCodec() : ZSTDCodec(kZSTDDefaultCompressionLevel) {} +ZSTDCodec::ZSTDCodec(int compression_level) { + compression_level_ = compression_level == kUseDefaultCompressionLevel ? 
+ kZSTDDefaultCompressionLevel : compression_level; +} Status ZSTDCodec::MakeCompressor(std::shared_ptr* out) { auto ptr = std::make_shared(compression_level_); diff --git a/cpp/src/arrow/util/compression_zstd.h b/cpp/src/arrow/util/compression_zstd.h index 822827aaa44..a757de46aa8 100644 --- a/cpp/src/arrow/util/compression_zstd.h +++ b/cpp/src/arrow/util/compression_zstd.h @@ -28,12 +28,13 @@ namespace arrow { namespace util { +// XXX level = 1 probably doesn't compress very much +constexpr int kZSTDDefaultCompressionLevel = 1; + // ZSTD codec. class ARROW_EXPORT ZSTDCodec : public Codec { public: - explicit ZSTDCodec(int compression_level); - - ZSTDCodec(); + explicit ZSTDCodec(int compression_level = kZSTDDefaultCompressionLevel); Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; From beafdda49f0541bbcbf02438f61d7e56e7d7f0e2 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 25 Aug 2019 16:24:23 -0500 Subject: [PATCH 4/5] clang-format --- cpp/src/arrow/util/compression_brotli.cc | 12 +++--------- cpp/src/arrow/util/compression_bz2.cc | 5 +++-- cpp/src/arrow/util/compression_zlib.cc | 5 +++-- cpp/src/arrow/util/compression_zstd.cc | 5 +++-- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index 6fc55157d36..4184c11b67e 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -31,13 +31,6 @@ namespace arrow { namespace util { -namespace { - -} // namespace - -// ---------------------------------------------------------------------- -// Brotli decompressor implementation - class BrotliDecompressor : public Decompressor { public: BrotliDecompressor() {} @@ -192,8 +185,9 @@ Status BrotliCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes // Brotli codec implementation BrotliCodec::BrotliCodec(int compression_level) { - compression_level_ = compression_level == kUseDefaultCompressionLevel ? - kBrotliDefaultCompressionLevel : compression_level; + compression_level_ = compression_level == kUseDefaultCompressionLevel + ? kBrotliDefaultCompressionLevel + : compression_level; } Status BrotliCodec::MakeCompressor(std::shared_ptr* out) { diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc index 7202165cd7a..3ccf3493ed5 100644 --- a/cpp/src/arrow/util/compression_bz2.cc +++ b/cpp/src/arrow/util/compression_bz2.cc @@ -237,8 +237,9 @@ class BZ2Compressor : public Compressor { // bz2 codec implementation BZ2Codec::BZ2Codec(int compression_level) : compression_level_(compression_level) { - compression_level_ = compression_level == kUseDefaultCompressionLevel ? - kBZ2DefaultCompressionLevel : compression_level; + compression_level_ = compression_level == kUseDefaultCompressionLevel + ? kBZ2DefaultCompressionLevel + : compression_level; } Status BZ2Codec::MakeCompressor(std::shared_ptr* out) { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index 048ba63147f..fe269e304c0 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -505,8 +505,9 @@ class GZipCodec::GZipCodecImpl { }; GZipCodec::GZipCodec(int compression_level, Format format) { - compression_level = compression_level == kUseDefaultCompressionLevel ? - kGZipDefaultCompressionLevel : compression_level; + compression_level = compression_level == kUseDefaultCompressionLevel + ? 
kGZipDefaultCompressionLevel + : compression_level; impl_.reset(new GZipCodecImpl(compression_level, format)); } diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc index 48fd21118d9..7fa851d17c5 100644 --- a/cpp/src/arrow/util/compression_zstd.cc +++ b/cpp/src/arrow/util/compression_zstd.cc @@ -190,8 +190,9 @@ Status ZSTDCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_w // ZSTD codec implementation ZSTDCodec::ZSTDCodec(int compression_level) { - compression_level_ = compression_level == kUseDefaultCompressionLevel ? - kZSTDDefaultCompressionLevel : compression_level; + compression_level_ = compression_level == kUseDefaultCompressionLevel + ? kZSTDDefaultCompressionLevel + : compression_level; } Status ZSTDCodec::MakeCompressor(std::shared_ptr* out) { From 8f124619b6845662ec76d0d824d4819054379174 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 25 Aug 2019 18:51:20 -0500 Subject: [PATCH 5/5] Remove GzipCodec ctor that's ambiguous in MSVC --- cpp/src/arrow/util/compression_zlib.cc | 2 -- cpp/src/arrow/util/compression_zlib.h | 1 - 2 files changed, 3 deletions(-) diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index fe269e304c0..f2463a10062 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -511,8 +511,6 @@ GZipCodec::GZipCodec(int compression_level, Format format) { impl_.reset(new GZipCodecImpl(compression_level, format)); } -GZipCodec::GZipCodec(Format format) : GZipCodec(kGZipDefaultCompressionLevel, format) {} - GZipCodec::~GZipCodec() {} Status GZipCodec::Decompress(int64_t input_length, const uint8_t* input, diff --git a/cpp/src/arrow/util/compression_zlib.h b/cpp/src/arrow/util/compression_zlib.h index 05cee0643e1..17291585c5b 100644 --- a/cpp/src/arrow/util/compression_zlib.h +++ b/cpp/src/arrow/util/compression_zlib.h @@ -42,7 +42,6 @@ class ARROW_EXPORT GZipCodec : public Codec { explicit GZipCodec(int compression_level = kGZipDefaultCompressionLevel, Format format = GZIP); - explicit GZipCodec(Format format = GZIP); ~GZipCodec() override; Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len,
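Usage note (illustrative only, not part of the patches above): a minimal sketch of how the per-column compression level added to WriterProperties::Builder might be used once the series is applied. The column names and level values here are arbitrary placeholders, and error handling is omitted.

#include <memory>
#include "parquet/properties.h"

std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
  parquet::WriterProperties::Builder builder;
  // Pick a codec per column, then optionally override its compression level.
  builder.compression("col_zstd", parquet::Compression::ZSTD)
      ->compression_level("col_zstd", 6)  // compressor specific, not validated
      ->compression("col_gzip", parquet::Compression::GZIP);
  // Columns without an explicit level fall back to the codec's default level.
  return builder.build();
}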
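Similarly, a sketch of creating codecs through the Arrow-level API after patches 3-5, using an explicit level and the kUseDefaultCompressionLevel sentinel; the GZIP level chosen here is arbitrary.

#include <memory>
#include "arrow/status.h"
#include "arrow/util/compression.h"

arrow::Status MakeCodecs() {
  // Explicit, codec-specific level (not validated by Arrow).
  std::unique_ptr<arrow::util::Codec> gzip;
  ARROW_RETURN_NOT_OK(arrow::util::Codec::Create(arrow::Compression::GZIP,
                                                 /*compression_level=*/6, &gzip));

  // The sentinel asks the codec to pick its own default,
  // e.g. kZSTDDefaultCompressionLevel for ZSTD.
  std::unique_ptr<arrow::util::Codec> zstd;
  ARROW_RETURN_NOT_OK(arrow::util::Codec::Create(
      arrow::Compression::ZSTD, arrow::util::kUseDefaultCompressionLevel, &zstd));
  return arrow::Status::OK();
}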
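Finally, a sketch of the renamed Parquet-level helpers from patch 2/5 (IsCodecSupported and GetCodec), assuming only the declarations shown in types.h; the ZSTD level is again an arbitrary example.

#include <memory>
#include "arrow/util/compression.h"
#include "parquet/exception.h"
#include "parquet/types.h"

std::unique_ptr<::arrow::util::Codec> MakeParquetCodec() {
  // GetCodec throws ParquetException for codecs the Parquet format cannot use,
  // so callers can check support up front.
  if (!parquet::IsCodecSupported(parquet::Compression::ZSTD)) {
    throw parquet::ParquetException("ZSTD not supported in this build");
  }
  return parquet::GetCodec(parquet::Compression::ZSTD, /*compression_level=*/3);
}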