Skip to content
Closed
30 changes: 30 additions & 0 deletions cpp/src/arrow/util/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@
namespace arrow {
namespace util {

namespace {

Status CheckSupportsCompressionLevel(Compression::type type) {
if (!Codec::SupportsCompressionLevel(type)) {
return Status::Invalid(
"The specified codec does not support the compression level parameter");
}
return Status::OK();
}

} // namespace

int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; }

Status Codec::Init() { return Status::OK(); }
Expand Down Expand Up @@ -103,6 +115,24 @@ bool Codec::SupportsCompressionLevel(Compression::type codec) {
}
}

Result<int> Codec::MaximumCompressionLevel(Compression::type codec_type) {
RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type));
ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type));
return codec->maximum_compression_level();
}

Result<int> Codec::MinimumCompressionLevel(Compression::type codec_type) {
RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type));
ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type));
return codec->minimum_compression_level();
}

Result<int> Codec::DefaultCompressionLevel(Compression::type codec_type) {
RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type));
ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type));
return codec->default_compression_level();
}

Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
int compression_level) {
if (!IsAvailable(codec_type)) {
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,27 @@ class ARROW_EXPORT Codec {
/// \brief Return true if indicated codec supports setting a compression level
static bool SupportsCompressionLevel(Compression::type codec);

/// \brief Return the smallest supported compression level for the codec
/// Note: This function creates a temporary Codec instance
static Result<int> MinimumCompressionLevel(Compression::type codec);

/// \brief Return the largest supported compression level for the codec
/// Note: This function creates a temporary Codec instance
static Result<int> MaximumCompressionLevel(Compression::type codec);

/// \brief Return the default compression level
/// Note: This function creates a temporary Codec instance
static Result<int> DefaultCompressionLevel(Compression::type codec);

/// \brief Return the smallest supported compression level
virtual int minimum_compression_level() const = 0;

/// \brief Return the largest supported compression level
virtual int maximum_compression_level() const = 0;

/// \brief Return the default compression level
virtual int default_compression_level() const = 0;

/// \brief One-shot decompression function
///
/// output_buffer_len must be correct and therefore be obtained in advance.
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/util/compression_brotli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ class BrotliCodec : public Codec {
Compression::type compression_type() const override { return Compression::BROTLI; }

int compression_level() const override { return compression_level_; }
int minimum_compression_level() const override { return BROTLI_MIN_QUALITY; }
int maximum_compression_level() const override { return BROTLI_MAX_QUALITY; }
int default_compression_level() const override {
return kBrotliDefaultCompressionLevel;
}

private:
const int compression_level_;
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/util/compression_bz2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ namespace internal {

namespace {

constexpr int kBZ2MinCompressionLevel = 1;
constexpr int kBZ2MaxCompressionLevel = 9;

// Max number of bytes the bz2 APIs accept at a time
constexpr auto kSizeLimit =
static_cast<int64_t>(std::numeric_limits<unsigned int>::max());
Expand Down Expand Up @@ -265,6 +268,9 @@ class BZ2Codec : public Codec {
Compression::type compression_type() const override { return Compression::BZ2; }

int compression_level() const override { return compression_level_; }
int minimum_compression_level() const override { return kBZ2MinCompressionLevel; }
int maximum_compression_level() const override { return kBZ2MaxCompressionLevel; }
int default_compression_level() const override { return kBZ2DefaultCompressionLevel; }

private:
int compression_level_;
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/util/compression_lz4.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ class Lz4FrameCodec : public Codec {
}

Compression::type compression_type() const override { return Compression::LZ4_FRAME; }
int minimum_compression_level() const override { return kUseDefaultCompressionLevel; }
int maximum_compression_level() const override { return kUseDefaultCompressionLevel; }
int default_compression_level() const override { return kUseDefaultCompressionLevel; }

protected:
const LZ4F_preferences_t prefs_;
Expand Down Expand Up @@ -350,6 +353,9 @@ class Lz4Codec : public Codec {
}

Compression::type compression_type() const override { return Compression::LZ4; }
int minimum_compression_level() const override { return kUseDefaultCompressionLevel; }
int maximum_compression_level() const override { return kUseDefaultCompressionLevel; }
int default_compression_level() const override { return kUseDefaultCompressionLevel; }
};

// ----------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/util/compression_snappy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class SnappyCodec : public Codec {
}

Compression::type compression_type() const override { return Compression::SNAPPY; }
int minimum_compression_level() const override { return kUseDefaultCompressionLevel; }
int maximum_compression_level() const override { return kUseDefaultCompressionLevel; }
int default_compression_level() const override { return kUseDefaultCompressionLevel; }
};

} // namespace
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,32 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) {
}
}

TEST_P(CodecTest, MinMaxCompressionLevel) {
auto type = GetCompression();
ASSERT_OK_AND_ASSIGN(auto codec, Codec::Create(type));

if (Codec::SupportsCompressionLevel(type)) {
ASSERT_OK_AND_ASSIGN(auto min_level, Codec::MinimumCompressionLevel(type));
ASSERT_OK_AND_ASSIGN(auto max_level, Codec::MaximumCompressionLevel(type));
ASSERT_OK_AND_ASSIGN(auto default_level, Codec::DefaultCompressionLevel(type));
ASSERT_NE(min_level, Codec::UseDefaultCompressionLevel());
ASSERT_NE(max_level, Codec::UseDefaultCompressionLevel());
ASSERT_NE(default_level, Codec::UseDefaultCompressionLevel());
ASSERT_LT(min_level, max_level);
ASSERT_EQ(min_level, codec->minimum_compression_level());
ASSERT_EQ(max_level, codec->maximum_compression_level());
ASSERT_GE(default_level, min_level);
ASSERT_LE(default_level, max_level);
} else {
ASSERT_RAISES(Invalid, Codec::MinimumCompressionLevel(type));
ASSERT_RAISES(Invalid, Codec::MaximumCompressionLevel(type));
ASSERT_RAISES(Invalid, Codec::DefaultCompressionLevel(type));
ASSERT_EQ(codec->minimum_compression_level(), Codec::UseDefaultCompressionLevel());
ASSERT_EQ(codec->maximum_compression_level(), Codec::UseDefaultCompressionLevel());
ASSERT_EQ(codec->default_compression_level(), Codec::UseDefaultCompressionLevel());
}
}

TEST_P(CodecTest, OutputBufferIsSmall) {
auto type = GetCompression();
if (type != Compression::SNAPPY) {
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/util/compression_zlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ constexpr int GZIP_CODEC = 16;
// Determine if this is libz or gzip from header.
constexpr int DETECT_CODEC = 32;

constexpr int kGZipMinCompressionLevel = 1;
constexpr int kGZipMaxCompressionLevel = 9;

int CompressionWindowBitsForFormat(GZipFormat::type format) {
int window_bits = WINDOW_BITS;
switch (format) {
Expand Down Expand Up @@ -468,6 +471,9 @@ class GZipCodec : public Codec {
Compression::type compression_type() const override { return Compression::GZIP; }

int compression_level() const override { return compression_level_; }
int minimum_compression_level() const override { return kGZipMinCompressionLevel; }
int maximum_compression_level() const override { return kGZipMaxCompressionLevel; }
int default_compression_level() const override { return kGZipDefaultCompressionLevel; }

private:
// zlib is stateful and the z_stream state variable must be initialized
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/util/compression_zstd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ class ZSTDCodec : public Codec {
}

Compression::type compression_type() const override { return Compression::ZSTD; }
int minimum_compression_level() const override { return ZSTD_minCLevel(); }
int maximum_compression_level() const override { return ZSTD_maxCLevel(); }
int default_compression_level() const override { return kZSTDDefaultCompressionLevel; }

int compression_level() const override { return compression_level_; }

Expand Down
1 change: 1 addition & 0 deletions docs/source/python/api/memory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Miscellaneous
.. autosummary::
:toctree: ../generated/

Codec
compress
decompress

Expand Down
18 changes: 18 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2352,6 +2352,23 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
@staticmethod
CResult[unique_ptr[CCodec]] Create(CCompressionType codec)

@staticmethod
CResult[unique_ptr[CCodec]] CreateWithLevel" Create"(
CCompressionType codec,
int compression_level)

@staticmethod
c_bool SupportsCompressionLevel(CCompressionType codec)

@staticmethod
CResult[int] MinimumCompressionLevel(CCompressionType codec)

@staticmethod
CResult[int] MaximumCompressionLevel(CCompressionType codec)

@staticmethod
CResult[int] DefaultCompressionLevel(CCompressionType codec)

@staticmethod
c_bool IsAvailable(CCompressionType codec)

Expand All @@ -2362,6 +2379,7 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
int64_t output_buffer_len,
uint8_t* output_buffer)
c_string name() const
int compression_level() const
int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input)


Expand Down
83 changes: 81 additions & 2 deletions python/pyarrow/io.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1625,16 +1625,55 @@ cdef class Codec(_Weakrefable):
Type of compression codec to initialize, valid values are: 'gzip',
'bz2', 'brotli', 'lz4' (or 'lz4_frame'), 'lz4_raw', 'zstd' and
'snappy'.
compression_level: int, None
Optional parameter specifying how aggressively to compress. The
possible ranges and effect of this parameter depend on the specific
codec chosen. Higher values compress more but typically use more
resources (CPU/RAM). Some codecs support negative values.

gzip
The compression_level maps to the memlevel parameter of
deflateInit2. Higher levels use more RAM but are faster
and should have higher compression ratios.

bz2
The compression level maps to the blockSize100k parameter of
the BZ2_bzCompressInit function. Higher levels use more RAM
but are faster and should have higher compression ratios.

brotli
The compression level maps to the BROTLI_PARAM_QUALITY
parameter. Higher values are slower and should have higher
compression ratios.

lz4/lz4_frame/lz4_raw
The compression level parameter is not supported and must
be None

zstd
The compression level maps to the compressionLevel parameter
of ZSTD_initCStream. Negative values are supported. Higher
values are slower and should have higher compression ratios.

snappy
The compression level parameter is not supported and must
be None


Raises
------
ValueError
If invalid compression value is passed.
"""

def __init__(self, str compression not None):
def __init__(self, str compression not None, compression_level=None):
cdef CCompressionType typ = _ensure_compression(compression)
self.wrapped = move(GetResultValue(CCodec.Create(typ)))
if compression_level is not None:
self.wrapped = shared_ptr[CCodec](move(GetResultValue(
CCodec.CreateWithLevel(typ, compression_level))))
else:
self.wrapped = shared_ptr[CCodec](move(GetResultValue(
CCodec.Create(typ))))

cdef inline CCodec* unwrap(self) nogil:
return self.wrapped.get()
Expand Down Expand Up @@ -1680,10 +1719,50 @@ cdef class Codec(_Weakrefable):
cdef CCompressionType typ = _ensure_compression(compression)
return CCodec.IsAvailable(typ)

@staticmethod
def supports_compression_level(str compression not None):
"""
Returns true if the compression level parameter is supported
for the given codec.
"""
cdef CCompressionType typ = _ensure_compression(compression)
return CCodec.SupportsCompressionLevel(typ)

@staticmethod
def default_compression_level(str compression not None):
"""
Returns the compression level that Arrow will use for the codec if
None is specified.
"""
cdef CCompressionType typ = _ensure_compression(compression)
return GetResultValue(CCodec.DefaultCompressionLevel(typ))

@staticmethod
def minimum_compression_level(str compression not None):
"""
Returns the smallest valid value for the compression level
"""
cdef CCompressionType typ = _ensure_compression(compression)
return GetResultValue(CCodec.MinimumCompressionLevel(typ))

@staticmethod
def maximum_compression_level(str compression not None):
"""
Returns the largest valid value for the compression level
"""
cdef CCompressionType typ = _ensure_compression(compression)
return GetResultValue(CCodec.MaximumCompressionLevel(typ))

@property
def name(self):
"""Returns the name of the codec"""
return frombytes(self.unwrap().name())

@property
def compression_level(self):
"""Returns the compression level parameter of the codec"""
return frombytes(self.unwrap().compression_level())

def compress(self, object buf, asbytes=False, memory_pool=None):
"""
Compress data from buffer-like object.
Expand Down
15 changes: 11 additions & 4 deletions python/pyarrow/ipc.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ cdef class IpcWriteOptions(_Weakrefable):
If true, allow field lengths that don't fit in a signed 32-bit int.
use_legacy_format : bool, default False
Whether to use the pre-Arrow 0.15 IPC format.
compression: str or None
If not None, compression codec to use for record batch buffers.
May only be "lz4", "zstd" or None.
compression: str, Codec, or None
compression codec to use for record batch buffers.
If None then batch buffers will be uncompressed.
Must be "lz4", "zstd" or None.
To specify a compression_level use `pyarrow.Codec`
use_threads: bool
Whether to use the global CPU thread pool to parallelize any
computational tasks like compression.
Expand Down Expand Up @@ -158,9 +160,14 @@ cdef class IpcWriteOptions(_Weakrefable):
def compression(self, value):
if value is None:
self.c_options.codec.reset()
else:
elif isinstance(value, str):
self.c_options.codec = shared_ptr[CCodec](GetResultValue(
CCodec.Create(_ensure_compression(value))).release())
elif isinstance(value, Codec):
self.c_options.codec = (<Codec>value).wrapped
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nit, but we could use .unwrap() here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There already was an unwrap() but it returned CCodec* and not shared_ptr<CCoded>. I think I'd need to investigate where the current unwrap() was used.

else:
raise TypeError(
"Property `compression` must be None, str, or pyarrow.Codec")

@property
def use_threads(self):
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ cdef class RecordBatchReader(_Weakrefable):

cdef class Codec(_Weakrefable):
cdef:
unique_ptr[CCodec] wrapped
shared_ptr[CCodec] wrapped

cdef inline CCodec* unwrap(self) nogil

Expand Down
Loading