Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class SerializedPageReader : public PageReader {
InitDecryption();
}
max_page_header_size_ = kDefaultMaxPageHeaderSize;
decompressor_ = GetCodec(codec);
decompressor_ = internal::GetReadCodec(codec);
}

// Implement the PageReader interface
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class SerializedPageWriter : public PageWriter {
if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
InitEncryption();
}
compressor_ = GetCodec(codec, compression_level);
compressor_ = internal::GetWriteCodec(codec, compression_level);
thrift_serializer_.reset(new ThriftSerializer);
}

Expand Down
10 changes: 6 additions & 4 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,15 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {

#ifdef ARROW_WITH_LZ4
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) {
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false,
LARGE_SIZE);
ASSERT_THROW(this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false,
false, LARGE_SIZE),
ParquetException);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) {
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true,
LARGE_SIZE);
ASSERT_THROW(this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false,
true, LARGE_SIZE),
ParquetException);
}
#endif

Expand Down
5 changes: 2 additions & 3 deletions cpp/src/parquet/file_deserialize_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,8 @@ TEST_F(TestPageSerde, Compression) {
codec_types.push_back(Compression::GZIP);
#endif

#ifdef ARROW_WITH_LZ4
codec_types.push_back(Compression::LZ4);
#endif
// TODO: Add LZ4 compression type after PARQUET-1878 is complete.
// Testing for deserializing LZ4 is hard without writing enabled, so it is not included.

#ifdef ARROW_WITH_ZSTD
codec_types.push_back(Compression::ZSTD);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/file_serialize_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ TYPED_TEST(TestSerialize, SmallFileGzip) {

#ifdef ARROW_WITH_LZ4
TYPED_TEST(TestSerialize, SmallFileLz4) {
ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::LZ4));
ASSERT_THROW(this->FileSerializeTest(Compression::LZ4), ParquetException);
}
#endif

Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/thrift_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ static inline Compression::type FromThriftUnsafe(format::CompressionCodec::type
case format::CompressionCodec::BROTLI:
return Compression::BROTLI;
case format::CompressionCodec::LZ4:
// ARROW-9424: Existing files use LZ4_RAW but this may need to change
return Compression::LZ4;
case format::CompressionCodec::ZSTD:
return Compression::ZSTD;
Expand Down
33 changes: 29 additions & 4 deletions cpp/src/parquet/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ bool IsCodecSupported(Compression::type codec) {
}
}

// Convenience overload: builds a codec for `codec` by forwarding to the
// two-argument GetCodec with Codec::UseDefaultCompressionLevel().
std::unique_ptr<Codec> GetCodec(Compression::type codec) {
return GetCodec(codec, Codec::UseDefaultCompressionLevel());
}
namespace internal {

std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level) {
std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level,
bool for_writing) {
std::unique_ptr<Codec> result;
if (for_writing && (codec == Compression::LZ4 || codec == Compression::LZ4_FRAME)) {
throw ParquetException(
"Per ARROW-9424, writing files with LZ4 compression has been "
"disabled until implementation issues have been resolved. "
"It is recommended to read any existing files and rewrite them "
"using a different compression.");
}

if (!IsCodecSupported(codec)) {
std::stringstream ss;
ss << "Codec type " << Codec::GetCodecAsString(codec)
Expand All @@ -66,6 +73,24 @@ std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level)
return result;
}

// Read-path codec factory. Forwards to GetCodec with the default compression
// level and for_writing=false, so the LZ4 write guard in GetCodec is not
// triggered when decoding existing files.
std::unique_ptr<Codec> GetReadCodec(Compression::type codec) {
  constexpr bool kForWriting = false;
  const auto default_level = Codec::UseDefaultCompressionLevel();
  return GetCodec(codec, default_level, kForWriting);
}

// Write-path codec factory. Forwards to GetCodec with for_writing=true, which
// makes GetCodec throw ParquetException for Compression::LZ4 and
// Compression::LZ4_FRAME (ARROW-9424).
std::unique_ptr<Codec> GetWriteCodec(Compression::type codec, int compression_level) {
  constexpr bool kForWriting = true;
  std::unique_ptr<Codec> write_codec = GetCodec(codec, compression_level, kForWriting);
  return write_codec;
}

} // namespace internal

// Public codec factory taking an explicit compression level. Delegates to
// internal::GetCodec with for_writing=false, so the LZ4 write guard is not
// applied through this entry point.
std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level) {
  constexpr bool kForWriting = false;
  return internal::GetCodec(codec, compression_level, kForWriting);
}

// Public codec factory using the default compression level; forwards to the
// two-argument GetCodec overload.
std::unique_ptr<Codec> GetCodec(Compression::type codec) {
  const auto default_level = Codec::UseDefaultCompressionLevel();
  return GetCodec(codec, default_level);
}

std::string FormatStatValue(Type::type parquet_type, ::arrow::util::string_view val) {
std::stringstream result;

Expand Down
9 changes: 9 additions & 0 deletions cpp/src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,15 @@ struct Encoding {
PARQUET_EXPORT
bool IsCodecSupported(Compression::type codec);

// Library-internal codec factories (declared without PARQUET_EXPORT).
namespace internal {

// ARROW-9424: Separate functions for reading and writing so we can disable LZ4
// on writing
//
// GetReadCodec: returns a codec for `codec` using the default compression
// level; the LZ4 write restriction is not applied.
std::unique_ptr<Codec> GetReadCodec(Compression::type codec);
// GetWriteCodec: returns a codec for `codec` at `compression_level`; throws
// ParquetException when `codec` is Compression::LZ4 or Compression::LZ4_FRAME.
std::unique_ptr<Codec> GetWriteCodec(Compression::type codec, int compression_level);

} // namespace internal

PARQUET_EXPORT
std::unique_ptr<Codec> GetCodec(Compression::type codec);

Expand Down
16 changes: 14 additions & 2 deletions python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,10 @@ def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset):
tm.assert_frame_equal(df, df_read)


# ARROW-9424: LZ4 support is currently disabled
SUPPORTED_COMPRESSIONS = ['NONE', 'SNAPPY', 'GZIP', 'ZSTD']


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
Expand Down Expand Up @@ -735,7 +739,7 @@ def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)

for compression in ['NONE', 'SNAPPY', 'GZIP', 'LZ4', 'ZSTD']:
for compression in SUPPORTED_COMPRESSIONS:
if (compression != 'NONE' and
not pa.lib.Codec.is_available(compression)):
continue
Expand All @@ -747,6 +751,13 @@ def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
tm.assert_frame_equal(df, df_read)


# ARROW-9424: LZ4 support is currently disabled
def test_lz4_compression_disabled():
    """Writing a table with compression='lz4' must raise IOError."""
    column = pa.array([1, 2, 3, 4, 5])
    table = pa.table([column], names=['f0'])
    with pytest.raises(IOError):
        sink = pa.BufferOutputStream()
        pq.write_table(table, sink, compression='lz4')


def make_sample_file(table_or_df):
if isinstance(table_or_df, pa.Table):
a_table = table_or_df
Expand Down Expand Up @@ -828,8 +839,9 @@ def test_compression_level(use_legacy_dataset):
# level.
# GZIP (zlib) allows for specifying a compression level but as of up
# to version 1.2.11 the valid range is [-1, 9].
invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
invalid_combinations = [("snappy", 4), ("gzip", -1337),
("None", 444), ("lzo", 14)]
# ARROW-9424: ("lz4", 5) is omitted from the list for now because lz4 is disabled.
buf = io.BytesIO()
for (codec, level) in invalid_combinations:
with pytest.raises((ValueError, OSError)):
Expand Down