From 85f9b720620ab9adf636ce35926e14d09187fa6e Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 1 Aug 2019 20:31:49 -0500 Subject: [PATCH 1/7] Initial threading of read_dictionary parameter, not terribly satisfying --- cpp/src/parquet/arrow/reader.cc | 50 ++++++++++++++------ cpp/src/parquet/arrow/reader.h | 31 ++++++++++--- python/pyarrow/_parquet.pxd | 32 +++++++------ python/pyarrow/_parquet.pyx | 47 ++++++++++++------- python/pyarrow/parquet.py | 69 +++++++++++----------------- python/pyarrow/tests/test_parquet.py | 14 +----- 6 files changed, 137 insertions(+), 106 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 11be5713c92..370616339d6 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -814,28 +814,50 @@ Status FileReader::Make(::arrow::MemoryPool* pool, return Make(pool, std::move(reader), default_arrow_reader_properties(), out); } -Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, - MemoryPool* pool, const ReaderProperties& props, - const std::shared_ptr& metadata, - std::unique_ptr* reader) { - std::unique_ptr pq_reader; - PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(file, props, metadata)); - return FileReader::Make(pool, std::move(pq_reader), default_arrow_reader_properties(), - reader); +FileReaderBuilder::FileReaderBuilder(std::unique_ptr raw_reader) + : raw_reader_(std::move(raw_reader)) {} + +Status FileReaderBuilder::Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, + const ReaderProperties& properties, + const std::shared_ptr& metadata, + std::unique_ptr* builder) { + std::unique_ptr raw_reader; + PARQUET_CATCH_NOT_OK(raw_reader = ParquetReader::Open(file, properties, metadata)); + builder->reset(new FileReaderBuilder(std::move(raw_reader))); + return Status::OK(); +} + +Status FileReaderBuilder::Build(MemoryPool* pool, const ArrowReaderProperties& properties, + std::unique_ptr* out) { + return FileReader::Make(pool, std::move(raw_reader_), properties, out); } Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, MemoryPool* pool, std::unique_ptr* reader) { - return OpenFile(file, pool, ::parquet::default_reader_properties(), nullptr, reader); + std::unique_ptr builder; + RETURN_NOT_OK(FileReaderBuilder::Open(file, ::parquet::default_reader_properties(), + nullptr, &builder)); + return builder->Build(pool, default_arrow_reader_properties(), reader); +} + +Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, + MemoryPool* pool, const ReaderProperties& props, + const std::shared_ptr& metadata, + std::unique_ptr* reader) { + // Deprecated since 0.15.0 + std::unique_ptr builder; + RETURN_NOT_OK(FileReaderBuilder::Open(file, props, metadata, &builder)); + return builder->Build(pool, default_arrow_reader_properties(), reader); } Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, - ::arrow::MemoryPool* pool, const ArrowReaderProperties& properties, + MemoryPool* pool, const ArrowReaderProperties& properties, std::unique_ptr* reader) { - std::unique_ptr pq_reader; - PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open( - file, ::parquet::default_reader_properties(), nullptr)); - return FileReader::Make(pool, std::move(pq_reader), properties, reader); + // Deprecated since 0.15.0 + std::unique_ptr builder; + RETURN_NOT_OK(FileReaderBuilder::Open(file, ::parquet::default_reader_properties(), + nullptr, &builder)); + return builder->Build(pool, default_arrow_reader_properties(), 
reader); } } // namespace arrow diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index e73fcc0cb16..39f00cf8728 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -286,22 +286,41 @@ class PARQUET_EXPORT ColumnReader { std::shared_ptr<::arrow::ChunkedArray>* out) = 0; }; -// Helper function to create a file reader from an implementation of an Arrow -// random access file -// -// metadata : separately-computed file metadata, can be nullptr +/// \brief Experimental helper class for bindings (like Python) that struggle +/// either with std::move or C++ exceptions +class FileReaderBuilder { + public: + explicit FileReaderBuilder(std::unique_ptr raw_reader); + + static ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, + const ReaderProperties& properties, + const std::shared_ptr& metadata, + std::unique_ptr* builder); + + ParquetFileReader* raw_reader() { return raw_reader_.get(); } + + ::arrow::Status Build(::arrow::MemoryPool* allocator, + const ArrowReaderProperties& arrow_properties, + std::unique_ptr* out); + + private: + std::unique_ptr raw_reader_; +}; + PARQUET_EXPORT ::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, ::arrow::MemoryPool* allocator, - const ReaderProperties& properties, - const std::shared_ptr& metadata, std::unique_ptr* reader); +ARROW_DEPRECATED("Deprecated since 0.15.0. Use FileReaderBuilder") PARQUET_EXPORT ::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, ::arrow::MemoryPool* allocator, + const ReaderProperties& properties, + const std::shared_ptr& metadata, std::unique_ptr* reader); +ARROW_DEPRECATED("Deprecated since 0.15.0. Use FileReaderBuilder") PARQUET_EXPORT ::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, ::arrow::MemoryPool* allocator, diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 3a9dc9ba1c1..659f65ce575 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -328,14 +328,6 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: ReaderProperties default_reader_properties() cdef cppclass ParquetFileReader: - @staticmethod - unique_ptr[ParquetFileReader] Open( - const shared_ptr[RandomAccessFile]& file, - const ReaderProperties& props, - const shared_ptr[CFileMetaData]& metadata) - - @staticmethod - unique_ptr[ParquetFileReader] OpenFile(const c_string& path) shared_ptr[CFileMetaData] metadata() @@ -359,16 +351,14 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil: cdef cppclass ArrowReaderProperties: - pass + ArrowReaderProperties() + void set_read_dictionary(int column_index, c_bool read_dict) + c_bool read_dictionary() + void set_batch_size() + int64_t batch_size() ArrowReaderProperties default_arrow_reader_properties() - CStatus OpenFile(const shared_ptr[RandomAccessFile]& file, - CMemoryPool* allocator, - const ReaderProperties& properties, - const shared_ptr[CFileMetaData]& metadata, - unique_ptr[FileReader]* reader) - cdef cppclass FileReader: FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader) CStatus ReadColumn(int i, shared_ptr[CChunkedArray]* out) @@ -390,6 +380,18 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil: void set_use_threads(c_bool use_threads) + cdef cppclass FileReaderBuilder: + @staticmethod + CStatus Open(const 
shared_ptr[RandomAccessFile]& file, + const ReaderProperties& properties, + const shared_ptr[CFileMetaData]& metadata, + unique_ptr[FileReaderBuilder]* builder) + + ParquetFileReader* raw_reader() + CStatus Build(CMemoryPool* pool, + const ArrowReaderProperties& arrow_properties, + unique_ptr[FileReader]* out) + CStatus FromParquetSchema( const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties, diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 1944ab6653b..9412ce52c2e 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -978,7 +978,7 @@ cdef ParquetCompression compression_from_name(name): cdef class ParquetReader: cdef: object source - CMemoryPool* allocator + CMemoryPool* pool unique_ptr[FileReader] reader FileMetaData _metadata @@ -986,16 +986,20 @@ cdef class ParquetReader: _column_idx_map def __cinit__(self, MemoryPool memory_pool=None): - self.allocator = maybe_unbox_memory_pool(memory_pool) + self.pool = maybe_unbox_memory_pool(memory_pool) self._metadata = None def open(self, object source, c_bool use_memory_map=True, - FileMetaData metadata=None): + read_dictionary=None, FileMetaData metadata=None): cdef: shared_ptr[RandomAccessFile] rd_handle shared_ptr[CFileMetaData] c_metadata ReaderProperties properties = default_reader_properties() + ArrowReaderProperties arrow_props = ( + default_arrow_reader_properties()) c_string path + unique_ptr[FileReaderBuilder] builder + const CFileMetaData* metadata if metadata is not None: c_metadata = metadata.sp_metadata @@ -1004,8 +1008,28 @@ cdef class ParquetReader: get_reader(source, use_memory_map, &rd_handle) with nogil: - check_status(OpenFile(rd_handle, self.allocator, properties, - c_metadata, &self.reader)) + check_status(FileReaderBuilder.Open(rd_handle, properties, + c_metadata, &builder)) + + # Set up metadata + with nogil: + metadata = builder.get().raw_reader().metadata() + self._metadata = result = FileMetaData() + result.init(metadata) + + if read_dictionary is not None: + self._set_read_dictionary(read_dictionary, &arrow_props) + + with nogil: + check_status(builder.get().Build(self.pool, arrow_props, + &self.reader)) + + cdef _set_read_dictionary(self, read_dictionary, + ArrowReaderProperties* props): + for column in read_dictionary: + if not isinstance(column, int): + column = self.column_name_idx(column) + props.set_read_dictionary(column_index, True) @property def column_paths(self): @@ -1025,18 +1049,7 @@ cdef class ParquetReader: @property def metadata(self): - cdef: - shared_ptr[CFileMetaData] metadata - FileMetaData result - if self._metadata is not None: - return self._metadata - - with nogil: - metadata = self.reader.get().parquet_reader().metadata() - - self._metadata = result = FileMetaData() - result.init(metadata) - return result + return self._metadata @property def num_row_groups(self): diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index c3199db95d3..ad837c0ee7d 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -127,9 +127,10 @@ class ParquetFile(object): improve performance in some environments """ def __init__(self, source, metadata=None, common_metadata=None, - memory_map=True): + read_dictionary=None, memory_map=True): self.reader = ParquetReader() - self.reader.open(source, use_memory_map=memory_map, metadata=metadata) + self.reader.open(source, use_memory_map=memory_map, + read_dictionary=read_dictionary, metadata=metadata) self.common_metadata = common_metadata self._nested_paths_by_prefix 
= self._build_nested_paths() @@ -463,11 +464,12 @@ class ParquetDatasetPiece(object): Row group to load. By default, reads all row groups """ def __init__(self, path, open_file_func=partial(open, mode='rb'), - row_group=None, partition_keys=None): + file_options=None, row_group=None, partition_keys=None): self.path = _stringify_path(path) self.open_file_func = open_file_func self.row_group = row_group self.partition_keys = partition_keys or [] + self.file_options = file_options or {} def __eq__(self, other): if not isinstance(other, ParquetDatasetPiece): @@ -500,49 +502,28 @@ def __str__(self): return result - def get_metadata(self, open_file_func=None): + def get_metadata(self): """ Returns the file's metadata - Parameters - ---------- - open_file_func : function, deprecated - Function to use for obtaining file handle to dataset piece. - Deprecated in version 0.13.0. Use ``open_file_func`` parameter of - the constructor instead. - Returns ------- metadata : FileMetaData """ - if open_file_func is not None: - f = self._open(open_file_func) - else: - f = self.open() + f = self.open() return f.metadata - def _open(self, open_file_func): - """ - Returns instance of ParquetFile - """ - warnings.warn('open_file_func argument is deprecated, please pass ' - 'it to ParquetDatasetPiece instead', DeprecationWarning) - reader = open_file_func(self.path) - if not isinstance(reader, ParquetFile): - reader = ParquetFile(reader) - return reader - def open(self): """ Returns instance of ParquetFile """ reader = self.open_file_func(self.path) if not isinstance(reader, ParquetFile): - reader = ParquetFile(reader) + reader = ParquetFile(reader, **self.file_options) return reader def read(self, columns=None, use_threads=True, partitions=None, - open_file_func=None, file=None, use_pandas_metadata=False): + file=None, use_pandas_metadata=False): """ Read this piece as a pyarrow.Table @@ -552,10 +533,6 @@ def read(self, columns=None, use_threads=True, partitions=None, use_threads : boolean, default True Perform multi-threaded column reads partitions : ParquetPartitions, default None - open_file_func : function, deprecated - Function to use for obtaining file handle to dataset piece. - Deprecated in version 0.13.0. Use ``open_file_func`` parameter of - the constructor instead. 
file : file-like object passed to ParquetFile @@ -563,15 +540,13 @@ def read(self, columns=None, use_threads=True, partitions=None, ------- table : pyarrow.Table """ - if open_file_func is not None: - reader = self._open(open_file_func) - elif self.open_file_func is not None: + if self.open_file_func is not None: reader = self.open() elif file is not None: - reader = ParquetFile(file) + reader = ParquetFile(file, **self.file_options) else: # try to read the local path - reader = ParquetFile(self.path) + reader = ParquetFile(self.path, **self.file_options) options = dict(columns=columns, use_threads=use_threads, @@ -906,10 +881,12 @@ def _path_split(path, sep): def _open_dataset_file(dataset, path, meta=None): if dataset.fs is None or isinstance(dataset.fs, LocalFileSystem): return ParquetFile(path, metadata=meta, memory_map=dataset.memory_map, + read_dictionary=dataset.read_dictionary, common_metadata=dataset.common_metadata) else: return ParquetFile(dataset.fs.open(path, mode='rb'), metadata=meta, memory_map=dataset.memory_map, + read_dictionary=dataset.read_dictionary, common_metadata=dataset.common_metadata) @@ -959,7 +936,8 @@ class ParquetDataset(object): """ def __init__(self, path_or_paths, filesystem=None, schema=None, metadata=None, split_row_groups=False, validate_schema=True, - filters=None, metadata_nthreads=1, memory_map=True): + filters=None, metadata_nthreads=1, + read_dictionary=None, memory_map=True): a_path = path_or_paths if isinstance(a_path, list): a_path = a_path[0] @@ -970,6 +948,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, else: self.paths = _parse_uri(path_or_paths) + self.read_dictionary = read_dictionary self.memory_map = memory_map (self.pieces, @@ -988,7 +967,9 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, if metadata is None and self.metadata_path is not None: with self.fs.open(self.metadata_path) as f: - self.metadata = ParquetFile(f, memory_map=memory_map).metadata + self.metadata = ParquetFile( + f, memory_map=memory_map, + read_dictionary=read_dictionary).metadata else: self.metadata = metadata @@ -1190,6 +1171,9 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1, memory_map : boolean, default True If the source is a file path, use a memory map to read file, which can improve performance in some environments +read_dictionary : list, default None + List of column paths to read directly as DictionaryArray. Only supported + for BYTE_ARRAY storage {1} filters : List[Tuple] or List[List[Tuple]] or None (default) List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. 
This @@ -1205,12 +1189,15 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1, def read_table(source, columns=None, use_threads=True, metadata=None, use_pandas_metadata=False, memory_map=True, - filesystem=None, filters=None): + read_dictionary=None, filesystem=None, filters=None): if _is_path_like(source): pf = ParquetDataset(source, metadata=metadata, memory_map=memory_map, + read_dictionary=read_dictionary, filesystem=filesystem, filters=filters) else: - pf = ParquetFile(source, metadata=metadata, memory_map=memory_map) + pf = ParquetFile(source, metadata=metadata, + read_dictionary=read_dictionary, + memory_map=memory_map) return pf.read(columns=columns, use_threads=use_threads, use_pandas_metadata=use_pandas_metadata) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index ca3fbc4c816..e4580ecb751 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -1356,19 +1356,7 @@ def test_parquet_piece_open_and_get_metadata(tempdir): meta1 = piece.get_metadata() assert isinstance(meta1, pq.FileMetaData) - def open_parquet(fn): - return pq.ParquetFile(open(fn, mode='rb')) - - # test deprecated open_file_func - with pytest.warns(DeprecationWarning): - table2 = piece.read(open_file_func=open_parquet) - assert isinstance(table2, pa.Table) - with pytest.warns(DeprecationWarning): - meta2 = piece.get_metadata(open_file_func=open_parquet) - assert isinstance(meta2, pq.FileMetaData) - - assert table == table1 == table2 - assert meta1 == meta2 + assert table == table1 def test_parquet_piece_basics(): From 9d503516f1e12e22546601b4de8ffe43ce453ae6 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 2 Aug 2019 10:17:51 -0500 Subject: [PATCH 2/7] Read Parquet fields directly as DictionaryArray in parquet.read_table and ParquetDataset --- python/pyarrow/_parquet.pyx | 7 ++- python/pyarrow/array.pxi | 6 +++ python/pyarrow/table.pxi | 52 +++++++++++++-------- python/pyarrow/tests/test_csv.py | 6 +-- python/pyarrow/tests/test_json.py | 4 +- python/pyarrow/tests/test_orc.py | 2 +- python/pyarrow/tests/test_parquet.py | 69 +++++++++++++++++++++++++++- python/pyarrow/tests/test_table.py | 31 ++++++------- 8 files changed, 131 insertions(+), 46 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 9412ce52c2e..52379ff63a4 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -999,7 +999,6 @@ cdef class ParquetReader: default_arrow_reader_properties()) c_string path unique_ptr[FileReaderBuilder] builder - const CFileMetaData* metadata if metadata is not None: c_metadata = metadata.sp_metadata @@ -1013,9 +1012,9 @@ cdef class ParquetReader: # Set up metadata with nogil: - metadata = builder.get().raw_reader().metadata() + c_metadata = builder.get().raw_reader().metadata() self._metadata = result = FileMetaData() - result.init(metadata) + result.init(c_metadata) if read_dictionary is not None: self._set_read_dictionary(read_dictionary, &arrow_props) @@ -1029,7 +1028,7 @@ cdef class ParquetReader: for column in read_dictionary: if not isinstance(column, int): column = self.column_name_idx(column) - props.set_read_dictionary(column_index, True) + props.set_read_dictionary(column, True) @property def column_paths(self): diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index d974418a3ad..ecb8ff5ca33 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1052,6 +1052,12 @@ cdef class ListArray(Array): cpool, &out)) return 
pyarrow_wrap_array(out) + @property + def values(self): + return self.flatten() + + # TODO(wesm): Add offsets property + def flatten(self): """ Unnest this ListArray by one level diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index c0782fe26c2..01fa03c6563 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -393,20 +393,24 @@ cdef _schema_from_arrays(arrays, names, metadata, shared_ptr[CSchema]* schema): if len(names) != K: raise ValueError('Length of names ({}) does not match ' 'length of arrays ({})'.format(len(names), K)) + + converted_arrays = [] for i in range(K): val = arrays[i] - if isinstance(val, (Array, ChunkedArray)): - c_type = ( val.type).sp_type - else: - raise TypeError(type(val)) + if not isinstance(val, (Array, ChunkedArray)): + val = array(val) + + c_type = ( val.type).sp_type if names[i] is None: c_name = tobytes(u'None') else: c_name = tobytes(names[i]) c_fields[i].reset(new CField(c_name, c_type, True)) + converted_arrays.append(val) schema.reset(new CSchema(c_fields, c_meta)) + return converted_arrays cdef class RecordBatch(_PandasConvertible): @@ -676,10 +680,11 @@ cdef class RecordBatch(_PandasConvertible): num_rows = len(arrays[0]) else: num_rows = 0 + if isinstance(names, Schema): c_schema = ( names).sp_schema else: - _schema_from_arrays(arrays, names, metadata, &c_schema) + arrays = _schema_from_arrays(arrays, names, metadata, &c_schema) c_arrays.reserve(len(arrays)) for arr in arrays: @@ -744,7 +749,7 @@ cdef class Table(_PandasConvertible): self.sp_table = table self.table = table.get() - def _validate(self): + def validate(self): """ Validate table consistency. """ @@ -988,7 +993,7 @@ cdef class Table(_PandasConvertible): int i, K = len(arrays) if schema is None: - _schema_from_arrays(arrays, names, metadata, &c_schema) + arrays = _schema_from_arrays(arrays, names, metadata, &c_schema) elif schema is not None: if names is not None: raise ValueError('Cannot pass both schema and names') @@ -1015,7 +1020,9 @@ cdef class Table(_PandasConvertible): else: raise TypeError(type(arrays[i])) - return pyarrow_wrap_table(CTable.Make(c_schema, columns)) + result = pyarrow_wrap_table(CTable.Make(c_schema, columns)) + result.validate() + return result @staticmethod def from_pydict(mapping, schema=None, metadata=None): @@ -1434,18 +1441,24 @@ def _reconstruct_table(arrays, schema): return Table.from_arrays(arrays, schema=schema) -def table(data, schema=None): +def table(data, names=None, schema=None, metadata=None): """ - Create a pyarrow.Table from a Python object (table like objects such as - DataFrame, dictionary). + Create a pyarrow.Table from another Python data structure or sequence of + arrays Parameters ---------- - data : pandas.DataFrame, dict - A DataFrame or a mapping of strings to Arrays or Python lists. + data : pandas.DataFrame, dict, list + A DataFrame, mapping of strings to Arrays or Python lists, or list of + arrays or chunked arrays + names : list, default None + Column names if list of arrays passed as data. Mutually exclusive with + 'schema' argument schema : Schema, default None - The expected schema of the Arrow Table. If not passed, will be - inferred from the data. + The expected schema of the Arrow Table. If not passed, will be inferred + from the data. Mutually exclusive with 'names' argument + metadata : dict or Mapping, default None + Optional metadata for the schema (if schema not passed). 
Returns ------- @@ -1453,10 +1466,13 @@ def table(data, schema=None): See Also -------- - Table.from_pandas, Table.from_pydict + Table.from_arrays, Table.from_pandas, Table.from_pydict """ - if isinstance(data, dict): - return Table.from_pydict(data, schema=schema) + if isinstance(data, (list, tuple)): + return Table.from_arrays(data, names=names, schema=schema, + metadata=metadata) + elif isinstance(data, dict): + return Table.from_pydict(data, schema=schema, metadata=metadata) elif isinstance(data, _pandas_api.pd.DataFrame): return Table.from_pandas(data, schema=schema) else: diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 213fbc5f5a1..a42237eb803 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -562,7 +562,7 @@ def read_csv(self, *args, **kwargs): read_options = kwargs.setdefault('read_options', ReadOptions()) read_options.use_threads = False table = read_csv(*args, **kwargs) - table._validate() + table.validate() return table @@ -572,7 +572,7 @@ def read_csv(self, *args, **kwargs): read_options = kwargs.setdefault('read_options', ReadOptions()) read_options.use_threads = True table = read_csv(*args, **kwargs) - table._validate() + table.validate() return table @@ -595,7 +595,7 @@ def test_random_csv(self): csv_path = os.path.join(self.tmpdir, self.csv_filename) self.write_file(csv_path, csv) table = self.read_csv(csv_path) - table._validate() + table.validate() assert table.schema == expected.schema assert table.equals(expected) assert table.to_pydict() == expected.to_pydict() diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py index 7885455ca11..22259786902 100644 --- a/python/pyarrow/tests/test_json.py +++ b/python/pyarrow/tests/test_json.py @@ -133,7 +133,7 @@ def read_json(self, *args, **kwargs): read_options = kwargs.setdefault('read_options', ReadOptions()) read_options.use_threads = False table = read_json(*args, **kwargs) - table._validate() + table.validate() return table @@ -143,5 +143,5 @@ def read_json(self, *args, **kwargs): read_options = kwargs.setdefault('read_options', ReadOptions()) read_options.use_threads = True table = read_json(*args, **kwargs) - table._validate() + table.validate() return table diff --git a/python/pyarrow/tests/test_orc.py b/python/pyarrow/tests/test_orc.py index 2a274c9b249..1ce09c1cdad 100644 --- a/python/pyarrow/tests/test_orc.py +++ b/python/pyarrow/tests/test_orc.py @@ -87,7 +87,7 @@ def check_example_file(orc_path, expected_df, need_fix=False): # Exercise ORCFile.read() table = orc_file.read() assert isinstance(table, pa.Table) - table._validate() + table.validate() # This workaround needed because of ARROW-3080 orc_df = pd.DataFrame(table.to_pydict()) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index e4580ecb751..236732edceb 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -2847,8 +2847,8 @@ def test_partitioned_dataset(tempdir): def test_read_column_invalid_index(): - table = pa.Table.from_arrays([pa.array([4, 5]), pa.array(["foo", "bar"])], - ['ints', 'strs']) + table = pa.table([pa.array([4, 5]), pa.array(["foo", "bar"])], + names=['ints', 'strs']) bio = pa.BufferOutputStream() pq.write_table(table, bio) f = pq.ParquetFile(bio.getvalue()) @@ -2859,6 +2859,71 @@ def test_read_column_invalid_index(): f.reader.read_column(index) +def test_direct_read_dictionary(): + # ARROW-3325 + repeats = 10 + nunique = 5 + + data = [ + [tm.rands(10) for i 
in range(nunique)] * repeats, + + ] + table = pa.table(data, names=['f0']) + + bio = pa.BufferOutputStream() + pq.write_table(table, bio) + contents = bio.getvalue() + + result = pq.read_table(pa.BufferReader(contents), + read_dictionary=['f0']) + + # Compute dictionary-encoded subfield + expected = pa.table([table[0].dictionary_encode()], names=['f0']) + assert result.equals(expected) + + +def test_dataset_read_dictionary(tempdir): + path = tempdir / "ARROW-3325-dataset" + data = [ + [tm.rands(10) for i in range(5)] * 10, + ] + table = pa.table(data, names=['f0']) + pq.write_to_dataset(table, root_path=str(path)) + result = pq.ParquetDataset(path, read_dictionary=['f0']).read() + expected = pa.table([table[0].dictionary_encode()], names=['f0']) + assert result.equals(expected) + + +def test_direct_read_dictionary_subfield(): + repeats = 10 + nunique = 5 + + data = [ + [[tm.rands(10)] for i in range(nunique)] * repeats, + ] + table = pa.table(data, names=['f0']) + + bio = pa.BufferOutputStream() + pq.write_table(table, bio) + contents = bio.getvalue() + result = pq.read_table(pa.BufferReader(contents), + read_dictionary=['f0.list.item']) + + arr = pa.array(data[0]) + values_as_dict = arr.values.dictionary_encode() + + inner_indices = values_as_dict.indices.cast('int32') + new_values = pa.DictionaryArray.from_arrays(inner_indices, + values_as_dict.dictionary) + + offsets = pa.array(range(51), type='int32') + expected_arr = pa.ListArray.from_arrays(offsets, new_values) + expected = pa.table([expected_arr], names=['f0']) + + assert result.equals(expected) + assert result[0].num_chunks == 1 + + @pytest.mark.pandas def test_dataset_metadata(tempdir): path = tempdir / "ARROW-1983-dataset" diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 0645fcbe180..24ffd859bc4 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -186,7 +186,7 @@ def test_chunked_array_to_pandas(): data = [ pa.array([-10, -5, 0, 5, 10]) ] - table = pa.Table.from_arrays(data, names=['a']) + table = pa.table(data, names=['a']) col = table.column(0) assert isinstance(col, pa.ChunkedArray) array = col.to_pandas() @@ -445,8 +445,8 @@ def test_table_basics(): pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]) ] - table = pa.Table.from_arrays(data, names=('a', 'b')) - table._validate() + table = pa.table(data, names=('a', 'b')) + table.validate() assert len(table) == 5 assert table.num_rows == 5 assert table.num_columns == 2 @@ -474,9 +474,8 @@ def test_table_basics(): col.chunk(col.num_chunks) assert table.columns == columns - assert table == pa.Table.from_arrays(columns, names=table.column_names) - assert table != pa.Table.from_arrays(columns[1:], - names=table.column_names[1:]) + assert table == pa.table(columns, names=table.column_names) + assert table != pa.table(columns[1:], names=table.column_names[1:]) assert table != columns @@ -511,14 +510,14 @@ def test_table_from_lists_raises(): ] with pytest.raises(TypeError): - pa.Table.from_arrays(data, names=['a', 'b']) + pa.table(data, names=['a', 'b']) schema = pa.schema([ pa.field('a', pa.uint16()), pa.field('b', pa.int64()) ]) with pytest.raises(TypeError): - pa.Table.from_arrays(data, schema=schema) + pa.table(data, schema=schema) def test_table_pickle(): @@ -532,7 +531,7 @@ def test_table_pickle(): table = pa.Table.from_arrays(data, schema=schema) result = pickle.loads(pickle.dumps(table)) - result._validate() + result.validate() assert result.equals(table) @@ -620,7 +619,7 @@ def 
test_table_remove_column(): table = pa.Table.from_arrays(data, names=('a', 'b', 'c')) t2 = table.remove_column(0) - t2._validate() + t2.validate() expected = pa.Table.from_arrays(data[1:], names=('b', 'c')) assert t2.equals(expected) @@ -633,11 +632,11 @@ def test_table_remove_column_empty(): table = pa.Table.from_arrays(data, names=['a']) t2 = table.remove_column(0) - t2._validate() + t2.validate() assert len(t2) == len(table) t3 = t2.add_column(0, table.field(0), table[0]) - t3._validate() + t3.validate() assert t3.equals(table) @@ -651,7 +650,7 @@ def test_table_rename_columns(): assert table.column_names == ['a', 'b', 'c'] t2 = table.rename_columns(['eh', 'bee', 'sea']) - t2._validate() + t2.validate() assert t2.column_names == ['eh', 'bee', 'sea'] expected = pa.Table.from_arrays(data, names=['eh', 'bee', 'sea']) @@ -668,7 +667,7 @@ def test_table_flatten(): table = pa.Table.from_arrays([a, b, c], names=['a', 'b', 'c']) t2 = table.flatten() - t2._validate() + t2.validate() expected = pa.Table.from_arrays([ pa.array([1, 3], type=pa.int16()), pa.array([2.5, 4.5], type=pa.float32()), @@ -685,7 +684,7 @@ def test_table_combine_chunks(): names=['f1', 'f2']) table = pa.Table.from_batches([batch1, batch2]) combined = table.combine_chunks() - combined._validate() + combined.validate() assert combined.equals(table) for c in combined.columns: assert c.num_chunks == 1 @@ -707,7 +706,7 @@ def test_concat_tables(): names=('a', 'b')) result = pa.concat_tables([t1, t2]) - result._validate() + result.validate() assert len(result) == 10 expected = pa.Table.from_arrays([pa.array(x + y) From 7237e695838d7b913b825ff099c00ced86445924 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 2 Aug 2019 14:45:10 -0500 Subject: [PATCH 3/7] Fix C++ and Python unit tests --- cpp/src/parquet/arrow/reader.cc | 2 +- python/pyarrow/parquet.py | 7 ++----- python/pyarrow/table.pxi | 25 +++++++++++++++---------- python/pyarrow/tests/test_table.py | 12 +++++++----- 4 files changed, 25 insertions(+), 21 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 370616339d6..8f59694c51c 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -857,7 +857,7 @@ Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, std::unique_ptr builder; RETURN_NOT_OK(FileReaderBuilder::Open(file, ::parquet::default_reader_properties(), nullptr, &builder)); - return builder->Build(pool, default_arrow_reader_properties(), reader); + return builder->Build(pool, properties, reader); } } // namespace arrow diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index ad837c0ee7d..5747a8360be 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -960,16 +960,13 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, if self.common_metadata_path is not None: with self.fs.open(self.common_metadata_path) as f: - self.common_metadata = (ParquetFile(f, memory_map=memory_map) - .metadata) + self.common_metadata = read_metadata(f, memory_map=memory_map) else: self.common_metadata = None if metadata is None and self.metadata_path is not None: with self.fs.open(self.metadata_path) as f: - self.metadata = ParquetFile( - f, memory_map=memory_map, - read_dictionary=read_dictionary).metadata + self.metadata = read_metadata(f, memory_map=memory_map) else: self.metadata = metadata diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 01fa03c6563..f103d262798 100644 --- a/python/pyarrow/table.pxi +++ 
b/python/pyarrow/table.pxi @@ -383,7 +383,7 @@ cdef _schema_from_arrays(arrays, names, metadata, shared_ptr[CSchema]* schema): if K == 0: schema.reset(new CSchema(c_fields, c_meta)) - return + return arrays c_fields.resize(K) @@ -993,8 +993,9 @@ cdef class Table(_PandasConvertible): int i, K = len(arrays) if schema is None: - arrays = _schema_from_arrays(arrays, names, metadata, &c_schema) - elif schema is not None: + converted_arrays = _schema_from_arrays(arrays, names, metadata, + &c_schema) + else: if names is not None: raise ValueError('Cannot pass both schema and names') if metadata is not None: @@ -1005,20 +1006,24 @@ cdef class Table(_PandasConvertible): raise ValueError('Schema and number of arrays unequal') c_schema = cy_schema.sp_schema + converted_arrays = [] + for i, item in enumerate(arrays): + if not isinstance(item, (Array, ChunkedArray)): + item = array(item, type=schema[i].type) + converted_arrays.append(item) columns.reserve(K) - - for i in range(K): - if isinstance(arrays[i], Array): + for item in converted_arrays: + if isinstance(item, Array): columns.push_back( make_shared[CChunkedArray]( - ( arrays[i]).sp_array + ( item).sp_array ) ) - elif isinstance(arrays[i], ChunkedArray): - columns.push_back(( arrays[i]).sp_chunked_array) + elif isinstance(item, ChunkedArray): + columns.push_back(( item).sp_chunked_array) else: - raise TypeError(type(arrays[i])) + raise TypeError(type(item)) result = pyarrow_wrap_table(CTable.Make(c_schema, columns)) result.validate() diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 24ffd859bc4..8014c19301d 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -503,21 +503,23 @@ def test_table_from_arrays_invalid_names(): pa.Table.from_arrays(data, names=['a']) -def test_table_from_lists_raises(): +def test_table_from_lists(): data = [ list(range(5)), [-10, -5, 0, 5, 10] ] - with pytest.raises(TypeError): - pa.table(data, names=['a', 'b']) + result = pa.table(data, names=['a', 'b']) + expected = pa.Table.from_arrays(data, names=['a', 'b']) + assert result.equals(expected) schema = pa.schema([ pa.field('a', pa.uint16()), pa.field('b', pa.int64()) ]) - with pytest.raises(TypeError): - pa.table(data, schema=schema) + result = pa.table(data, schema=schema) + expected = pa.Table.from_arrays(data, schema=schema) + assert result.equals(expected) def test_table_pickle(): From 8e2b70b1ad86c2b9c95353dbc415fd825f8e92d0 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 2 Aug 2019 14:53:48 -0500 Subject: [PATCH 4/7] Expand read_dictionary with ParquetDataset test for multiple files --- python/pyarrow/tests/test_parquet.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 236732edceb..4c0ba6ed0fc 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -2884,14 +2884,24 @@ def test_direct_read_dictionary(): def test_dataset_read_dictionary(tempdir): path = tempdir / "ARROW-3325-dataset" - data = [ - [tm.rands(10) for i in range(5)] * 10, - ] - table = pa.table(data, names=['f0']) - pq.write_to_dataset(table, root_path=str(path)) + t1 = pa.table([[tm.rands(10) for i in range(5)] * 10], names=['f0']) + t2 = pa.table([[tm.rands(10) for i in range(5)] * 10], names=['f0']) + pq.write_to_dataset(t1, root_path=str(path)) + pq.write_to_dataset(t2, root_path=str(path)) + result = pq.ParquetDataset(path, 
read_dictionary=['f0']).read() - expected = pa.table([table[0].dictionary_encode()], names=['f0']) - assert result.equals(expected) + + # The order of the chunks is non-deterministic + ex_chunks = [t1[0].chunk(0).dictionary_encode(), + t2[0].chunk(0).dictionary_encode()] + + assert result[0].num_chunks == 2 + c0, c1 = result[0].chunk(0), result[0].chunk(1) + if c0.equals(ex_chunks[0]): + assert c1.equals(ex_chunks[1]) + else: + assert c0.equals(ex_chunks[1]) + assert c1.equals(ex_chunks[0]) def test_direct_read_dictionary_subfield(): From 0f450d53e522c17272579533ce150db2ca0bb867 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 3 Aug 2019 14:14:17 -0500 Subject: [PATCH 5/7] Clean up FileReaderBuilder. Add simle Python docs --- .../parquet/arrow/arrow-reader-writer-test.cc | 37 +++++++------- cpp/src/parquet/arrow/reader.cc | 48 +++++++++++-------- cpp/src/parquet/arrow/reader.h | 18 +++---- docs/source/python/parquet.rst | 15 +++++- python/pyarrow/_parquet.pxd | 11 ++--- python/pyarrow/_parquet.pyx | 12 ++--- 6 files changed, 78 insertions(+), 63 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 2e57ab8cd44..cb2beb2b3a5 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -383,8 +383,7 @@ void DoConfiguredRoundtrip( std::unique_ptr reader; ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + ::arrow::default_memory_pool(), &reader)); ASSERT_OK_NO_THROW(reader->ReadTable(out)); } @@ -421,8 +420,7 @@ void DoSimpleRoundtrip(const std::shared_ptr& table, bool use_threads, std::unique_ptr reader; ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + ::arrow::default_memory_pool(), &reader)); reader->set_use_threads(use_threads); @@ -499,8 +497,7 @@ class TestParquetIO : public ::testing::Test { std::shared_ptr buffer; ASSERT_OK_NO_THROW(sink_->Finish(&buffer)); ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, out)); + ::arrow::default_memory_pool(), out)); } void ReadSingleColumnFile(std::unique_ptr file_reader, @@ -1869,8 +1866,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) { std::unique_ptr reader; ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + ::arrow::default_memory_pool(), &reader)); ASSERT_EQ(2, reader->num_row_groups()); @@ -1907,8 +1903,9 @@ TEST(TestArrowReadWrite, GetRecordBatchReader) { properties.set_batch_size(100); std::unique_ptr reader; - ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), properties, &reader)); + FileReaderBuilder builder; + ASSERT_OK(builder.Open(std::make_shared(buffer))); + ASSERT_OK(builder.properties(properties)->Build(&reader)); std::shared_ptr<::arrow::RecordBatchReader> rb_reader; ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader)); @@ -1938,8 +1935,7 @@ TEST(TestArrowReadWrite, ScanContents) { std::unique_ptr reader; ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + ::arrow::default_memory_pool(), &reader)); int64_t num_rows_returned = 0; 
ASSERT_OK_NO_THROW(reader->ScanContents({}, 256, &num_rows_returned)); @@ -1994,8 +1990,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) { std::unique_ptr reader; ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + ::arrow::default_memory_pool(), &reader)); // Read everything std::shared_ptr
result; @@ -2004,8 +1999,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) { // Read 1 record at a time ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + ::arrow::default_memory_pool(), &reader)); std::unique_ptr col_reader; ASSERT_OK(reader->GetColumn(0, &col_reader)); @@ -2216,9 +2210,8 @@ class TestNestedSchemaRead : public ::testing::TestWithParam { void InitReader() { std::shared_ptr buffer; ASSERT_OK_NO_THROW(nested_parquet_->Finish(&buffer)); - ASSERT_OK_NO_THROW( - OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader_)); + ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), + ::arrow::default_memory_pool(), &reader_)); } void InitNewParquetFile(const std::shared_ptr& schema, int num_rows) { @@ -2780,8 +2773,10 @@ class TestArrowReadDictionary : public ::testing::TestWithParam { void CheckReadWholeFile(const Table& expected) { std::unique_ptr reader; - ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer_), - ::arrow::default_memory_pool(), properties_, &reader)); + + FileReaderBuilder builder; + ASSERT_OK_NO_THROW(builder.Open(std::make_shared(buffer_))); + ASSERT_OK(builder.properties(properties_)->Build(&reader)); std::shared_ptr
actual; ASSERT_OK_NO_THROW(reader->ReadTable(&actual)); diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 8f59694c51c..931fd19a32c 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -814,30 +814,37 @@ Status FileReader::Make(::arrow::MemoryPool* pool, return Make(pool, std::move(reader), default_arrow_reader_properties(), out); } -FileReaderBuilder::FileReaderBuilder(std::unique_ptr raw_reader) - : raw_reader_(std::move(raw_reader)) {} +FileReaderBuilder::FileReaderBuilder() + : pool_(::arrow::default_memory_pool()), + properties_(default_arrow_reader_properties()) {} Status FileReaderBuilder::Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, const ReaderProperties& properties, - const std::shared_ptr& metadata, - std::unique_ptr* builder) { - std::unique_ptr raw_reader; - PARQUET_CATCH_NOT_OK(raw_reader = ParquetReader::Open(file, properties, metadata)); - builder->reset(new FileReaderBuilder(std::move(raw_reader))); + const std::shared_ptr& metadata) { + PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::Open(file, properties, metadata)); return Status::OK(); } -Status FileReaderBuilder::Build(MemoryPool* pool, const ArrowReaderProperties& properties, - std::unique_ptr* out) { - return FileReader::Make(pool, std::move(raw_reader_), properties, out); +FileReaderBuilder* FileReaderBuilder::memory_pool(::arrow::MemoryPool* pool) { + pool_ = pool; + return this; +} + +FileReaderBuilder* FileReaderBuilder::properties( + const ArrowReaderProperties& arg_properties) { + properties_ = arg_properties; + return this; +} + +Status FileReaderBuilder::Build(std::unique_ptr* out) { + return FileReader::Make(pool_, std::move(raw_reader_), properties_, out); } Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, MemoryPool* pool, std::unique_ptr* reader) { - std::unique_ptr builder; - RETURN_NOT_OK(FileReaderBuilder::Open(file, ::parquet::default_reader_properties(), - nullptr, &builder)); - return builder->Build(pool, default_arrow_reader_properties(), reader); + FileReaderBuilder builder; + RETURN_NOT_OK(builder.Open(file)); + return builder.memory_pool(pool)->Build(reader); } Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, @@ -845,19 +852,18 @@ Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, const std::shared_ptr& metadata, std::unique_ptr* reader) { // Deprecated since 0.15.0 - std::unique_ptr builder; - RETURN_NOT_OK(FileReaderBuilder::Open(file, props, metadata, &builder)); - return builder->Build(pool, default_arrow_reader_properties(), reader); + FileReaderBuilder builder; + RETURN_NOT_OK(builder.Open(file, props, metadata)); + return builder.memory_pool(pool)->Build(reader); } Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, MemoryPool* pool, const ArrowReaderProperties& properties, std::unique_ptr* reader) { // Deprecated since 0.15.0 - std::unique_ptr builder; - RETURN_NOT_OK(FileReaderBuilder::Open(file, ::parquet::default_reader_properties(), - nullptr, &builder)); - return builder->Build(pool, properties, reader); + FileReaderBuilder builder; + RETURN_NOT_OK(builder.Open(file)); + return builder.memory_pool(pool)->properties(properties)->Build(reader); } } // namespace arrow diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 39f00cf8728..2d65a7134b5 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -24,6 +24,7 @@ #include #include 
"parquet/platform.h" +#include "parquet/properties.h" namespace arrow { @@ -290,20 +291,21 @@ class PARQUET_EXPORT ColumnReader { /// either with std::move or C++ exceptions class FileReaderBuilder { public: - explicit FileReaderBuilder(std::unique_ptr raw_reader); + FileReaderBuilder(); - static ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, - const ReaderProperties& properties, - const std::shared_ptr& metadata, - std::unique_ptr* builder); + ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, + const ReaderProperties& properties = default_reader_properties(), + const std::shared_ptr& metadata = NULLPTR); ParquetFileReader* raw_reader() { return raw_reader_.get(); } - ::arrow::Status Build(::arrow::MemoryPool* allocator, - const ArrowReaderProperties& arrow_properties, - std::unique_ptr* out); + FileReaderBuilder* memory_pool(::arrow::MemoryPool* pool); + FileReaderBuilder* properties(const ArrowReaderProperties& arg_properties); + ::arrow::Status Build(std::unique_ptr* out); private: + ::arrow::MemoryPool* pool_; + ArrowReaderProperties properties_; std::unique_ptr raw_reader_; }; diff --git a/docs/source/python/parquet.rst b/docs/source/python/parquet.rst index 5cd1248fb54..173919d9a26 100644 --- a/docs/source/python/parquet.rst +++ b/docs/source/python/parquet.rst @@ -210,6 +210,19 @@ Alternatively python ``with`` syntax can also be use: Data Type Handling ------------------ +Reading types as DictionaryArray +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The ``read_dictionary`` option in ``read_table`` and ``ParquetDataset`` will +cause columns to be read as ``DictionaryArray``, which will become +``pandas.Categorical`` when converted to pandas. This option is only valid for +string and binary column types, and it can yield significantly lower memory use +and improved performance for columns with many repeated string values. + +.. code-block:: python + + pq.read_table(table, where, read_dictionary=['binary_c0', 'stringb_c2']) + Storing timestamps ~~~~~~~~~~~~~~~~~~ @@ -305,7 +318,7 @@ A dataset partitioned by year and month may look like on disk: ... Writing to Partitioned Datasets ------------------------------------------------- +------------------------------- You can write a partitioned dataset for any ``pyarrow`` file system that is a file-store (e.g. local, HDFS, S3). 
The default behaviour when no filesystem is diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 659f65ce575..32ee1ad1a0e 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -381,16 +381,15 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil: void set_use_threads(c_bool use_threads) cdef cppclass FileReaderBuilder: - @staticmethod + FileReaderBuilder() CStatus Open(const shared_ptr[RandomAccessFile]& file, const ReaderProperties& properties, - const shared_ptr[CFileMetaData]& metadata, - unique_ptr[FileReaderBuilder]* builder) + const shared_ptr[CFileMetaData]& metadata) ParquetFileReader* raw_reader() - CStatus Build(CMemoryPool* pool, - const ArrowReaderProperties& arrow_properties, - unique_ptr[FileReader]* out) + FileReaderBuilder* memory_pool(CMemoryPool*) + FileReaderBuilder* properties(const ArrowReaderProperties&) + CStatus Build(unique_ptr[FileReader]* out) CStatus FromParquetSchema( const SchemaDescriptor* parquet_schema, diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 52379ff63a4..d51e7fb3c6c 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -998,7 +998,7 @@ cdef class ParquetReader: ArrowReaderProperties arrow_props = ( default_arrow_reader_properties()) c_string path - unique_ptr[FileReaderBuilder] builder + FileReaderBuilder builder if metadata is not None: c_metadata = metadata.sp_metadata @@ -1007,12 +1007,11 @@ cdef class ParquetReader: get_reader(source, use_memory_map, &rd_handle) with nogil: - check_status(FileReaderBuilder.Open(rd_handle, properties, - c_metadata, &builder)) + check_status(builder.Open(rd_handle, properties, c_metadata)) # Set up metadata with nogil: - c_metadata = builder.get().raw_reader().metadata() + c_metadata = builder.raw_reader().metadata() self._metadata = result = FileMetaData() result.init(c_metadata) @@ -1020,8 +1019,9 @@ cdef class ParquetReader: self._set_read_dictionary(read_dictionary, &arrow_props) with nogil: - check_status(builder.get().Build(self.pool, arrow_props, - &self.reader)) + check_status(builder.memory_pool(self.pool) + .properties(arrow_props) + .Build(&self.reader)) cdef _set_read_dictionary(self, read_dictionary, ArrowReaderProperties* props): From ee73d7b41b34878cd539a0a4ddaa95fd1fa7bdba Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 5 Aug 2019 10:28:45 -0500 Subject: [PATCH 6/7] Add missing PARQUET_EXPORT --- cpp/src/parquet/arrow/reader.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 2d65a7134b5..d0ce68a57e4 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -289,7 +289,7 @@ class PARQUET_EXPORT ColumnReader { /// \brief Experimental helper class for bindings (like Python) that struggle /// either with std::move or C++ exceptions -class FileReaderBuilder { +class PARQUET_EXPORT FileReaderBuilder { public: FileReaderBuilder(); From 2ca3881492cf46cfb4cd6d65a3ae4b0fe77454bf Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 5 Aug 2019 13:10:29 -0500 Subject: [PATCH 7/7] Improve docstring for read_dictionary parameter, add to ParquetDataset --- python/pyarrow/parquet.py | 112 ++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 52 deletions(-) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 5747a8360be..eb00a35a327 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -891,49 +891,6 @@ def 
_open_dataset_file(dataset, path, meta=None): class ParquetDataset(object): - """ - Encapsulates details of reading a complete Parquet dataset possibly - consisting of multiple files and partitions in subdirectories - - Parameters - ---------- - path_or_paths : str or List[str] - A directory name, single file name, or list of file names - filesystem : FileSystem, default None - If nothing passed, paths assumed to be found in the local on-disk - filesystem - metadata : pyarrow.parquet.FileMetaData - Use metadata obtained elsewhere to validate file schemas - schema : pyarrow.parquet.Schema - Use schema obtained elsewhere to validate file schemas. Alternative to - metadata parameter - split_row_groups : boolean, default False - Divide files into pieces for each row group in the file - validate_schema : boolean, default True - Check that individual file schemas are all the same / compatible - filters : List[Tuple] or List[List[Tuple]] or None (default) - List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This - implements partition-level (hive) filtering only, i.e., to prevent the - loading of some files of the dataset. - - Predicates are expressed in disjunctive normal form (DNF). This means - that the innermost tuple describe a single column predicate. These - inner predicate make are all combined with a conjunction (AND) into a - larger predicate. The most outer list then combines all filters - with a disjunction (OR). By this, we should be able to express all - kinds of filters that are possible using boolean logic. - - This function also supports passing in as List[Tuple]. These predicates - are evaluated as a conjunction. To express OR in predictates, one must - use the (preferred) List[List[Tuple]] notation. - metadata_nthreads: int, default 1 - How many threads to allow the thread pool which is used to read the - dataset metadata. Increasing this is helpful to read partitioned - datasets. - memory_map : boolean, default True - If the source is a file path, use a memory map to read each file in the - dataset if possible, which can improve performance in some environments - """ def __init__(self, path_or_paths, filesystem=None, schema=None, metadata=None, split_row_groups=False, validate_schema=True, filters=None, metadata_nthreads=1, @@ -1148,6 +1105,62 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1, return pieces, partitions, common_metadata_path, metadata_path +_read_docstring_common = """\ +read_dictionary : list, default None + List of names or column paths (for nested types) to read directly + as DictionaryArray. Only supported for BYTE_ARRAY storage. To read + a flat column as dictionary-encoded pass the column name. For + nested types, you must pass the full column "path", which could be + something like level1.level2.list.item. Refer to the Parquet + file's schema to obtain the paths. 
+memory_map : boolean, default True + If the source is a file path, use a memory map to read file, which can + improve performance in some environments""" + + +ParquetDataset.__doc__ = """ +Encapsulates details of reading a complete Parquet dataset possibly +consisting of multiple files and partitions in subdirectories + +Parameters +---------- +path_or_paths : str or List[str] + A directory name, single file name, or list of file names +filesystem : FileSystem, default None + If nothing passed, paths assumed to be found in the local on-disk + filesystem +metadata : pyarrow.parquet.FileMetaData + Use metadata obtained elsewhere to validate file schemas +schema : pyarrow.parquet.Schema + Use schema obtained elsewhere to validate file schemas. Alternative to + metadata parameter +split_row_groups : boolean, default False + Divide files into pieces for each row group in the file +validate_schema : boolean, default True + Check that individual file schemas are all the same / compatible +filters : List[Tuple] or List[List[Tuple]] or None (default) + List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This + implements partition-level (hive) filtering only, i.e., to prevent the + loading of some files of the dataset. + + Predicates are expressed in disjunctive normal form (DNF). This means + that the innermost tuple describe a single column predicate. These + inner predicate make are all combined with a conjunction (AND) into a + larger predicate. The most outer list then combines all filters + with a disjunction (OR). By this, we should be able to express all + kinds of filters that are possible using boolean logic. + + This function also supports passing in as List[Tuple]. These predicates + are evaluated as a conjunction. To express OR in predictates, one must + use the (preferred) List[List[Tuple]] notation. +metadata_nthreads: int, default 1 + How many threads to allow the thread pool which is used to read the + dataset metadata. Increasing this is helpful to read partitioned + datasets. +{0} +""".format(_read_docstring_common) + + _read_table_docstring = """ {0} @@ -1165,12 +1178,6 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1, Perform multi-threaded column reads metadata : FileMetaData If separately computed -memory_map : boolean, default True - If the source is a file path, use a memory map to read file, which can - improve performance in some environments -read_dictionary : list, default None - List of column paths to read directly as DictionaryArray. Only supported - for BYTE_ARRAY storage {1} filters : List[Tuple] or List[List[Tuple]] or None (default) List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. 
This @@ -1201,9 +1208,10 @@ def read_table(source, columns=None, use_threads=True, metadata=None, read_table.__doc__ = _read_table_docstring.format( 'Read a Table from Parquet format', - """use_pandas_metadata : boolean, default False + "\n".join((_read_docstring_common, + """use_pandas_metadata : boolean, default False If True and file has custom pandas schema metadata, ensure that - index columns are also loaded""", + index columns are also loaded""")), """pyarrow.Table Content of the file as a table (of columns)""") @@ -1220,7 +1228,7 @@ def read_pandas(source, columns=None, use_threads=True, memory_map=True, read_pandas.__doc__ = _read_table_docstring.format( 'Read a Table from Parquet format, also reading DataFrame\n' 'index values if known in the file metadata', - '', + _read_docstring_common, """pyarrow.Table Content of the file as a Table of Columns, including DataFrame indexes as columns""")
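Usage sketch (illustrative only, not part of the diffs above): the read_dictionary
option threaded through ParquetFile, ParquetDataset and read_table in this series
can be exercised the same way the new tests in test_parquet.py do. The column name
'f0' and the in-memory buffer are placeholders.

    import pyarrow as pa
    import pyarrow.parquet as pq

    # A small table with a repetitive string column, written to a buffer
    table = pa.table([pa.array(['a', 'b', 'a', 'b', 'a'])], names=['f0'])
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    contents = buf.getvalue()

    # Read 'f0' directly as DictionaryArray; for a nested field pass the
    # full column path (e.g. 'f0.list.item'), per the new docstring
    result = pq.read_table(pa.BufferReader(contents), read_dictionary=['f0'])
    assert result.equals(pa.table([table[0].dictionary_encode()],
                                  names=['f0']))

    # The same keyword is accepted by ParquetFile and ParquetDataset
    pf = pq.ParquetFile(pa.BufferReader(contents), read_dictionary=['f0'])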
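Patches 2-3 also extend the pa.table() factory to accept a list of arrays (or
plain Python lists) together with column names, and make Table.validate() public.
A minimal sketch under the same assumptions (column names are illustrative):

    import pyarrow as pa

    # List of arrays with explicit names, mirroring Table.from_arrays
    t1 = pa.table([pa.array([1, 2, 3]), pa.array(['x', 'y', 'z'])],
                  names=['ints', 'strs'])

    # Plain lists are converted via pa.array(), using the schema's types
    schema = pa.schema([pa.field('ints', pa.int64()),
                        pa.field('strs', pa.string())])
    t2 = pa.table([[1, 2, 3], ['x', 'y', 'z']], schema=schema)

    t1.validate()          # public replacement for the former _validate()
    assert t1.equals(t2)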
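Patch 2 additionally exposes ListArray.values as a property; per the diff it is
currently just an alias for flatten():

    import pyarrow as pa

    arr = pa.array([[1, 2], [3], []])          # inferred as list<int64>
    assert arr.values.equals(arr.flatten())    # values delegates to flatten()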