diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc
index f9c084f6c26..8db199b4e76 100644
--- a/cpp/src/arrow/util/compression.cc
+++ b/cpp/src/arrow/util/compression.cc
@@ -29,6 +29,18 @@
 namespace arrow {
 namespace util {
 
+namespace {
+
+Status CheckSupportsCompressionLevel(Compression::type type) {
+  if (!Codec::SupportsCompressionLevel(type)) {
+    return Status::Invalid(
+        "The specified codec does not support the compression level parameter");
+  }
+  return Status::OK();
+}
+
+}  // namespace
+
 int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; }
 
 Status Codec::Init() { return Status::OK(); }
@@ -103,6 +115,24 @@ bool Codec::SupportsCompressionLevel(Compression::type codec) {
   }
 }
 
+Result<int> Codec::MaximumCompressionLevel(Compression::type codec_type) {
+  RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type));
+  ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type));
+  return codec->maximum_compression_level();
+}
+
+Result<int> Codec::MinimumCompressionLevel(Compression::type codec_type) {
+  RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type));
+  ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type));
+  return codec->minimum_compression_level();
+}
+
+Result<int> Codec::DefaultCompressionLevel(Compression::type codec_type) {
+  RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type));
+  ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type));
+  return codec->default_compression_level();
+}
+
 Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
                                              int compression_level) {
   if (!IsAvailable(codec_type)) {
diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index 6c9a74c6d21..0832e82a606 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -132,6 +132,27 @@ class ARROW_EXPORT Codec {
   /// \brief Return true if indicated codec supports setting a compression level
   static bool SupportsCompressionLevel(Compression::type codec);
 
+  /// \brief Return the smallest supported compression level for the codec
+  /// Note: This function creates a temporary Codec instance
+  static Result<int> MinimumCompressionLevel(Compression::type codec);
+
+  /// \brief Return the largest supported compression level for the codec
+  /// Note: This function creates a temporary Codec instance
+  static Result<int> MaximumCompressionLevel(Compression::type codec);
+
+  /// \brief Return the default compression level
+  /// Note: This function creates a temporary Codec instance
+  static Result<int> DefaultCompressionLevel(Compression::type codec);
+
+  /// \brief Return the smallest supported compression level
+  virtual int minimum_compression_level() const = 0;
+
+  /// \brief Return the largest supported compression level
+  virtual int maximum_compression_level() const = 0;
+
+  /// \brief Return the default compression level
+  virtual int default_compression_level() const = 0;
+
   /// \brief One-shot decompression function
   ///
   /// output_buffer_len must be correct and therefore be obtained in advance.
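The three static level queries added above are mirrored on pyarrow.Codec later in this diff. A minimal sketch of how they compose, assuming a pyarrow build with gzip enabled:

    import pyarrow as pa

    # Static queries avoid constructing a Codec by hand (each call creates
    # a temporary instance internally, per the note in compression.h).
    if pa.Codec.supports_compression_level("gzip"):
        lo = pa.Codec.minimum_compression_level("gzip")
        hi = pa.Codec.maximum_compression_level("gzip")
        default = pa.Codec.default_compression_level("gzip")
        assert lo <= default <= hi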
diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc
index 4feabe23345..cb547c2c8cf 100644
--- a/cpp/src/arrow/util/compression_brotli.cc
+++ b/cpp/src/arrow/util/compression_brotli.cc
@@ -224,6 +224,11 @@ class BrotliCodec : public Codec {
   Compression::type compression_type() const override { return Compression::BROTLI; }
 
   int compression_level() const override { return compression_level_; }
+  int minimum_compression_level() const override { return BROTLI_MIN_QUALITY; }
+  int maximum_compression_level() const override { return BROTLI_MAX_QUALITY; }
+  int default_compression_level() const override {
+    return kBrotliDefaultCompressionLevel;
+  }
 
  private:
   const int compression_level_;
diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc
index 8a8c1cb7a45..b367f2ff20c 100644
--- a/cpp/src/arrow/util/compression_bz2.cc
+++ b/cpp/src/arrow/util/compression_bz2.cc
@@ -40,6 +40,9 @@ namespace internal {
 
 namespace {
 
+constexpr int kBZ2MinCompressionLevel = 1;
+constexpr int kBZ2MaxCompressionLevel = 9;
+
 // Max number of bytes the bz2 APIs accept at a time
 constexpr auto kSizeLimit =
     static_cast<int64_t>(std::numeric_limits<unsigned int>::max());
 
@@ -265,6 +268,9 @@ class BZ2Codec : public Codec {
   Compression::type compression_type() const override { return Compression::BZ2; }
 
   int compression_level() const override { return compression_level_; }
+  int minimum_compression_level() const override { return kBZ2MinCompressionLevel; }
+  int maximum_compression_level() const override { return kBZ2MaxCompressionLevel; }
+  int default_compression_level() const override { return kBZ2DefaultCompressionLevel; }
 
  private:
  int compression_level_;
diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 9314dfd7faf..c783e405590 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -300,6 +300,9 @@ class Lz4FrameCodec : public Codec {
   }
 
   Compression::type compression_type() const override { return Compression::LZ4_FRAME; }
+  int minimum_compression_level() const override { return kUseDefaultCompressionLevel; }
+  int maximum_compression_level() const override { return kUseDefaultCompressionLevel; }
+  int default_compression_level() const override { return kUseDefaultCompressionLevel; }
 
  protected:
   const LZ4F_preferences_t prefs_;
@@ -350,6 +353,9 @@ class Lz4Codec : public Codec {
   }
 
   Compression::type compression_type() const override { return Compression::LZ4; }
+  int minimum_compression_level() const override { return kUseDefaultCompressionLevel; }
+  int maximum_compression_level() const override { return kUseDefaultCompressionLevel; }
+  int default_compression_level() const override { return kUseDefaultCompressionLevel; }
 };
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc
index 9b016874b56..3756f957d04 100644
--- a/cpp/src/arrow/util/compression_snappy.cc
+++ b/cpp/src/arrow/util/compression_snappy.cc
@@ -86,6 +86,9 @@ class SnappyCodec : public Codec {
   }
 
   Compression::type compression_type() const override { return Compression::SNAPPY; }
+  int minimum_compression_level() const override { return kUseDefaultCompressionLevel; }
+  int maximum_compression_level() const override { return kUseDefaultCompressionLevel; }
+  int default_compression_level() const override { return kUseDefaultCompressionLevel; }
 };
 
 }  // namespace
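The lz4 and snappy codecs above deliberately return the kUseDefaultCompressionLevel sentinel, since they expose no level knob. On the Python side that surfaces as follows, a sketch assuming lz4 is built in:

    import pyarrow as pa

    assert not pa.Codec.supports_compression_level("lz4")
    try:
        # Any explicit level is rejected at construction time.
        pa.Codec("lz4", compression_level=0)
    except ValueError:
        pass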
diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc
index 2bd7a176234..795d5e31d65 100644
--- a/cpp/src/arrow/util/compression_test.cc
+++ b/cpp/src/arrow/util/compression_test.cc
@@ -399,6 +399,32 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) {
   }
 }
 
+TEST_P(CodecTest, MinMaxCompressionLevel) {
+  auto type = GetCompression();
+  ASSERT_OK_AND_ASSIGN(auto codec, Codec::Create(type));
+
+  if (Codec::SupportsCompressionLevel(type)) {
+    ASSERT_OK_AND_ASSIGN(auto min_level, Codec::MinimumCompressionLevel(type));
+    ASSERT_OK_AND_ASSIGN(auto max_level, Codec::MaximumCompressionLevel(type));
+    ASSERT_OK_AND_ASSIGN(auto default_level, Codec::DefaultCompressionLevel(type));
+    ASSERT_NE(min_level, Codec::UseDefaultCompressionLevel());
+    ASSERT_NE(max_level, Codec::UseDefaultCompressionLevel());
+    ASSERT_NE(default_level, Codec::UseDefaultCompressionLevel());
+    ASSERT_LT(min_level, max_level);
+    ASSERT_EQ(min_level, codec->minimum_compression_level());
+    ASSERT_EQ(max_level, codec->maximum_compression_level());
+    ASSERT_GE(default_level, min_level);
+    ASSERT_LE(default_level, max_level);
+  } else {
+    ASSERT_RAISES(Invalid, Codec::MinimumCompressionLevel(type));
+    ASSERT_RAISES(Invalid, Codec::MaximumCompressionLevel(type));
+    ASSERT_RAISES(Invalid, Codec::DefaultCompressionLevel(type));
+    ASSERT_EQ(codec->minimum_compression_level(), Codec::UseDefaultCompressionLevel());
+    ASSERT_EQ(codec->maximum_compression_level(), Codec::UseDefaultCompressionLevel());
+    ASSERT_EQ(codec->default_compression_level(), Codec::UseDefaultCompressionLevel());
+  }
+}
+
 TEST_P(CodecTest, OutputBufferIsSmall) {
   auto type = GetCompression();
 
   if (type != Compression::SNAPPY) {
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
index 520e9dcd383..e9cb2470ee2 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -52,6 +52,9 @@ constexpr int GZIP_CODEC = 16;
 // Determine if this is libz or gzip from header.
 constexpr int DETECT_CODEC = 32;
 
+constexpr int kGZipMinCompressionLevel = 1;
+constexpr int kGZipMaxCompressionLevel = 9;
+
 int CompressionWindowBitsForFormat(GZipFormat::type format) {
   int window_bits = WINDOW_BITS;
   switch (format) {
@@ -468,6 +471,9 @@ class GZipCodec : public Codec {
   Compression::type compression_type() const override { return Compression::GZIP; }
 
   int compression_level() const override { return compression_level_; }
+  int minimum_compression_level() const override { return kGZipMinCompressionLevel; }
+  int maximum_compression_level() const override { return kGZipMaxCompressionLevel; }
+  int default_compression_level() const override { return kGZipDefaultCompressionLevel; }
 
  private:
  // zlib is stateful and the z_stream state variable must be initialized
diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc
index 382e0573b29..e15ecb4e1fe 100644
--- a/cpp/src/arrow/util/compression_zstd.cc
+++ b/cpp/src/arrow/util/compression_zstd.cc
@@ -228,6 +228,9 @@ class ZSTDCodec : public Codec {
   }
 
   Compression::type compression_type() const override { return Compression::ZSTD; }
+  int minimum_compression_level() const override { return ZSTD_minCLevel(); }
+  int maximum_compression_level() const override { return ZSTD_maxCLevel(); }
+  int default_compression_level() const override { return kZSTDDefaultCompressionLevel; }
 
   int compression_level() const override { return compression_level_; }
diff --git a/docs/source/python/api/memory.rst b/docs/source/python/api/memory.rst
index b7384748076..f4382ba23c9 100644
--- a/docs/source/python/api/memory.rst
+++ b/docs/source/python/api/memory.rst
@@ -50,6 +50,7 @@ Miscellaneous
 
 .. autosummary::
    :toctree: ../generated/
 
+   Codec
    compress
    decompress
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index bd3bdb251f3..171b3ede217 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2352,6 +2352,23 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
         @staticmethod
         CResult[unique_ptr[CCodec]] Create(CCompressionType codec)
 
+        @staticmethod
+        CResult[unique_ptr[CCodec]] CreateWithLevel" Create"(
+            CCompressionType codec,
+            int compression_level)
+
+        @staticmethod
+        c_bool SupportsCompressionLevel(CCompressionType codec)
+
+        @staticmethod
+        CResult[int] MinimumCompressionLevel(CCompressionType codec)
+
+        @staticmethod
+        CResult[int] MaximumCompressionLevel(CCompressionType codec)
+
+        @staticmethod
+        CResult[int] DefaultCompressionLevel(CCompressionType codec)
+
         @staticmethod
         c_bool IsAvailable(CCompressionType codec)
 
@@ -2362,6 +2379,7 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
             int64_t output_buffer_len, uint8_t* output_buffer)
 
         c_string name() const
+        int compression_level() const
 
         int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input)
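Of the codecs touched above, only zstd takes its floor from the library itself (ZSTD_minCLevel()), so negative "fast" levels are legal there. A sketch, assuming a build against a zstd version recent enough to report negative levels:

    import pyarrow as pa

    if pa.Codec.is_available("zstd"):
        # Negative levels trade compression ratio for speed.
        assert pa.Codec.minimum_compression_level("zstd") < 0
        fast = pa.Codec("zstd", compression_level=-5)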
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index b5da607950b..7d7cb1afb00 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1625,6 +1625,40 @@ cdef class Codec(_Weakrefable):
         Type of compression codec to initialize, valid values are: 'gzip',
         'bz2', 'brotli', 'lz4' (or 'lz4_frame'), 'lz4_raw', 'zstd' and
         'snappy'.
+    compression_level: int, None
+        Optional parameter specifying how aggressively to compress.  The
+        possible ranges and effect of this parameter depend on the specific
+        codec chosen.  Higher values compress more but typically use more
+        resources (CPU/RAM).  Some codecs support negative values.
+
+        gzip
+            The compression_level maps to the level parameter of
+            deflateInit2.  Higher values are slower and should have
+            higher compression ratios.
+
+        bz2
+            The compression level maps to the blockSize100k parameter
+            of the BZ2_bzCompressInit function.  Higher levels use more
+            RAM and should have higher compression ratios.
+
+        brotli
+            The compression level maps to the BROTLI_PARAM_QUALITY
+            parameter.  Higher values are slower and should have higher
+            compression ratios.
+
+        lz4/lz4_frame/lz4_raw
+            The compression level parameter is not supported and must
+            be None.
+
+        zstd
+            The compression level maps to the compressionLevel parameter
+            of ZSTD_initCStream.  Negative values are supported.  Higher
+            values are slower and should have higher compression ratios.
+
+        snappy
+            The compression level parameter is not supported and must
+            be None.
+
     Raises
     ------
@@ -1632,9 +1666,14 @@ cdef class Codec(_Weakrefable):
         If invalid compression value is passed.
     """
 
-    def __init__(self, str compression not None):
+    def __init__(self, str compression not None, compression_level=None):
         cdef CCompressionType typ = _ensure_compression(compression)
-        self.wrapped = move(GetResultValue(CCodec.Create(typ)))
+        if compression_level is not None:
+            self.wrapped = shared_ptr[CCodec](move(GetResultValue(
+                CCodec.CreateWithLevel(typ, compression_level))))
+        else:
+            self.wrapped = shared_ptr[CCodec](move(GetResultValue(
+                CCodec.Create(typ))))
 
     cdef inline CCodec* unwrap(self) nogil:
         return self.wrapped.get()
@@ -1680,10 +1719,50 @@ cdef class Codec(_Weakrefable):
         cdef CCompressionType typ = _ensure_compression(compression)
         return CCodec.IsAvailable(typ)
 
+    @staticmethod
+    def supports_compression_level(str compression not None):
+        """
+        Returns true if the compression level parameter is supported
+        for the given codec.
+        """
+        cdef CCompressionType typ = _ensure_compression(compression)
+        return CCodec.SupportsCompressionLevel(typ)
+
+    @staticmethod
+    def default_compression_level(str compression not None):
+        """
+        Returns the compression level that Arrow will use for the codec if
+        None is specified.
+        """
+        cdef CCompressionType typ = _ensure_compression(compression)
+        return GetResultValue(CCodec.DefaultCompressionLevel(typ))
+
+    @staticmethod
+    def minimum_compression_level(str compression not None):
+        """
+        Returns the smallest valid value for the compression level.
+        """
+        cdef CCompressionType typ = _ensure_compression(compression)
+        return GetResultValue(CCodec.MinimumCompressionLevel(typ))
+
+    @staticmethod
+    def maximum_compression_level(str compression not None):
+        """
+        Returns the largest valid value for the compression level.
+        """
+        cdef CCompressionType typ = _ensure_compression(compression)
+        return GetResultValue(CCodec.MaximumCompressionLevel(typ))
+
     @property
     def name(self):
+        """Returns the name of the codec."""
         return frombytes(self.unwrap().name())
 
+    @property
+    def compression_level(self):
+        """Returns the compression level parameter of the codec."""
+        return self.unwrap().compression_level()
+
     def compress(self, object buf, asbytes=False, memory_pool=None):
         """
         Compress data from buffer-like object.
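With the constructor change above, a level-aware round trip through the Python Codec looks like this (a sketch assuming brotli is built in; 11 is BROTLI_MAX_QUALITY):

    import pyarrow as pa

    codec = pa.Codec("brotli", compression_level=11)
    data = b"x" * 10_000
    compressed = codec.compress(data, asbytes=True)
    # One-shot decompression requires the decompressed size up front.
    assert codec.decompress(compressed, len(data), asbytes=True) == data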
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 93dd2eaef5e..4b22acc076f 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -96,9 +96,11 @@ cdef class IpcWriteOptions(_Weakrefable):
         If true, allow field lengths that don't fit in a signed 32-bit int.
     use_legacy_format : bool, default False
         Whether to use the pre-Arrow 0.15 IPC format.
-    compression: str or None
-        If not None, compression codec to use for record batch buffers.
-        May only be "lz4", "zstd" or None.
+    compression: str, Codec, or None
+        Compression codec to use for record batch buffers.
+        If None then batch buffers will be uncompressed.
+        Must be "lz4", "zstd" or None.
+        To specify a compression_level, pass a `pyarrow.Codec` instance.
     use_threads: bool
         Whether to use the global CPU thread pool to parallelize any
         computational tasks like compression.
@@ -158,9 +160,14 @@ cdef class IpcWriteOptions(_Weakrefable):
     def compression(self, value):
         if value is None:
             self.c_options.codec.reset()
-        else:
+        elif isinstance(value, str):
             self.c_options.codec = shared_ptr[CCodec](GetResultValue(
                 CCodec.Create(_ensure_compression(value))).release())
+        elif isinstance(value, Codec):
+            self.c_options.codec = (<Codec> value).wrapped
+        else:
+            raise TypeError(
+                "Property `compression` must be None, str, or pyarrow.Codec")
 
     @property
     def use_threads(self):
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 1959519c49d..414c7b5f26b 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -499,7 +499,7 @@ cdef class RecordBatchReader(_Weakrefable):
 cdef class Codec(_Weakrefable):
     cdef:
-        unique_ptr[CCodec] wrapped
+        shared_ptr[CCodec] wrapped
 
     cdef inline CCodec* unwrap(self) nogil
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index 3de07c4305f..8fb98d4a6e7 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -24,6 +24,7 @@
 import hypothesis as h
 
 from pyarrow.util import find_free_port
+from pyarrow import Codec
 
 
 # setup hypothesis profiles
@@ -44,13 +45,17 @@
 groups = [
+    'brotli',
+    'bz2',
     'cython',
     'dataset',
     'hypothesis',
     'fastparquet',
     'gandiva',
+    'gzip',
     'hdfs',
     'large_memory',
+    'lz4',
     'memory_leak',
     'nopandas',
     'orc',
@@ -58,20 +63,26 @@
     'parquet',
     'plasma',
     's3',
+    'snappy',
     'tensorflow',
     'flight',
     'slow',
     'requires_testing_data',
+    'zstd',
 ]
 
 
 defaults = {
+    'brotli': Codec.is_available('brotli'),
+    'bz2': Codec.is_available('bz2'),
     'cython': False,
     'dataset': False,
     'fastparquet': False,
     'hypothesis': False,
     'gandiva': False,
+    'gzip': Codec.is_available('gzip'),
     'hdfs': False,
     'large_memory': False,
+    'lz4': Codec.is_available('lz4'),
     'memory_leak': False,
     'orc': False,
     'nopandas': False,
@@ -79,10 +90,12 @@
     'parquet': False,
     'plasma': False,
     's3': False,
+    'snappy': Codec.is_available('snappy'),
     'tensorflow': False,
     'flight': False,
     'slow': False,
     'requires_testing_data': True,
+    'zstd': Codec.is_available('zstd'),
 }
 
 try:
diff --git a/python/pyarrow/tests/parquet/test_compliant_nested_type.py b/python/pyarrow/tests/parquet/test_compliant_nested_type.py
index 804f3738f12..5b10ed4c2f0 100644
--- a/python/pyarrow/tests/parquet/test_compliant_nested_type.py
+++ b/python/pyarrow/tests/parquet/test_compliant_nested_type.py
@@ -35,6 +35,8 @@
 except ImportError:
     pd = tm = None
 
+pytestmark = pytest.mark.parquet
+
 # Tests for ARROW-11497
 _test_data_simple = [
     {'items': [1, 2]},
diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py
index dc9a3bb5274..43175b72ab7 100644
--- a/python/pyarrow/tests/parquet/test_parquet_file.py
+++ b/python/pyarrow/tests/parquet/test_parquet_file.py
@@ -36,6 +36,8 @@
 except ImportError:
     pd = tm = None
 
+pytestmark = pytest.mark.parquet
+
 
 @pytest.mark.pandas
 def test_pass_separate_metadata():
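The conftest wiring above enables each new codec mark only when the codec is compiled in, so suites can annotate codec-dependent tests instead of skipping inline. A sketch of how a test opts in (hypothetical test name):

    import pytest
    import pyarrow as pa

    @pytest.mark.zstd  # deselected automatically when zstd is not built
    def test_needs_zstd():
        assert pa.Codec.is_available("zstd")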
diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py
index ec1d5256bfd..4218e83cead 100644
--- a/python/pyarrow/tests/parquet/test_parquet_writer.py
+++ b/python/pyarrow/tests/parquet/test_parquet_writer.py
@@ -36,6 +36,8 @@
 except ImportError:
     pd = tm = None
 
+pytestmark = pytest.mark.parquet
+
 
 @pytest.mark.pandas
 @parametrize_legacy_dataset
diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py
index f01ac292ddf..3d0451ee33e 100644
--- a/python/pyarrow/tests/test_feather.py
+++ b/python/pyarrow/tests/test_feather.py
@@ -55,6 +55,9 @@ def version(request):
 
 
 @pytest.fixture(scope="module", params=[None, "uncompressed", "lz4", "zstd"])
 def compression(request):
+    if request.param in ['lz4', 'zstd'] and not pa.Codec.is_available(
+            request.param):
+        pytest.skip(f'{request.param} is not available')
     yield request.param
 
 
@@ -599,6 +602,9 @@ def test_v2_set_chunksize():
 
 
 @pytest.mark.pandas
+@pytest.mark.lz4
+@pytest.mark.snappy
+@pytest.mark.zstd
 def test_v2_compression_options():
     df = pd.DataFrame({'A': np.arange(1000)})
 
@@ -776,6 +782,7 @@ def test_roundtrip(table, compression):
     _check_arrow_roundtrip(table, compression=compression)
 
 
+@pytest.mark.lz4
 def test_feather_v017_experimental_compression_backward_compatibility(datadir):
     # ARROW-11163 - ensure newer pyarrow versions can read the old feather
     # files from version 0.17.0 with experimental compression support (before
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index 8faddc7b9e4..0e049e21778 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -855,6 +855,7 @@ def identity(v):
     return v
 
 
+@pytest.mark.gzip
 @pytest.mark.parametrize(
     ('compression', 'buffer_size', 'compressor'),
     [
@@ -892,6 +893,7 @@ def test_open_input_file(fs, pathfn):
     assert result == data[read_from:]
 
 
+@pytest.mark.gzip
 @pytest.mark.parametrize(
     ('compression', 'buffer_size', 'decompressor'),
     [
@@ -913,6 +915,7 @@ def test_open_output_stream(fs, pathfn, compression, buffer_size,
     assert f.read(len(data)) == data
 
 
+@pytest.mark.gzip
 @pytest.mark.parametrize(
     ('compression', 'buffer_size', 'compressor', 'decompressor'),
     [
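The feather tests above gate on codec availability; for reference, the level plumbing also reaches Feather V2 through write_feather. A sketch assuming zstd is built in and a hypothetical output path:

    import pyarrow as pa
    import pyarrow.feather as feather

    table = pa.table({"A": range(1000)})
    # compression_level is forwarded to the underlying Codec.
    feather.write_feather(table, "/tmp/data.feather",
                          compression="zstd", compression_level=10)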
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index a085312bbc7..5119e162595 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -624,6 +624,89 @@ def test_compress_decompress(compression):
         pa.decompress(compressed_bytes, codec=compression)
 
 
+@pytest.mark.parametrize("compression", [
+    pytest.param(
+        "bz2", marks=pytest.mark.xfail(raises=pa.lib.ArrowNotImplementedError)
+    ),
+    "brotli",
+    "gzip",
+    "lz4",
+    "zstd",
+    "snappy"
+])
+def test_compression_level(compression):
+    if not Codec.is_available(compression):
+        pytest.skip("{} support is not built".format(compression))
+
+    # These codecs do not support a compression level
+    no_level = ['snappy', 'lz4']
+    if compression in no_level:
+        assert not Codec.supports_compression_level(compression)
+        with pytest.raises(ValueError):
+            Codec(compression, 0)
+        with pytest.raises(ValueError):
+            Codec.minimum_compression_level(compression)
+        with pytest.raises(ValueError):
+            Codec.maximum_compression_level(compression)
+        with pytest.raises(ValueError):
+            Codec.default_compression_level(compression)
+        return
+
+    INPUT_SIZE = 10000
+    test_data = (np.random.randint(0, 255, size=INPUT_SIZE)
+                 .astype(np.uint8)
+                 .tobytes())
+    test_buf = pa.py_buffer(test_data)
+
+    min_level = Codec.minimum_compression_level(compression)
+    max_level = Codec.maximum_compression_level(compression)
+    default_level = Codec.default_compression_level(compression)
+
+    assert min_level < max_level
+    assert default_level >= min_level
+    assert default_level <= max_level
+
+    for compression_level in range(min_level, max_level + 1):
+        codec = Codec(compression, compression_level)
+        compressed_buf = codec.compress(test_buf)
+        compressed_bytes = codec.compress(test_data, asbytes=True)
+        assert isinstance(compressed_bytes, bytes)
+        decompressed_buf = codec.decompress(compressed_buf, INPUT_SIZE)
+        decompressed_bytes = codec.decompress(compressed_bytes, INPUT_SIZE,
+                                              asbytes=True)
+
+        assert isinstance(decompressed_bytes, bytes)
+
+        assert decompressed_buf.equals(test_buf)
+        assert decompressed_bytes == test_data
+
+        with pytest.raises(ValueError):
+            codec.decompress(compressed_bytes)
+
+    # The ability to set a seed this way is not present on older versions of
+    # numpy (currently in our python 3.6 CI build).  Some inputs might just
+    # happen to compress the same between the two levels so using seeded
+    # random numbers is necessary to help get more reliable results
+    #
+    # The goal of this part is to ensure the compression_level is being
+    # passed down to the C++ layer, not to verify the compression algs
+    # themselves
+    if not hasattr(np.random, 'default_rng'):
+        pytest.skip('Requires newer version of numpy')
+    rng = np.random.default_rng(seed=42)
+    values = rng.integers(0, 100, 1000)
+    arr = pa.array(values)
+    hard_to_compress_buffer = arr.buffers()[1]
+
+    weak_codec = Codec(compression, min_level)
+    weakly_compressed_buf = weak_codec.compress(hard_to_compress_buffer)
+
+    strong_codec = Codec(compression, max_level)
+    strongly_compressed_buf = strong_codec.compress(hard_to_compress_buffer)
+
+    assert len(weakly_compressed_buf) > len(strongly_compressed_buf)
+
+
 def test_buffer_memoryview_is_immutable():
     val = b'some data'
@@ -1183,6 +1266,7 @@ def check_compressed_input(data, fn, compression):
             assert buf.to_pybytes() == data
 
 
+@pytest.mark.gzip
 def test_compressed_input_gzip(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
     fn = str(tmpdir / "compressed_input_test.gz")
@@ -1209,6 +1293,7 @@ def check_compressed_concatenated(data, fn, compression):
     assert got == data
 
 
+@pytest.mark.gzip
 def test_compressed_concatenated_gzip(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
     fn = str(tmpdir / "compressed_input_test2.gz")
@@ -1219,6 +1304,7 @@ def test_compressed_concatenated_gzip(tmpdir):
     check_compressed_concatenated(data, fn, "gzip")
 
 
+@pytest.mark.gzip
 def test_compressed_input_invalid():
     data = b"foo" * 10
     raw = pa.BufferReader(data)
@@ -1246,6 +1332,7 @@ def make_compressed_output(data, fn, compression):
         f.write(raw.getvalue())
 
 
+@pytest.mark.gzip
 def test_compressed_output_gzip(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
     fn = str(tmpdir / "compressed_output_test.gz")
@@ -1433,6 +1520,7 @@ def test_transcoding_decoding_error(src_encoding, dest_encoding):
 
 # ----------------------------------------------------------------------
 # High-level API
 
 
+@pytest.mark.gzip
 def test_input_stream_buffer():
     data = b"some test data\n" * 10 + b"eof\n"
     for arg in [pa.py_buffer(data), memoryview(data)]:
@@ -1478,6 +1566,7 @@ def test_input_stream_file_path(tmpdir):
     assert stream.read() == data
 
 
+@pytest.mark.gzip
 def test_input_stream_file_path_compressed(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
     gz_data = gzip.compress(data)
@@ -1524,6 +1613,7 @@ def test_input_stream_file_path_buffered(tmpdir):
         pa.input_stream(file_path, buffer_size='million')
 
 
+@pytest.mark.gzip
 def test_input_stream_file_path_compressed_and_buffered(tmpdir):
     data = b"some test data\n" * 100 + b"eof\n"
     gz_data = gzip.compress(data)
@@ -1539,6 +1629,7 @@ def test_input_stream_file_path_compressed_and_buffered(tmpdir):
     assert stream.read() == data
 
 
+@pytest.mark.gzip
 def test_input_stream_python_file(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
     bio = BytesIO(data)
@@ -1562,6 +1653,7 @@ def test_input_stream_python_file(tmpdir):
     assert stream.read() == data
 
 
+@pytest.mark.gzip
 def test_input_stream_native_file():
     data = b"some test data\n" * 10 + b"eof\n"
     gz_data = gzip.compress(data)
@@ -1640,6 +1732,7 @@ def check_data(file_path, data):
     check_data(pathlib.Path(str(file_path)), data)
 
 
+@pytest.mark.gzip
 def test_output_stream_file_path_compressed(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
     file_path = tmpdir / 'output_stream.gz'
@@ -1690,6 +1783,7 @@ def check_data(file_path, data, **kwargs):
         assert result == data
 
 
+@pytest.mark.gzip
 def test_output_stream_file_path_compressed_and_buffered(tmpdir):
     data = b"some test data\n" * 100 + b"eof\n"
     file_path = tmpdir / 'output_stream_compressed_and_buffered.gz'
@@ -1729,6 +1823,7 @@ def check_data(file_path, data, **kwargs):
     assert check_data(file_path, data, buffer_size=1024) == data
 
 
+@pytest.mark.gzip
 def test_output_stream_python_file(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
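The gzip-marked stream tests above all exercise the compressed-stream helpers; a condensed sketch of the pattern (hypothetical path, assuming gzip is built in):

    import pyarrow as pa

    path = "/tmp/data.gz"  # hypothetical
    with pa.output_stream(path, compression="gzip") as f:
        f.write(b"some test data\n" * 10)
    # compression="detect" (the default) infers gzip from the .gz suffix.
    with pa.input_stream(path) as f:
        data = f.read()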
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index a15960bce74..87944bcc066 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -329,6 +329,37 @@ def test_stream_simple_roundtrip(stream_fixture, use_legacy_ipc_format):
         reader.read_next_batch()
 
 
+@pytest.mark.zstd
+def test_compression_roundtrip():
+    sink = io.BytesIO()
+    values = np.random.randint(0, 10, 10000)
+    table = pa.Table.from_arrays([values], names=["values"])
+
+    options = pa.ipc.IpcWriteOptions(compression='zstd')
+    with pa.ipc.RecordBatchFileWriter(
+            sink, table.schema, options=options) as writer:
+        writer.write_table(table)
+    len1 = len(sink.getvalue())
+
+    sink2 = io.BytesIO()
+    codec = pa.Codec('zstd', compression_level=5)
+    options = pa.ipc.IpcWriteOptions(compression=codec)
+    with pa.ipc.RecordBatchFileWriter(
+            sink2, table.schema, options=options) as writer:
+        writer.write_table(table)
+    len2 = len(sink2.getvalue())
+
+    # In theory len2 should be less than len1 but for this test we just want
+    # to ensure compression_level is being correctly passed down to the C++
+    # layer so we don't really care if it makes it worse or better
+    assert len2 != len1
+
+    t1 = pa.ipc.open_file(sink).read_all()
+    t2 = pa.ipc.open_file(sink2).read_all()
+
+    assert t1 == t2
+
+
 def test_write_options():
     options = pa.ipc.IpcWriteOptions()
     assert options.allow_64bit is False
@@ -349,28 +380,33 @@ def test_write_options():
     assert options.compression is None
 
     for value in ['lz4', 'zstd']:
-        options.compression = value
-        assert options.compression == value
-        options.compression = value.upper()
-        assert options.compression == value
+        if pa.Codec.is_available(value):
+            options.compression = value
+            assert options.compression == value
+            options.compression = value.upper()
+            assert options.compression == value
     options.compression = None
     assert options.compression is None
 
+    with pytest.raises(TypeError):
+        options.compression = 0
+
     assert options.use_threads is True
     options.use_threads = False
     assert options.use_threads is False
 
-    options = pa.ipc.IpcWriteOptions(
-        metadata_version=pa.ipc.MetadataVersion.V4,
-        allow_64bit=True,
-        use_legacy_format=True,
-        compression='lz4',
-        use_threads=False)
-    assert options.metadata_version == pa.ipc.MetadataVersion.V4
-    assert options.allow_64bit is True
-    assert options.use_legacy_format is True
-    assert options.compression == 'lz4'
-    assert options.use_threads is False
+    if pa.Codec.is_available('lz4'):
+        options = pa.ipc.IpcWriteOptions(
+            metadata_version=pa.ipc.MetadataVersion.V4,
+            allow_64bit=True,
+            use_legacy_format=True,
+            compression='lz4',
+            use_threads=False)
+        assert options.metadata_version == pa.ipc.MetadataVersion.V4
+        assert options.allow_64bit is True
+        assert options.use_legacy_format is True
+        assert options.compression == 'lz4'
+        assert options.use_threads is False
 
 
 def test_write_options_legacy_exclusive(stream_fixture):
@@ -564,6 +600,7 @@ def test_message_serialize_read_message(example_messages):
         pa.ipc.read_message(reader)
 
 
+@pytest.mark.gzip
 def test_message_read_from_compressed(example_messages):
     # Part of ARROW-5910
     _, messages = example_messages
diff --git a/python/pyarrow/tests/test_tensor.py b/python/pyarrow/tests/test_tensor.py
index 07493144919..aee46bc9369 100644
--- a/python/pyarrow/tests/test_tensor.py
+++ b/python/pyarrow/tests/test_tensor.py
@@ -108,6 +108,7 @@ def test_tensor_ipc_roundtrip(tmpdir):
     assert result.equals(tensor)
 
 
+@pytest.mark.gzip
 def test_tensor_ipc_read_from_compressed(tempdir):
     # ARROW-5910
     data = np.random.randn(10, 4)
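Taken together, the IPC changes let a caller pick a level by handing the writer a Codec rather than a string. A closing sketch, assuming zstd is built in:

    import pyarrow as pa

    codec = pa.Codec("zstd", compression_level=7)
    opts = pa.ipc.IpcWriteOptions(compression=codec)
    # opts can now be passed to pa.ipc.new_file / RecordBatchFileWriter.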