From 93d653b7d762652c8b4011dcfe2ac77225deeae2 Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Date: Tue, 27 Sep 2016 17:53:57 +0200
Subject: [PATCH] ARROW-305: Add compression and use_dictionary options to
 Parquet interface

Change-Id: If9092030768265e7fb437dc9972461f96b438b72
---
 python/pyarrow/includes/parquet.pxd  | 12 ++++++++++++
 python/pyarrow/parquet.pyx           | 48 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 python/pyarrow/tests/test_parquet.py | 41 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 9085b0bb298..754eeccecc8 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -37,6 +37,13 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
         PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
         PARQUET_2_0" parquet::ParquetVersion::PARQUET_2_0"
 
+    enum Compression" parquet::Compression::type":
+        UNCOMPRESSED" parquet::Compression::UNCOMPRESSED"
+        SNAPPY" parquet::Compression::SNAPPY"
+        GZIP" parquet::Compression::GZIP"
+        LZO" parquet::Compression::LZO"
+        BROTLI" parquet::Compression::BROTLI"
+
     cdef cppclass SchemaDescriptor:
         shared_ptr[Node] schema()
         GroupNode* group()
@@ -90,6 +97,11 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
     cdef cppclass WriterProperties:
         cppclass Builder:
             Builder* version(ParquetVersion version)
+            Builder* compression(Compression codec)
+            Builder* compression(const c_string& path, Compression codec)
+            Builder* disable_dictionary()
+            Builder* enable_dictionary()
+            Builder* enable_dictionary(const c_string& path)
             shared_ptr[WriterProperties] build()
 
 
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index fb36b2967c0..099e148abc1 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -90,7 +90,8 @@ def read_table(source, columns=None):
     return reader.read_all()
 
 
-def write_table(table, filename, chunk_size=None, version=None):
+def write_table(table, filename, chunk_size=None, version=None,
+                use_dictionary=True, compression=None):
     """
     Write a Table to Parquet format
 
@@ -102,6 +103,12 @@
         The maximum number of rows in each Parquet RowGroup
     version : {"1.0", "2.0"}, default "1.0"
         The Parquet format version, defaults to 1.0
+    use_dictionary : bool or list
+        Enable dictionary encoding for all columns, or only for the listed
+        column names.
+    compression : str or dict
+        Compression codec for all columns, or a dict mapping column names to
+        codecs: 'NONE', 'SNAPPY', 'GZIP', 'LZO' or 'BROTLI'.
""" cdef Table table_ = table cdef CTable* ctable_ = table_.table @@ -121,6 +127,47 @@ def write_table(table, filename, chunk_size=None, version=None): else: raise ArrowException("Unsupported Parquet format version") + if isinstance(use_dictionary, bool): + if use_dictionary: + properties_builder.enable_dictionary() + else: + properties_builder.disable_dictionary() + else: + # Deactivate dictionary encoding by default + properties_builder.disable_dictionary() + for column in use_dictionary: + properties_builder.enable_dictionary(column) + + if isinstance(compression, basestring): + if compression == "NONE": + properties_builder.compression(UNCOMPRESSED) + elif compression == "SNAPPY": + properties_builder.compression(SNAPPY) + elif compression == "GZIP": + properties_builder.compression(GZIP) + elif compression == "LZO": + properties_builder.compression(LZO) + elif compression == "BROTLI": + properties_builder.compression(BROTLI) + else: + raise ArrowException("Unsupport compression codec") + elif compression is not None: + # Deactivate dictionary encoding by default + properties_builder.disable_dictionary() + for column, codec in compression.iteritems(): + if codec == "NONE": + properties_builder.compression(column, UNCOMPRESSED) + elif codec == "SNAPPY": + properties_builder.compression(column, SNAPPY) + elif codec == "GZIP": + properties_builder.compression(column, GZIP) + elif codec == "LZO": + properties_builder.compression(column, LZO) + elif codec == "BROTLI": + properties_builder.compression(column, BROTLI) + else: + raise ArrowException("Unsupport compression codec") + sink.reset(new LocalFileOutputStream(tobytes(filename))) with nogil: check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 8a2d8cab572..0f9f2e40813 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -110,3 +110,43 @@ def test_pandas_parquet_1_0_rountrip(tmpdir): df['uint32'] = df['uint32'].values.astype(np.int64) pdt.assert_frame_equal(df, df_read) + +@parquet +def test_pandas_parquet_configuration_options(tmpdir): + size = 10000 + np.random.seed(0) + df = pd.DataFrame({ + 'uint8': np.arange(size, dtype=np.uint8), + 'uint16': np.arange(size, dtype=np.uint16), + 'uint32': np.arange(size, dtype=np.uint32), + 'uint64': np.arange(size, dtype=np.uint64), + 'int8': np.arange(size, dtype=np.int16), + 'int16': np.arange(size, dtype=np.int16), + 'int32': np.arange(size, dtype=np.int32), + 'int64': np.arange(size, dtype=np.int64), + 'float32': np.arange(size, dtype=np.float32), + 'float64': np.arange(size, dtype=np.float64), + 'bool': np.random.randn(size) > 0 + }) + filename = tmpdir.join('pandas_rountrip.parquet') + arrow_table = A.from_pandas_dataframe(df) + + for use_dictionary in [True, False]: + A.parquet.write_table( + arrow_table, + filename.strpath, + version="2.0", + use_dictionary=use_dictionary) + table_read = pq.read_table(filename.strpath) + df_read = table_read.to_pandas() + pdt.assert_frame_equal(df, df_read) + + for compression in ['NONE', 'SNAPPY', 'GZIP']: + A.parquet.write_table( + arrow_table, + filename.strpath, + version="2.0", + compression=compression) + table_read = pq.read_table(filename.strpath) + df_read = table_read.to_pandas() + pdt.assert_frame_equal(df, df_read)