From 0c3221318be44a0ba8fd56ad4800ec29af36796d Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Date: Thu, 3 Nov 2016 16:43:42 +0100
Subject: [PATCH 1/2] ARROW-361: Python: Support reading a column-selection from Parquet files

Change-Id: Ie34b04fec6175e3514452e5cdab09c5dfcd5ad03
---
 python/pyarrow/includes/parquet.pxd  | 25 ++++++++++---
 python/pyarrow/parquet.pyx           | 52 +++++++++++++++++++++++++++-
 python/pyarrow/tests/test_parquet.py | 16 +++++++++
 3 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 754eeccecc8..57c35ba8944 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -18,7 +18,7 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
+from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
 from pyarrow.includes.libarrow_io cimport ReadableFileInterface
 
 
@@ -32,6 +32,9 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
     cdef cppclass PrimitiveNode(Node):
         pass
 
+    cdef cppclass ColumnPath:
+        c_string ToDotString()
+
 cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
     enum ParquetVersion" parquet::ParquetVersion::type":
         PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
@@ -44,13 +47,14 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
         LZO" parquet::Compression::LZO"
         BROTLI" parquet::Compression::BROTLI"
 
+    cdef cppclass ColumnDescriptor:
+        shared_ptr[ColumnPath] path()
+
     cdef cppclass SchemaDescriptor:
+        const ColumnDescriptor* Column(int i)
         shared_ptr[Node] schema()
         GroupNode* group()
 
-    cdef cppclass ColumnDescriptor:
-        pass
-
 cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
     cdef cppclass ColumnReader:
         pass
@@ -80,10 +84,21 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
     cdef cppclass RowGroupReader:
         pass
 
+    cdef cppclass FileMetaData:
+        uint32_t size()
+        int num_columns()
+        int64_t num_rows()
+        int num_row_groups()
+        int32_t version()
+        const c_string created_by()
+        int num_schema_elements()
+        const SchemaDescriptor* schema()
+
     cdef cppclass ParquetFileReader:
         # TODO: Some default arguments are missing
         @staticmethod
         unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
+        const FileMetaData* metadata();
 
 
 cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -124,7 +139,9 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 
     cdef cppclass FileReader:
         FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
+        CStatus ReadFlatColumn(int i, shared_ptr[CArray]* out);
         CStatus ReadFlatTable(shared_ptr[CTable]* out);
+        const ParquetFileReader* parquet_reader();
 
 
 cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 019dd2c1de4..dedd9b00159 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -24,6 +24,7 @@ from pyarrow.includes.parquet cimport *
 from pyarrow.includes.libarrow_io cimport ReadableFileInterface
 cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow.array cimport Array
 
 from pyarrow.compat import tobytes
 from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
@@ -76,11 +77,55 @@ cdef class ParquetReader:
         table.init(ctable)
         return table
 
+    def column_name_idx(self, column_name):
+        """
+        Find the matching index of a column in the schema.
+
+        Parameters
+        ----------
+        column_name: str
+            Name of the column; nesting levels are separated by ".".
+
+        Returns
+        -------
+        column_idx: int
+            Integer index of the position of the column
+        """
+        cdef:
+            c_string column_path = tobytes(column_name)
+            const FileMetaData* metadata = self.reader.get().parquet_reader().metadata()
+            int i = 0
+
+        for i in range(0, metadata.num_columns()):
+            if metadata.schema().Column(i).path().get().ToDotString() == column_path:
+                return i
+
+        return 0
+
+    def read_column(self, int column_index):
+        cdef:
+            Array array = Array()
+            shared_ptr[CArray] carray
+
+        with nogil:
+            check_status(self.reader.get().ReadFlatColumn(column_index, &carray))
+
+        array.init(carray)
+        return array
+
 
 def read_table(source, columns=None):
     """
     Read a Table from Parquet format
 
+    Parameters
+    ----------
+    source: str or pyarrow.io.NativeFile
+        Readable source. For passing Python file objects or byte buffers, see
+        pyarrow.io.PythonFileInterface or pyarrow.io.BytesReader.
+    columns: list
+        If not None, only these columns will be read from the file.
+
     Returns
     -------
     pyarrow.table.Table
@@ -93,7 +138,12 @@
     elif isinstance(source, NativeFile):
         reader.open_native_file(source)
 
-    return reader.read_all()
+    if columns is None:
+        return reader.read_all()
+    else:
+        column_idxs = [reader.column_name_idx(column) for column in columns]
+        arrays = [reader.read_column(column_idx) for column_idx in column_idxs]
+        return Table.from_arrays(columns, arrays)
 
 
 def write_table(table, filename, chunk_size=None, version=None,
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 922ad3aa9ff..c1d44ce0d42 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -115,6 +115,22 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
 
     pdt.assert_frame_equal(df, df_read)
 
+@parquet
+def test_pandas_column_selection(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16)
+    })
+    filename = tmpdir.join('pandas_roundtrip.parquet')
+    arrow_table = A.from_pandas_dataframe(df)
+    A.parquet.write_table(arrow_table, filename.strpath)
+    table_read = pq.read_table(filename.strpath, columns=['uint8'])
+    df_read = table_read.to_pandas()
+
+    pdt.assert_frame_equal(df[['uint8']], df_read)
+
 @parquet
 def test_pandas_parquet_configuration_options(tmpdir):
     size = 10000
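A short usage sketch of the column selection added in PATCH 1/2, mirroring the
new test above. This is illustrative only: the data and the output path are
hypothetical, and it assumes a pyarrow build with Parquet support, using the
same era-appropriate helpers as the test (A.from_pandas_dataframe,
A.parquet.write_table). A nested column would be addressed by its
dot-separated path, as described in the column_name_idx docstring.

    import numpy as np
    import pandas as pd
    import pyarrow as A
    import pyarrow.parquet as pq

    # Hypothetical example data; any pandas DataFrame works.
    df = pd.DataFrame({'uint8': np.arange(100, dtype=np.uint8),
                       'uint16': np.arange(100, dtype=np.uint16)})

    # Write the full table, then read back only the selected column.
    A.parquet.write_table(A.from_pandas_dataframe(df), 'example.parquet')
    table = pq.read_table('example.parquet', columns=['uint8'])
    assert table.to_pandas().columns.tolist() == ['uint8']
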
From c1fb93900f522c4f99d002bb9c1c24c75da63276 Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Date: Thu, 3 Nov 2016 19:38:43 +0100
Subject: [PATCH 2/2] Cache column indices

Change-Id: Iacd2dc62f7379c7907bdc392658ea39ff80e1a33
---
 python/pyarrow/parquet.pyx | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index dedd9b00159..ea115d072da 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -44,6 +44,7 @@ cdef class ParquetReader:
     cdef:
         ParquetAllocator allocator
        unique_ptr[FileReader] reader
+        column_idx_map
 
     def __cinit__(self):
         self.allocator.set_pool(default_memory_pool())
@@ -92,15 +93,15 @@ cdef class ParquetReader:
             Integer index of the position of the column
         """
         cdef:
-            c_string column_path = tobytes(column_name)
             const FileMetaData* metadata = self.reader.get().parquet_reader().metadata()
             int i = 0
 
-        for i in range(0, metadata.num_columns()):
-            if metadata.schema().Column(i).path().get().ToDotString() == column_path:
-                return i
+        if self.column_idx_map is None:
+            self.column_idx_map = {}
+            for i in range(0, metadata.num_columns()):
+                self.column_idx_map[str(metadata.schema().Column(i).path().get().ToDotString())] = i
 
-        return 0
+        return self.column_idx_map[column_name]
 
     def read_column(self, int column_index):
         cdef:
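
For clarity, the caching that PATCH 2/2 introduces in column_name_idx can be
restated in plain Python. The class and the schema list below are hypothetical
stand-ins for the Cython ParquetReader and the Parquet file metadata; note one
behavioral consequence of the change: an unknown column name now raises
KeyError, where PATCH 1/2 silently fell back to index 0.

    class ColumnIndexCache:
        """Hypothetical stand-in for ParquetReader.column_name_idx."""

        def __init__(self, schema_paths):
            # schema_paths plays the role of the schema walk over
            # metadata.schema().Column(i).path().get().ToDotString().
            self._schema_paths = schema_paths
            self._column_idx_map = None  # built lazily, like the pyx attribute

        def column_name_idx(self, column_name):
            if self._column_idx_map is None:
                # One pass over the schema on first lookup; reused afterwards.
                self._column_idx_map = {
                    path: i for i, path in enumerate(self._schema_paths)
                }
            # A missing column raises KeyError instead of returning index 0.
            return self._column_idx_map[column_name]

    cache = ColumnIndexCache(['uint8', 'uint16'])
    assert cache.column_name_idx('uint16') == 1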