From 1818ea41b2f5859d1cdb4ff1b505e6c7185e32ae Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 13 Jul 2021 12:49:43 -1000 Subject: [PATCH 1/9] ARROW-13091: Added compression_level to IpcWriteOptions --- python/pyarrow/includes/libarrow.pxd | 5 ++++ python/pyarrow/ipc.pxi | 35 +++++++++++++++++----- python/pyarrow/lib.pxd | 1 + python/pyarrow/tests/test_ipc.py | 44 ++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index bd3bdb251f3..70a8c269cac 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2351,6 +2351,10 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil: cdef cppclass CCodec" arrow::util::Codec": @staticmethod CResult[unique_ptr[CCodec]] Create(CCompressionType codec) + @staticmethod + CResult[unique_ptr[CCodec]] CreateWithLevel" Create"( + CCompressionType codec, + int compression_level) @staticmethod c_bool IsAvailable(CCompressionType codec) @@ -2362,6 +2366,7 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil: int64_t output_buffer_len, uint8_t* output_buffer) c_string name() const + int compression_level() const int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 93dd2eaef5e..13631ef25a4 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -96,9 +96,14 @@ cdef class IpcWriteOptions(_Weakrefable): If true, allow field lengths that don't fit in a signed 32-bit int. use_legacy_format : bool, default False Whether to use the pre-Arrow 0.15 IPC format. - compression: str or None - If not None, compression codec to use for record batch buffers. - May only be "lz4", "zstd" or None. + compression: str, or None + compression codec to use for record batch buffers. + If None then batch buffers will be uncompressed. + Must be "lz4", "zstd" or None. + compression_level: int, or None + The compression level to use. + Higher indicates more compression, details depend on the codec. + If None then a default level for the codec will be used. use_threads: bool Whether to use the global CPU thread pool to parallelize any computational tasks like compression. 
@@ -112,12 +117,13 @@ cdef class IpcWriteOptions(_Weakrefable): def __init__(self, *, metadata_version=MetadataVersion.V5, bint allow_64bit=False, use_legacy_format=False, - compression=None, bint use_threads=True, - bint emit_dictionary_deltas=False): + compression=None, compression_level=None, + bint use_threads=True, bint emit_dictionary_deltas=False): self.c_options = CIpcWriteOptions.Defaults() self.allow_64bit = allow_64bit self.use_legacy_format = use_legacy_format self.metadata_version = metadata_version + self._compression_level = compression_level if compression is not None: self.compression = compression self.use_threads = use_threads @@ -159,8 +165,23 @@ cdef class IpcWriteOptions(_Weakrefable): if value is None: self.c_options.codec.reset() else: - self.c_options.codec = shared_ptr[CCodec](GetResultValue( - CCodec.Create(_ensure_compression(value))).release()) + if self._compression_level is None: + self.c_options.codec = shared_ptr[CCodec](GetResultValue( + CCodec.Create(_ensure_compression(value))).release()) + else: + self.c_options.codec = shared_ptr[CCodec](GetResultValue( + CCodec.CreateWithLevel(_ensure_compression(value), + self._compression_level)).release()) + + @property + def compression_level(self): + return self._compression_level + + @compression_level.setter + def compression_level(self, value): + self._compression_level = value + # Recompute the codec + self.compression = self.compression @property def use_threads(self): diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 1959519c49d..9608af2ac6a 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -38,6 +38,7 @@ cdef class _Weakrefable: cdef class IpcWriteOptions(_Weakrefable): cdef: CIpcWriteOptions c_options + object _compression_level cdef class Message(_Weakrefable): diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index a15960bce74..dda4f9b54e1 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -329,6 +329,36 @@ def test_stream_simple_roundtrip(stream_fixture, use_legacy_ipc_format): reader.read_next_batch() +def test_compression_roundtrip(): + sink = io.BytesIO() + rng = np.random.default_rng(seed=42) + values = rng.integers(0, 10, 100000) + table = pa.Table.from_arrays([values], names=["values"]) + + options = pa.ipc.IpcWriteOptions(compression='zstd', compression_level=1) + writer = pa.ipc.RecordBatchFileWriter(sink, table.schema, options=options) + writer.write_table(table) + writer.close() + len1 = len(sink.getvalue()) + + sink2 = io.BytesIO() + options = pa.ipc.IpcWriteOptions(compression='zstd', compression_level=5) + writer = pa.ipc.RecordBatchFileWriter(sink2, table.schema, options=options) + writer.write_table(table) + writer.close() + len2 = len(sink2.getvalue()) + + # In theory len2 should be less than len1 but for this test we just want + # to ensure compression_level is being correctly passed down to the C++ + # layer so we don't really care if it makes it worse or better + assert len2 != len1 + + t1 = pa.ipc.open_file(sink).read_all() + t2 = pa.ipc.open_file(sink2).read_all() + + assert t1 == t2 + + def test_write_options(): options = pa.ipc.IpcWriteOptions() assert options.allow_64bit is False @@ -353,6 +383,20 @@ def test_write_options(): assert options.compression == value options.compression = value.upper() assert options.compression == value + + options.compression = 'zstd' + for compression_level in [-5, 0, 5]: + options.compression_level = compression_level + assert 
options.compression_level == compression_level + + # Cannot set compression_level with lz4 + with pytest.raises(pa.ArrowInvalid): + options.compression = 'lz4' + + options.compression_level = None + options.compression = 'lz4' + assert options.compression == 'lz4' + options.compression = None assert options.compression is None From 76ed1de4983947882ea29987d9b9464ddfaf6e7b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 13 Jul 2021 16:12:18 -1000 Subject: [PATCH 2/9] ARROW-13091: Lint --- python/pyarrow/includes/libarrow.pxd | 5 +++-- python/pyarrow/ipc.pxi | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 70a8c269cac..25fccf528ea 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2351,10 +2351,11 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil: cdef cppclass CCodec" arrow::util::Codec": @staticmethod CResult[unique_ptr[CCodec]] Create(CCompressionType codec) + @staticmethod CResult[unique_ptr[CCodec]] CreateWithLevel" Create"( - CCompressionType codec, - int compression_level) + CCompressionType codec, + int compression_level) @staticmethod c_bool IsAvailable(CCompressionType codec) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 13631ef25a4..fd685b44fbb 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -171,7 +171,7 @@ cdef class IpcWriteOptions(_Weakrefable): else: self.c_options.codec = shared_ptr[CCodec](GetResultValue( CCodec.CreateWithLevel(_ensure_compression(value), - self._compression_level)).release()) + self._compression_level)).release()) @property def compression_level(self): From fa2f60bfdf308056b3a5a14fd23959ce486d61ea Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 13 Jul 2021 16:15:48 -1000 Subject: [PATCH 3/9] ARROW-13091: Skipping test if a new enough version of numpy is not present to generate stable random numbers --- python/pyarrow/tests/test_ipc.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index dda4f9b54e1..913a629f0ef 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -330,6 +330,12 @@ def test_stream_simple_roundtrip(stream_fixture, use_legacy_ipc_format): def test_compression_roundtrip(): + # The ability to set a seed this way is not present on older versions of + # numpy (currently in our python 3.6 CI build). 
Some inputs might just + # happen to compress the same between the two levels so using seeded + # random numbers is necessary + if not hasattr(np.random, 'default_rng'): + pytest.skip('Requires newer version of numpy') sink = io.BytesIO() rng = np.random.default_rng(seed=42) values = rng.integers(0, 10, 100000) From 40599c55629be860c50034a23756fa283dfd2fd8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 15 Jul 2021 11:21:27 -1000 Subject: [PATCH 4/9] ARROW-13091: Changed to using context managers for RecordBatchFileWriter in tests --- python/pyarrow/tests/test_ipc.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 913a629f0ef..741a7991b18 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -342,16 +342,16 @@ def test_compression_roundtrip(): table = pa.Table.from_arrays([values], names=["values"]) options = pa.ipc.IpcWriteOptions(compression='zstd', compression_level=1) - writer = pa.ipc.RecordBatchFileWriter(sink, table.schema, options=options) - writer.write_table(table) - writer.close() + with pa.ipc.RecordBatchFileWriter( + sink, table.schema, options=options) as writer: + writer.write_table(table) len1 = len(sink.getvalue()) sink2 = io.BytesIO() options = pa.ipc.IpcWriteOptions(compression='zstd', compression_level=5) - writer = pa.ipc.RecordBatchFileWriter(sink2, table.schema, options=options) - writer.write_table(table) - writer.close() + with pa.ipc.RecordBatchFileWriter( + sink2, table.schema, options=options) as writer: + writer.write_table(table) len2 = len(sink2.getvalue()) # In theory len2 should be less than len1 but for this test we just want From f4d7e58a3db955d121daec87f71877ea5c7e9abc Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 15 Jul 2021 17:47:12 -1000 Subject: [PATCH 5/9] ARROW-13091: Switched to using pyarrow.Codec instead of compression/compression_level pairs. 
Added documentation and methods around compression_level --- cpp/src/arrow/util/compression.cc | 30 +++++++++ cpp/src/arrow/util/compression.h | 21 ++++++ cpp/src/arrow/util/compression_brotli.cc | 5 ++ cpp/src/arrow/util/compression_bz2.cc | 6 ++ cpp/src/arrow/util/compression_lz4.cc | 6 ++ cpp/src/arrow/util/compression_snappy.cc | 3 + cpp/src/arrow/util/compression_test.cc | 26 ++++++++ cpp/src/arrow/util/compression_zlib.cc | 6 ++ cpp/src/arrow/util/compression_zstd.cc | 3 + docs/source/python/api/memory.rst | 1 + python/pyarrow/includes/libarrow.pxd | 9 +++ python/pyarrow/io.pxi | 83 +++++++++++++++++++++++- python/pyarrow/ipc.pxi | 35 +++------- python/pyarrow/lib.pxd | 3 +- python/pyarrow/tests/test_io.py | 83 ++++++++++++++++++++++++ python/pyarrow/tests/test_ipc.py | 28 ++------ 16 files changed, 295 insertions(+), 53 deletions(-) diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index f9c084f6c26..8db199b4e76 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -29,6 +29,18 @@ namespace arrow { namespace util { +namespace { + +Status CheckSupportsCompressionLevel(Compression::type type) { + if (!Codec::SupportsCompressionLevel(type)) { + return Status::Invalid( + "The specified codec does not support the compression level parameter"); + } + return Status::OK(); +} + +} // namespace + int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; } Status Codec::Init() { return Status::OK(); } @@ -103,6 +115,24 @@ bool Codec::SupportsCompressionLevel(Compression::type codec) { } } +Result<int> Codec::MaximumCompressionLevel(Compression::type codec_type) { + RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type)); + ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type)); + return codec->maximum_compression_level(); +} + +Result<int> Codec::MinimumCompressionLevel(Compression::type codec_type) { + RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type)); + ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type)); + return codec->minimum_compression_level(); +} + +Result<int> Codec::DefaultCompressionLevel(Compression::type codec_type) { + RETURN_NOT_OK(CheckSupportsCompressionLevel(codec_type)); + ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(codec_type)); + return codec->default_compression_level(); +} + Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type, int compression_level) { if (!IsAvailable(codec_type)) { diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 6c9a74c6d21..0832e82a606 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -132,6 +132,27 @@ class ARROW_EXPORT Codec { /// \brief Return true if indicated codec supports setting a compression level static bool SupportsCompressionLevel(Compression::type codec); + /// \brief Return the smallest supported compression level for the codec + /// Note: This function creates a temporary Codec instance + static Result<int> MinimumCompressionLevel(Compression::type codec); + + /// \brief Return the largest supported compression level for the codec + /// Note: This function creates a temporary Codec instance + static Result<int> MaximumCompressionLevel(Compression::type codec); + + /// \brief Return the default compression level + /// Note: This function creates a temporary Codec instance + static Result<int> DefaultCompressionLevel(Compression::type codec); + + /// \brief Return the smallest supported compression level + virtual int minimum_compression_level() const = 0; + + /// \brief Return 
the largest supported compression level + virtual int maximum_compression_level() const = 0; + + /// \brief Return the default compression level + virtual int default_compression_level() const = 0; + /// \brief One-shot decompression function /// /// output_buffer_len must be correct and therefore be obtained in advance. diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index 4feabe23345..cb547c2c8cf 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -224,6 +224,11 @@ class BrotliCodec : public Codec { Compression::type compression_type() const override { return Compression::BROTLI; } int compression_level() const override { return compression_level_; } + int minimum_compression_level() const override { return BROTLI_MIN_QUALITY; } + int maximum_compression_level() const override { return BROTLI_MAX_QUALITY; } + int default_compression_level() const override { + return kBrotliDefaultCompressionLevel; + } private: const int compression_level_; diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc index 8a8c1cb7a45..b367f2ff20c 100644 --- a/cpp/src/arrow/util/compression_bz2.cc +++ b/cpp/src/arrow/util/compression_bz2.cc @@ -40,6 +40,9 @@ namespace internal { namespace { +constexpr int kBZ2MinCompressionLevel = 1; +constexpr int kBZ2MaxCompressionLevel = 9; + // Max number of bytes the bz2 APIs accept at a time constexpr auto kSizeLimit = static_cast<int64_t>(std::numeric_limits<unsigned int>::max()); @@ -265,6 +268,9 @@ class BZ2Codec : public Codec { Compression::type compression_type() const override { return Compression::BZ2; } int compression_level() const override { return compression_level_; } + int minimum_compression_level() const override { return kBZ2MinCompressionLevel; } + int maximum_compression_level() const override { return kBZ2MaxCompressionLevel; } + int default_compression_level() const override { return kBZ2DefaultCompressionLevel; } private: int compression_level_; diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc index 9314dfd7faf..c783e405590 100644 --- a/cpp/src/arrow/util/compression_lz4.cc +++ b/cpp/src/arrow/util/compression_lz4.cc @@ -300,6 +300,9 @@ class Lz4FrameCodec : public Codec { } Compression::type compression_type() const override { return Compression::LZ4_FRAME; } + int minimum_compression_level() const override { return kUseDefaultCompressionLevel; } + int maximum_compression_level() const override { return kUseDefaultCompressionLevel; } + int default_compression_level() const override { return kUseDefaultCompressionLevel; } protected: const LZ4F_preferences_t prefs_; @@ -350,6 +353,9 @@ class Lz4Codec : public Codec { } Compression::type compression_type() const override { return Compression::LZ4; } + int minimum_compression_level() const override { return kUseDefaultCompressionLevel; } + int maximum_compression_level() const override { return kUseDefaultCompressionLevel; } + int default_compression_level() const override { return kUseDefaultCompressionLevel; } }; // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc index 9b016874b56..3756f957d04 100644 --- a/cpp/src/arrow/util/compression_snappy.cc +++ b/cpp/src/arrow/util/compression_snappy.cc @@ -86,6 +86,9 @@ class SnappyCodec : public Codec { } Compression::type compression_type() const override { return Compression::SNAPPY; } + 
int minimum_compression_level() const override { return kUseDefaultCompressionLevel; } + int maximum_compression_level() const override { return kUseDefaultCompressionLevel; } + int default_compression_level() const override { return kUseDefaultCompressionLevel; } }; } // namespace diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 2bd7a176234..795d5e31d65 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -399,6 +399,32 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) { } } +TEST_P(CodecTest, MinMaxCompressionLevel) { + auto type = GetCompression(); + ASSERT_OK_AND_ASSIGN(auto codec, Codec::Create(type)); + + if (Codec::SupportsCompressionLevel(type)) { + ASSERT_OK_AND_ASSIGN(auto min_level, Codec::MinimumCompressionLevel(type)); + ASSERT_OK_AND_ASSIGN(auto max_level, Codec::MaximumCompressionLevel(type)); + ASSERT_OK_AND_ASSIGN(auto default_level, Codec::DefaultCompressionLevel(type)); + ASSERT_NE(min_level, Codec::UseDefaultCompressionLevel()); + ASSERT_NE(max_level, Codec::UseDefaultCompressionLevel()); + ASSERT_NE(default_level, Codec::UseDefaultCompressionLevel()); + ASSERT_LT(min_level, max_level); + ASSERT_EQ(min_level, codec->minimum_compression_level()); + ASSERT_EQ(max_level, codec->maximum_compression_level()); + ASSERT_GE(default_level, min_level); + ASSERT_LE(default_level, max_level); + } else { + ASSERT_RAISES(Invalid, Codec::MinimumCompressionLevel(type)); + ASSERT_RAISES(Invalid, Codec::MaximumCompressionLevel(type)); + ASSERT_RAISES(Invalid, Codec::DefaultCompressionLevel(type)); + ASSERT_EQ(codec->minimum_compression_level(), Codec::UseDefaultCompressionLevel()); + ASSERT_EQ(codec->maximum_compression_level(), Codec::UseDefaultCompressionLevel()); + ASSERT_EQ(codec->default_compression_level(), Codec::UseDefaultCompressionLevel()); + } +} + TEST_P(CodecTest, OutputBufferIsSmall) { auto type = GetCompression(); if (type != Compression::SNAPPY) { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index 520e9dcd383..e9cb2470ee2 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -52,6 +52,9 @@ constexpr int GZIP_CODEC = 16; // Determine if this is libz or gzip from header. 
constexpr int DETECT_CODEC = 32; +constexpr int kGZipMinCompressionLevel = 1; +constexpr int kGZipMaxCompressionLevel = 9; + int CompressionWindowBitsForFormat(GZipFormat::type format) { int window_bits = WINDOW_BITS; switch (format) { @@ -468,6 +471,9 @@ class GZipCodec : public Codec { Compression::type compression_type() const override { return Compression::GZIP; } int compression_level() const override { return compression_level_; } + int minimum_compression_level() const override { return kGZipMinCompressionLevel; } + int maximum_compression_level() const override { return kGZipMaxCompressionLevel; } + int default_compression_level() const override { return kGZipDefaultCompressionLevel; } private: // zlib is stateful and the z_stream state variable must be initialized diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc index 382e0573b29..e15ecb4e1fe 100644 --- a/cpp/src/arrow/util/compression_zstd.cc +++ b/cpp/src/arrow/util/compression_zstd.cc @@ -228,6 +228,9 @@ class ZSTDCodec : public Codec { } Compression::type compression_type() const override { return Compression::ZSTD; } + int minimum_compression_level() const override { return ZSTD_minCLevel(); } + int maximum_compression_level() const override { return ZSTD_maxCLevel(); } + int default_compression_level() const override { return kZSTDDefaultCompressionLevel; } int compression_level() const override { return compression_level_; } diff --git a/docs/source/python/api/memory.rst b/docs/source/python/api/memory.rst index b7384748076..f4382ba23c9 100644 --- a/docs/source/python/api/memory.rst +++ b/docs/source/python/api/memory.rst @@ -50,6 +50,7 @@ Miscellaneous .. autosummary:: :toctree: ../generated/ + Codec compress decompress diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 25fccf528ea..423b5ec10d3 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2357,6 +2357,15 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil: CCompressionType codec, int compression_level) + @staticmethod + c_bool SupportsCompressionLevel(CCompressionType codec) + @staticmethod + CResult[int] MinimumCompressionLevel(CCompressionType codec) + @staticmethod + CResult[int] MaximumCompressionLevel(CCompressionType codec) + @staticmethod + CResult[int] DefaultCompressionLevel(CCompressionType codec) + @staticmethod c_bool IsAvailable(CCompressionType codec) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index b5da607950b..167aed1623f 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1625,6 +1625,40 @@ cdef class Codec(_Weakrefable): Type of compression codec to initialize, valid values are: 'gzip', 'bz2', 'brotli', 'lz4' (or 'lz4_frame'), 'lz4_raw', 'zstd' and 'snappy'. + compression_level: int, None + Optional parameter specifying how aggressively to compress. The + possible ranges and effect of this parameter depend on the specific + codec chosen. Higher values compress more but typically use more + resources (CPU/RAM). Some codecs support negative values. + + gzip + The compression_level maps to the memlevel parameter of + deflateInit2. Higher levels use more RAM but are faster + and should have higher compression ratios. + + bz2 + The compression level maps to the blockSize100k parameter of + the BZ2_bzCompressInit function. Higher levels use more RAM + but are faster and should have higher compression ratios. 
+ + brotli + The compression level maps to the BROTLI_PARAM_QUALITY + parameter. Higher values are slower and should have higher + compression ratios. + + lz4/lz4_frame/lz4_raw + The compression level parameter is not supported and must + be None + + zstd + The compression level maps to the compressionLevel parameter + of ZSTD_initCStream. Negative values are supported. Higher + values are slower and should have higher compression ratios. + + snappy + The compression level parameter is not supported and must + be None + Raises ------ @@ -1632,9 +1666,14 @@ cdef class Codec(_Weakrefable): If invalid compression value is passed. """ - def __init__(self, str compression not None): + def __init__(self, str compression not None, compression_level = None): cdef CCompressionType typ = _ensure_compression(compression) - self.wrapped = move(GetResultValue(CCodec.Create(typ))) + if compression_level is not None: + self.wrapped = shared_ptr[CCodec](move(GetResultValue( + CCodec.CreateWithLevel(typ, compression_level)))) + else: + self.wrapped = shared_ptr[CCodec](move(GetResultValue( + CCodec.Create(typ)))) cdef inline CCodec* unwrap(self) nogil: return self.wrapped.get() @@ -1680,10 +1719,50 @@ cdef class Codec(_Weakrefable): cdef CCompressionType typ = _ensure_compression(compression) return CCodec.IsAvailable(typ) + @staticmethod + def supports_compression_level(str compression not None): + """ + Returns true if the compression level parameter is supported + for the given codec. + """ + cdef CCompressionType typ = _ensure_compression(compression) + return CCodec.SupportsCompressionLevel(typ) + + @staticmethod + def default_compression_level(str compression not None): + """ + Returns the compression level that Arrow will use for the codec if + None is specified. + """ + cdef CCompressionType typ = _ensure_compression(compression) + return GetResultValue(CCodec.DefaultCompressionLevel(typ)) + + @staticmethod + def minimum_compression_level(str compression not None): + """ + Returns the smallest valid value for the compression level + """ + cdef CCompressionType typ = _ensure_compression(compression) + return GetResultValue(CCodec.MinimumCompressionLevel(typ)) + + @staticmethod + def maximum_compression_level(str compression not None): + """ + Returns the largest valid value for the compression level + """ + cdef CCompressionType typ = _ensure_compression(compression) + return GetResultValue(CCodec.MaximumCompressionLevel(typ)) + @property def name(self): + """Returns the name of the codec""" return frombytes(self.unwrap().name()) + @property + def compression_level(self): + """Returns the compression level parameter of the codec""" + return self.unwrap().compression_level() + def compress(self, object buf, asbytes=False, memory_pool=None): """ Compress data from buffer-like object. diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index fd685b44fbb..5b1c633b8f4 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -96,14 +96,11 @@ cdef class IpcWriteOptions(_Weakrefable): If true, allow field lengths that don't fit in a signed 32-bit int. use_legacy_format : bool, default False Whether to use the pre-Arrow 0.15 IPC format. - compression: str, or None + compression: str, Codec, or None compression codec to use for record batch buffers. If None then batch buffers will be uncompressed. Must be "lz4", "zstd" or None. - compression_level: int, or None - The compression level to use. - Higher indicates more compression, details depend on the codec. 
- If None then a default level for the codec will be used. + To specify a compression_level use `pyarrow.Codec` use_threads: bool Whether to use the global CPU thread pool to parallelize any computational tasks like compression. @@ -117,13 +114,12 @@ cdef class IpcWriteOptions(_Weakrefable): def __init__(self, *, metadata_version=MetadataVersion.V5, bint allow_64bit=False, use_legacy_format=False, - compression=None, compression_level=None, - bint use_threads=True, bint emit_dictionary_deltas=False): + compression=None, bint use_threads=True, + bint emit_dictionary_deltas=False): self.c_options = CIpcWriteOptions.Defaults() self.allow_64bit = allow_64bit self.use_legacy_format = use_legacy_format self.metadata_version = metadata_version - self._compression_level = compression_level if compression is not None: self.compression = compression self.use_threads = use_threads @@ -164,24 +160,13 @@ cdef class IpcWriteOptions(_Weakrefable): def compression(self, value): if value is None: self.c_options.codec.reset() + elif isinstance(value, str): + self.c_options.codec = shared_ptr[CCodec](GetResultValue( + CCodec.Create(_ensure_compression(value))).release()) + elif isinstance(value, Codec): + self.c_options.codec = (<Codec> value).wrapped else: - if self._compression_level is None: - self.c_options.codec = shared_ptr[CCodec](GetResultValue( - CCodec.Create(_ensure_compression(value))).release()) - else: - self.c_options.codec = shared_ptr[CCodec](GetResultValue( - CCodec.CreateWithLevel(_ensure_compression(value), - self._compression_level)).release()) - - @property - def compression_level(self): - return self._compression_level - - @compression_level.setter - def compression_level(self, value): - self._compression_level = value - # Recompute the codec - self.compression = self.compression + raise Exception("Property `compression` must be None, str, or pyarrow.io.Codec") @property def use_threads(self): diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 9608af2ac6a..414c7b5f26b 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -38,7 +38,6 @@ cdef class _Weakrefable: cdef class IpcWriteOptions(_Weakrefable): cdef: CIpcWriteOptions c_options - object _compression_level cdef class Message(_Weakrefable): @@ -500,7 +499,7 @@ cdef class RecordBatchReader(_Weakrefable): cdef class Codec(_Weakrefable): cdef: - unique_ptr[CCodec] wrapped + shared_ptr[CCodec] wrapped cdef inline CCodec* unwrap(self) nogil diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index a085312bbc7..4f4a22585d5 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -624,6 +624,89 @@ def test_compress_decompress(compression): pa.decompress(compressed_bytes, codec=compression) +@pytest.mark.parametrize("compression", [ + pytest.param( + "bz2", marks=pytest.mark.xfail(raises=pa.lib.ArrowNotImplementedError) + ), + "brotli", + "gzip", + "lz4", + "zstd", + "snappy" +]) +def test_compression_level(compression): + if not Codec.is_available(compression): + pytest.skip("{} support is not built".format(compression)) + + # These codecs do not support a compression level + no_level = ['snappy', 'lz4'] + if compression in no_level: + assert not Codec.supports_compression_level(compression) + with pytest.raises(ValueError): + Codec(compression, 0) + with pytest.raises(ValueError): + Codec.minimum_compression_level(compression) + with pytest.raises(ValueError): + Codec.maximum_compression_level(compression) + with pytest.raises(ValueError): + 
Codec.default_compression_level(compression) + return + + INPUT_SIZE = 10000 + test_data = (np.random.randint(0, 255, size=INPUT_SIZE) + .astype(np.uint8) + .tobytes()) + test_buf = pa.py_buffer(test_data) + + min_level = Codec.minimum_compression_level(compression) + max_level = Codec.maximum_compression_level(compression) + default_level = Codec.default_compression_level(compression) + + assert min_level < max_level + assert default_level >= min_level + assert default_level <= max_level + + for compression_level in range(min_level, max_level+1): + codec = Codec(compression, compression_level) + compressed_buf = codec.compress(test_buf) + compressed_bytes = codec.compress(test_data, asbytes=True) + assert isinstance(compressed_bytes, bytes) + decompressed_buf = codec.decompress(compressed_buf, INPUT_SIZE) + decompressed_bytes = codec.decompress(compressed_bytes, INPUT_SIZE, + asbytes=True) + + assert isinstance(decompressed_bytes, bytes) + + assert decompressed_buf.equals(test_buf) + assert decompressed_bytes == test_data + + with pytest.raises(ValueError): + codec.decompress(compressed_bytes) + + # The ability to set a seed this way is not present on older versions of + # numpy (currently in our python 3.6 CI build). Some inputs might just + # happen to compress the same between the two levels so using seeded + # random numbers is necessary to help get more reliable results + # + # The goal of this part is to ensure the compression_level is being + # passed down to the C++ layer, not to verify the compression algs + # themselves + if not hasattr(np.random, 'default_rng'): + pytest.skip('Requires newer version of numpy') + rng = np.random.default_rng(seed=42) + values = rng.integers(0, 100, 1000) + arr = pa.array(values) + hard_to_compress_buffer = arr.buffers()[1] + + weak_codec = Codec(compression, min_level) + weakly_compressed_buf = weak_codec.compress(hard_to_compress_buffer) + + strong_codec = Codec(compression, max_level) + strongly_compressed_buf = strong_codec.compress(hard_to_compress_buffer) + + assert len(weakly_compressed_buf) > len(strongly_compressed_buf) + + def test_buffer_memoryview_is_immutable(): val = b'some data' diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 741a7991b18..b5ad03cd215 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -330,25 +330,19 @@ def test_compression_roundtrip(): - # The ability to set a seed this way is not present on older versions of - # numpy (currently in our python 3.6 CI build). 
Some inputs might just - # happen to compress the same between the two levels so using seeded - # random numbers is necessary - if not hasattr(np.random, 'default_rng'): - pytest.skip('Requires newer version of numpy') sink = io.BytesIO() - rng = np.random.default_rng(seed=42) - values = rng.integers(0, 10, 100000) + values = np.random.randint(0, 10, 10000) table = pa.Table.from_arrays([values], names=["values"]) - options = pa.ipc.IpcWriteOptions(compression='zstd', compression_level=1) + options = pa.ipc.IpcWriteOptions(compression='zstd') with pa.ipc.RecordBatchFileWriter( sink, table.schema, options=options) as writer: writer.write_table(table) len1 = len(sink.getvalue()) sink2 = io.BytesIO() - options = pa.ipc.IpcWriteOptions(compression='zstd', compression_level=5) + codec = pa.Codec('zstd', compression_level=5) + options = pa.ipc.IpcWriteOptions(compression=codec) with pa.ipc.RecordBatchFileWriter( sink2, table.schema, options=options) as writer: writer.write_table(table) len2 = len(sink2.getvalue()) @@ -389,20 +383,6 @@ def test_write_options(): assert options.compression == value options.compression = value.upper() assert options.compression == value - - options.compression = 'zstd' - for compression_level in [-5, 0, 5]: - options.compression_level = compression_level - assert options.compression_level == compression_level - - # Cannot set compression_level with lz4 - with pytest.raises(pa.ArrowInvalid): - options.compression = 'lz4' - - options.compression_level = None - options.compression = 'lz4' - assert options.compression == 'lz4' - options.compression = None assert options.compression is None From 332103ecfe5e4c832dd2af7f0fb59d38c1a4103b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 15 Jul 2021 18:00:29 -1000 Subject: [PATCH 6/9] ARROW-13091: Lint --- python/pyarrow/includes/libarrow.pxd | 3 +++ python/pyarrow/io.pxi | 2 +- python/pyarrow/ipc.pxi | 3 ++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 423b5ec10d3..171b3ede217 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2359,10 +2359,13 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil: @staticmethod c_bool SupportsCompressionLevel(CCompressionType codec) + @staticmethod CResult[int] MinimumCompressionLevel(CCompressionType codec) + @staticmethod CResult[int] MaximumCompressionLevel(CCompressionType codec) + @staticmethod CResult[int] DefaultCompressionLevel(CCompressionType codec) @staticmethod c_bool IsAvailable(CCompressionType codec) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index 167aed1623f..7d7cb1afb00 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1666,7 +1666,7 @@ cdef class Codec(_Weakrefable): If invalid compression value is passed. 
""" - def __init__(self, str compression not None, compression_level = None): + def __init__(self, str compression not None, compression_level=None): cdef CCompressionType typ = _ensure_compression(compression) if compression_level is not None: self.wrapped = shared_ptr[CCodec](move(GetResultValue( diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 5b1c633b8f4..733c987d9a3 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -166,7 +166,8 @@ cdef class IpcWriteOptions(_Weakrefable): elif isinstance(value, Codec): self.c_options.codec = (value).wrapped else: - raise Exception("Property `compression` must be None, str, or pyarrow.io.Codec") + raise Exception( + "Property `compression` must be None, str, or pyarrow.Codec") @property def use_threads(self): From 82472b42e717a675d034a7b3ad89acbb28f7aa07 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 16 Jul 2021 11:01:30 -1000 Subject: [PATCH 7/9] ARROW-13091: Added support for running python tests without any compression codecs. Changed generic Exception to TypeError in an error message --- python/pyarrow/ipc.pxi | 2 +- python/pyarrow/tests/conftest.py | 13 ++++++ .../parquet/test_compliant_nested_type.py | 2 + .../tests/parquet/test_parquet_file.py | 1 + .../tests/parquet/test_parquet_writer.py | 1 + python/pyarrow/tests/test_feather.py | 7 ++++ python/pyarrow/tests/test_fs.py | 3 ++ python/pyarrow/tests/test_io.py | 12 ++++++ python/pyarrow/tests/test_ipc.py | 40 ++++++++++++------- python/pyarrow/tests/test_tensor.py | 1 + 10 files changed, 66 insertions(+), 16 deletions(-) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 733c987d9a3..4b22acc076f 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -166,7 +166,7 @@ cdef class IpcWriteOptions(_Weakrefable): elif isinstance(value, Codec): self.c_options.codec = (value).wrapped else: - raise Exception( + raise TypeError( "Property `compression` must be None, str, or pyarrow.Codec") @property diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 3de07c4305f..8fb98d4a6e7 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -24,6 +24,7 @@ import hypothesis as h from pyarrow.util import find_free_port +from pyarrow import Codec # setup hypothesis profiles @@ -44,13 +45,17 @@ groups = [ + 'brotli', + 'bz2', 'cython', 'dataset', 'hypothesis', 'fastparquet', 'gandiva', + 'gzip', 'hdfs', 'large_memory', + 'lz4', 'memory_leak', 'nopandas', 'orc', @@ -58,20 +63,26 @@ 'parquet', 'plasma', 's3', + 'snappy', 'tensorflow', 'flight', 'slow', 'requires_testing_data', + 'zstd', ] defaults = { + 'brotli': Codec.is_available('brotli'), + 'bz2': Codec.is_available('bz2'), 'cython': False, 'dataset': False, 'fastparquet': False, 'hypothesis': False, 'gandiva': False, + 'gzip': Codec.is_available('gzip'), 'hdfs': False, 'large_memory': False, + 'lz4': Codec.is_available('lz4'), 'memory_leak': False, 'orc': False, 'nopandas': False, @@ -79,10 +90,12 @@ 'parquet': False, 'plasma': False, 's3': False, + 'snappy': Codec.is_available('snappy'), 'tensorflow': False, 'flight': False, 'slow': False, 'requires_testing_data': True, + 'zstd': Codec.is_available('zstd'), } try: diff --git a/python/pyarrow/tests/parquet/test_compliant_nested_type.py b/python/pyarrow/tests/parquet/test_compliant_nested_type.py index 804f3738f12..5b10ed4c2f0 100644 --- a/python/pyarrow/tests/parquet/test_compliant_nested_type.py +++ b/python/pyarrow/tests/parquet/test_compliant_nested_type.py @@ -35,6 +35,8 @@ 
except ImportError: pd = tm = None +pytestmark = pytest.mark.parquet + # Tests for ARROW-11497 _test_data_simple = [ {'items': [1, 2]}, diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py index dc9a3bb5274..cb2d0029f45 100644 --- a/python/pyarrow/tests/parquet/test_parquet_file.py +++ b/python/pyarrow/tests/parquet/test_parquet_file.py @@ -36,6 +36,7 @@ except ImportError: pd = tm = None +pytestmark = pytest.mark.parquet @pytest.mark.pandas def test_pass_separate_metadata(): diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py index ec1d5256bfd..23a154b8710 100644 --- a/python/pyarrow/tests/parquet/test_parquet_writer.py +++ b/python/pyarrow/tests/parquet/test_parquet_writer.py @@ -36,6 +36,7 @@ except ImportError: pd = tm = None +pytestmark = pytest.mark.parquet @pytest.mark.pandas @parametrize_legacy_dataset diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py index f01ac292ddf..cea8ecab71e 100644 --- a/python/pyarrow/tests/test_feather.py +++ b/python/pyarrow/tests/test_feather.py @@ -55,6 +55,9 @@ def version(request): @pytest.fixture(scope="module", params=[None, "uncompressed", "lz4", "zstd"]) def compression(request): + if request.param in ['lz4', 'zstd'] and not pa.Codec.is_available( + request.param): + pytest.skip(f'{request.param} is not available') yield request.param @@ -599,6 +602,9 @@ def test_v2_set_chunksize(): @pytest.mark.pandas +@pytest.mark.lz4 +@pytest.mark.snappy +@pytest.mark.zstd def test_v2_compression_options(): df = pd.DataFrame({'A': np.arange(1000)}) @@ -776,6 +782,7 @@ def test_roundtrip(table, compression): _check_arrow_roundtrip(table, compression=compression) +@pytest.mark.lz4 def test_feather_v017_experimental_compression_backward_compatibility(datadir): # ARROW-11163 - ensure newer pyarrow versions can read the old feather # files from version 0.17.0 with experimental compression support (before diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 8faddc7b9e4..0e049e21778 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -855,6 +855,7 @@ def identity(v): return v +@pytest.mark.gzip @pytest.mark.parametrize( ('compression', 'buffer_size', 'compressor'), [ @@ -892,6 +893,7 @@ def test_open_input_file(fs, pathfn): assert result == data[read_from:] +@pytest.mark.gzip @pytest.mark.parametrize( ('compression', 'buffer_size', 'decompressor'), [ @@ -913,6 +915,7 @@ def test_open_output_stream(fs, pathfn, compression, buffer_size, assert f.read(len(data)) == data +@pytest.mark.gzip @pytest.mark.parametrize( ('compression', 'buffer_size', 'compressor', 'decompressor'), [ diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index 4f4a22585d5..5119e162595 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -1266,6 +1266,7 @@ def check_compressed_input(data, fn, compression): assert buf.to_pybytes() == data +@pytest.mark.gzip def test_compressed_input_gzip(tmpdir): data = b"some test data\n" * 10 + b"eof\n" fn = str(tmpdir / "compressed_input_test.gz") @@ -1292,6 +1293,7 @@ def check_compressed_concatenated(data, fn, compression): assert got == data +@pytest.mark.gzip def test_compressed_concatenated_gzip(tmpdir): data = b"some test data\n" * 10 + b"eof\n" fn = str(tmpdir / "compressed_input_test2.gz") @@ -1302,6 +1304,7 @@ def test_compressed_concatenated_gzip(tmpdir): 
check_compressed_concatenated(data, fn, "gzip") +@pytest.mark.gzip def test_compressed_input_invalid(): data = b"foo" * 10 raw = pa.BufferReader(data) @@ -1329,6 +1332,7 @@ def make_compressed_output(data, fn, compression): f.write(raw.getvalue()) +@pytest.mark.gzip def test_compressed_output_gzip(tmpdir): data = b"some test data\n" * 10 + b"eof\n" fn = str(tmpdir / "compressed_output_test.gz") @@ -1516,6 +1520,7 @@ def test_transcoding_decoding_error(src_encoding, dest_encoding): # ---------------------------------------------------------------------- # High-level API +@pytest.mark.gzip def test_input_stream_buffer(): data = b"some test data\n" * 10 + b"eof\n" for arg in [pa.py_buffer(data), memoryview(data)]: @@ -1561,6 +1566,7 @@ def test_input_stream_file_path(tmpdir): assert stream.read() == data +@pytest.mark.gzip def test_input_stream_file_path_compressed(tmpdir): data = b"some test data\n" * 10 + b"eof\n" gz_data = gzip.compress(data) @@ -1607,6 +1613,7 @@ def test_input_stream_file_path_buffered(tmpdir): pa.input_stream(file_path, buffer_size='million') +@pytest.mark.gzip def test_input_stream_file_path_compressed_and_buffered(tmpdir): data = b"some test data\n" * 100 + b"eof\n" gz_data = gzip.compress(data) @@ -1622,6 +1629,7 @@ def test_input_stream_file_path_compressed_and_buffered(tmpdir): assert stream.read() == data +@pytest.mark.gzip def test_input_stream_python_file(tmpdir): data = b"some test data\n" * 10 + b"eof\n" bio = BytesIO(data) @@ -1645,6 +1653,7 @@ def test_input_stream_python_file(tmpdir): assert stream.read() == data +@pytest.mark.gzip def test_input_stream_native_file(): data = b"some test data\n" * 10 + b"eof\n" gz_data = gzip.compress(data) @@ -1723,6 +1732,7 @@ def check_data(file_path, data): check_data(pathlib.Path(str(file_path)), data) +@pytest.mark.gzip def test_output_stream_file_path_compressed(tmpdir): data = b"some test data\n" * 10 + b"eof\n" file_path = tmpdir / 'output_stream.gz' @@ -1773,6 +1783,7 @@ def check_data(file_path, data, **kwargs): assert result == data +@pytest.mark.gzip def test_output_stream_file_path_compressed_and_buffered(tmpdir): data = b"some test data\n" * 100 + b"eof\n" file_path = tmpdir / 'output_stream_compressed_and_buffered.gz' @@ -1812,6 +1823,7 @@ def check_data(file_path, data, **kwargs): assert check_data(file_path, data, buffer_size=1024) == data +@pytest.mark.gzip def test_output_stream_python_file(tmpdir): data = b"some test data\n" * 10 + b"eof\n" diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index b5ad03cd215..b6b42bc39fc 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -329,7 +329,11 @@ def test_stream_simple_roundtrip(stream_fixture, use_legacy_ipc_format): reader.read_next_batch() +@pytest.mark.zstd def test_compression_roundtrip(): + if not pa.Codec.is_available('zstd'): + pytest.skip("{} support is not built".format('zstd')) + sink = io.BytesIO() values = np.random.randint(0, 10, 10000) table = pa.Table.from_arrays([values], names=["values"]) @@ -379,28 +383,33 @@ def test_write_options(): assert options.compression is None for value in ['lz4', 'zstd']: - options.compression = value - assert options.compression == value - options.compression = value.upper() - assert options.compression == value + if pa.Codec.is_available(value): + options.compression = value + assert options.compression == value + options.compression = value.upper() + assert options.compression == value options.compression = None assert options.compression is None + 
with pytest.raises(TypeError): + options.compression = 0 + assert options.use_threads is True options.use_threads = False assert options.use_threads is False - options = pa.ipc.IpcWriteOptions( - metadata_version=pa.ipc.MetadataVersion.V4, - allow_64bit=True, - use_legacy_format=True, - compression='lz4', - use_threads=False) - assert options.metadata_version == pa.ipc.MetadataVersion.V4 - assert options.allow_64bit is True - assert options.use_legacy_format is True - assert options.compression == 'lz4' - assert options.use_threads is False + if pa.Codec.is_available('lz4'): + options = pa.ipc.IpcWriteOptions( + metadata_version=pa.ipc.MetadataVersion.V4, + allow_64bit=True, + use_legacy_format=True, + compression='lz4', + use_threads=False) + assert options.metadata_version == pa.ipc.MetadataVersion.V4 + assert options.allow_64bit is True + assert options.use_legacy_format is True + assert options.compression == 'lz4' + assert options.use_threads is False def test_write_options_legacy_exclusive(stream_fixture): @@ -594,6 +603,7 @@ def test_message_serialize_read_message(example_messages): pa.ipc.read_message(reader) +@pytest.mark.gzip def test_message_read_from_compressed(example_messages): # Part of ARROW-5910 _, messages = example_messages diff --git a/python/pyarrow/tests/test_tensor.py b/python/pyarrow/tests/test_tensor.py index 07493144919..aee46bc9369 100644 --- a/python/pyarrow/tests/test_tensor.py +++ b/python/pyarrow/tests/test_tensor.py @@ -108,6 +108,7 @@ def test_tensor_ipc_roundtrip(tmpdir): assert result.equals(tensor) +@pytest.mark.gzip def test_tensor_ipc_read_from_compressed(tempdir): # ARROW-5910 data = np.random.randn(10, 4) From 25a8fa155907bb42d4095b1298bdfff45283a0b4 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 16 Jul 2021 11:15:33 -1000 Subject: [PATCH 8/9] ARROW-13091: Removing redundant check --- python/pyarrow/tests/test_ipc.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index b6b42bc39fc..87944bcc066 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -331,9 +331,6 @@ def test_stream_simple_roundtrip(stream_fixture, use_legacy_ipc_format): @pytest.mark.zstd def test_compression_roundtrip(): - if not pa.Codec.is_available('zstd'): - pytest.skip("{} support is not built".format('zstd')) - sink = io.BytesIO() values = np.random.randint(0, 10, 10000) table = pa.Table.from_arrays([values], names=["values"]) From 91a6ea48bf342955d86980ebd795398066845030 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 16 Jul 2021 11:16:57 -1000 Subject: [PATCH 9/9] ARROW-13091: Lint --- python/pyarrow/tests/parquet/test_parquet_file.py | 1 + python/pyarrow/tests/parquet/test_parquet_writer.py | 1 + python/pyarrow/tests/test_feather.py | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py index cb2d0029f45..43175b72ab7 100644 --- a/python/pyarrow/tests/parquet/test_parquet_file.py +++ b/python/pyarrow/tests/parquet/test_parquet_file.py @@ -38,6 +38,7 @@ pytestmark = pytest.mark.parquet + @pytest.mark.pandas def test_pass_separate_metadata(): # ARROW-471 diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py index 23a154b8710..4218e83cead 100644 --- a/python/pyarrow/tests/parquet/test_parquet_writer.py +++ b/python/pyarrow/tests/parquet/test_parquet_writer.py @@ -38,6 +38,7 @@ 
pytestmark = pytest.mark.parquet + @pytest.mark.pandas @parametrize_legacy_dataset def test_parquet_incremental_file_build(tempdir, use_legacy_dataset): diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py index cea8ecab71e..3d0451ee33e 100644 --- a/python/pyarrow/tests/test_feather.py +++ b/python/pyarrow/tests/test_feather.py @@ -56,7 +56,7 @@ def version(request): @pytest.fixture(scope="module", params=[None, "uncompressed", "lz4", "zstd"]) def compression(request): if request.param in ['lz4', 'zstd'] and not pa.Codec.is_available( - request.param): + request.param): pytest.skip(f'{request.param} is not available') yield request.param
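
A minimal usage sketch of the API this series ends up with, assuming a pyarrow build with zstd enabled. The `pa.table` contents are only illustrative; the `Codec` helpers and writer options are the ones added in the patches above:

    import io
    import pyarrow as pa

    # Query the compression-level range the codec supports (PATCH 5/9).
    assert pa.Codec.supports_compression_level('zstd')
    minimum = pa.Codec.minimum_compression_level('zstd')
    maximum = pa.Codec.maximum_compression_level('zstd')
    default = pa.Codec.default_compression_level('zstd')
    print(f'zstd levels: {minimum}..{maximum}, default {default}')

    # Build a codec at an explicit level and hand it to the IPC writer
    # options; a plain string still selects the codec's default level.
    codec = pa.Codec('zstd', compression_level=maximum)
    options = pa.ipc.IpcWriteOptions(compression=codec)

    table = pa.table({'values': list(range(10000))})
    sink = io.BytesIO()
    with pa.ipc.RecordBatchFileWriter(sink, table.schema,
                                      options=options) as writer:
        writer.write_table(table)

    # Reading back needs no options; the codec used is recorded per batch.
    assert pa.ipc.open_file(sink).read_all() == table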