From 165298a8d3a228b324ccf56105fc39f1ebedcf58 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 23 Mar 2021 16:53:45 -0400 Subject: [PATCH 1/9] ARROW-11972: [C++][Dataset] Add IpcFragmentScanOptions --- cpp/src/arrow/dataset/file_ipc.cc | 30 +++++++++++++++++++------- cpp/src/arrow/dataset/file_ipc.h | 14 +++++++++++- cpp/src/arrow/dataset/file_ipc_test.cc | 26 ++++++++++++++++++++++ cpp/src/arrow/dataset/test_util.h | 3 ++- 4 files changed, 63 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index a81e8b74e86..178fc9766cb 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -81,15 +81,27 @@ class IpcScanTask : public ScanTask { Result Execute() override { struct Impl { - static Result Make( - const FileSource& source, std::vector materialized_fields, - MemoryPool* pool) { + static Result Make(const FileSource& source, + FileFormat* format, + ScanOptions* scan_options) { ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source)); - auto options = default_read_options(); - options.memory_pool = pool; - ARROW_ASSIGN_OR_RAISE(options.included_fields, - GetIncludedFields(*reader->schema(), materialized_fields)); + ARROW_ASSIGN_OR_RAISE( + auto ipc_scan_options, + GetFragmentScanOptions( + kIpcTypeName, scan_options, format->default_fragment_scan_options)); + auto options = ipc_scan_options->options ? *ipc_scan_options->options + : default_read_options(); + options.memory_pool = scan_options->pool; + options.use_threads = false; + if (!options.included_fields.empty()) { + // Cannot set them ehre + return Status::Invalid( + "Cannot set included_fields in scan options for IPC fragments"); + } + ARROW_ASSIGN_OR_RAISE( + options.included_fields, + GetIncludedFields(*reader->schema(), scan_options->MaterializedFields())); ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options)); return RecordBatchIterator(Impl{std::move(reader), 0}); @@ -107,7 +119,9 @@ class IpcScanTask : public ScanTask { int i_; }; - return Impl::Make(source_, options_->MaterializedFields(), options_->pool); + return Impl::Make( + source_, internal::checked_pointer_cast(fragment_)->format().get(), + options_.get()); } private: diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index cbfb6b858cd..a7bcd04a9d2 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -31,10 +31,12 @@ namespace arrow { namespace dataset { +constexpr char kIpcTypeName[] = "ipc"; + /// \brief A FileFormat implementation that reads from and writes to Ipc files class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { public: - std::string type_name() const override { return "ipc"; } + std::string type_name() const override { return kIpcTypeName; } bool Equals(const FileFormat& other) const override { return type_name() == other.type_name(); @@ -59,6 +61,16 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { std::shared_ptr DefaultWriteOptions() override; }; +/// \brief Per-scan options for IPC fragments +class ARROW_DS_EXPORT IpcFragmentScanOptions : public FragmentScanOptions { + public: + std::string type_name() const override { return kIpcTypeName; } + + /// Options passed to the IPC file reader. + /// included_fields, memory_pool, and use_threads are ignored. + std::shared_ptr options; +}; + class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions { public: /// Options passed to ipc::MakeFileWriter. 
use_threads is ignored diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 8a5fd024575..97c45aea863 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -134,6 +134,32 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { ASSERT_EQ(row_count, kNumRows); } +TEST_F(TestIpcFileFormat, FragmentScanOptions) { + auto reader = GetRecordBatchReader( + schema({field("list", list(float64())), field("f64", float64())})); + auto source = GetFileSource(reader.get()); + + SetSchema(reader->schema()->fields()); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + + // Set scan options that ensure reading fails + auto fragment_scan_options = std::make_shared(); + fragment_scan_options->options = std::make_shared(); + fragment_scan_options->options->max_recursion_depth = 0; + opts_->fragment_scan_options = fragment_scan_options; + ASSERT_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_)); + ASSERT_OK_AND_ASSIGN(auto scan_task, scan_tasks.Next()); + ASSERT_OK_AND_ASSIGN(auto batches, scan_task->Execute()); + ASSERT_RAISES(Invalid, batches.Next()); + + // Ensure included_fields cannot be set + fragment_scan_options->options = std::make_shared(); + fragment_scan_options->options->included_fields = {1}; + ASSERT_OK_AND_ASSIGN(scan_tasks, fragment->Scan(opts_)); + ASSERT_OK_AND_ASSIGN(scan_task, scan_tasks.Next()); + ASSERT_RAISES(Invalid, scan_task->Execute()); +} + TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) { auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 86bb14b038d..1667d964d12 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -42,6 +42,7 @@ #include "arrow/table.h" #include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" #include "arrow/util/io_util.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" @@ -102,7 +103,7 @@ std::unique_ptr> MakeGeneratedRecordBatch( std::unique_ptr MakeGeneratedRecordBatch( std::shared_ptr schema, int64_t batch_size, int64_t batch_repetitions) { - auto batch = ConstantArrayGenerator::Zeroes(batch_size, schema); + auto batch = random::GenerateBatch(schema->fields(), batch_size, /*seed=*/0); int64_t i = 0; return MakeGeneratedRecordBatch( schema, [batch, i, batch_repetitions](std::shared_ptr* out) mutable { From ee457437c66214238b46a111f8063963539abcd0 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 23 Mar 2021 17:58:07 -0400 Subject: [PATCH 2/9] ARROW-11972: [C++][Dataset] Add ParquetFragmentScanOptions --- cpp/src/arrow/dataset/file_parquet.cc | 43 +++++++++------ cpp/src/arrow/dataset/file_parquet.h | 62 +++++++++++++++------- cpp/src/arrow/dataset/file_parquet_test.cc | 5 +- 3 files changed, 75 insertions(+), 35 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index d255787d55f..998200a295e 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -128,14 +128,15 @@ class ParquetScanTask : public ScanTask { }; static parquet::ReaderProperties MakeReaderProperties( - const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) { + const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options, + MemoryPool* pool = default_memory_pool()) { 
parquet::ReaderProperties properties(pool); - if (format.reader_options.use_buffered_stream) { + if (parquet_scan_options->use_buffered_stream) { properties.enable_buffered_stream(); } else { properties.disable_buffered_stream(); } - properties.set_buffer_size(format.reader_options.buffer_size); + properties.set_buffer_size(parquet_scan_options->buffer_size); properties.file_decryption_properties(format.reader_options.file_decryption_properties); return properties; } @@ -249,15 +250,15 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const { checked_cast(other).reader_options; // FIXME implement comparison for decryption options - // FIXME extract these to scan time options so comparison is unnecessary - return reader_options.use_buffered_stream == other_reader_options.use_buffered_stream && - reader_options.buffer_size == other_reader_options.buffer_size && - reader_options.dict_columns == other_reader_options.dict_columns; + return reader_options.dict_columns == other_reader_options.dict_columns; } ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) { - reader_options.use_buffered_stream = reader_properties.is_buffered_stream_enabled(); - reader_options.buffer_size = reader_properties.buffer_size(); + auto parquet_scan_options = std::make_shared(); + parquet_scan_options->use_buffered_stream = + reader_properties.is_buffered_stream_enabled(); + parquet_scan_options->buffer_size = reader_properties.buffer_size(); + default_fragment_scan_options = std::move(parquet_scan_options); reader_options.file_decryption_properties = reader_properties.file_decryption_properties(); } @@ -265,8 +266,11 @@ ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_pro Result ParquetFileFormat::IsSupported(const FileSource& source) const { try { ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); - auto reader = - parquet::ParquetFileReader::Open(std::move(input), MakeReaderProperties(*this)); + ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options, + GetFragmentScanOptions( + kParquetTypeName, nullptr, default_fragment_scan_options)); + auto reader = parquet::ParquetFileReader::Open( + std::move(input), MakeReaderProperties(*this, parquet_scan_options.get())); std::shared_ptr metadata = reader->metadata(); return metadata != nullptr && metadata->can_decompress(); } catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) { @@ -290,8 +294,11 @@ Result> ParquetFileFormat::Inspect( Result> ParquetFileFormat::GetReader( const FileSource& source, ScanOptions* options) const { + ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options, + GetFragmentScanOptions( + kParquetTypeName, options, default_fragment_scan_options)); MemoryPool* pool = options ? 
options->pool : default_memory_pool(); - auto properties = MakeReaderProperties(*this, pool); + auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), pool); ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); std::unique_ptr reader; @@ -310,7 +317,8 @@ Result> ParquetFileFormat::GetReader } if (options && !options->use_threads) { - arrow_properties.set_use_threads(reader_options.enable_parallel_column_conversion); + arrow_properties.set_use_threads( + parquet_scan_options->enable_parallel_column_conversion); } std::unique_ptr arrow_reader; @@ -356,15 +364,20 @@ Result ParquetFileFormat::ScanFile( auto column_projection = InferColumnProjection(*reader, *options); ScanTaskVector tasks(row_groups.size()); + ARROW_ASSIGN_OR_RAISE( + auto parquet_scan_options, + GetFragmentScanOptions(kParquetTypeName, options.get(), + default_fragment_scan_options)); std::shared_ptr pre_buffer_once = nullptr; - if (reader_options.pre_buffer) { + if (parquet_scan_options->pre_buffer) { pre_buffer_once = std::make_shared(); } for (size_t i = 0; i < row_groups.size(); ++i) { tasks[i] = std::make_shared( row_groups[i], column_projection, reader, pre_buffer_once, row_groups, - reader_options.io_context, reader_options.cache_options, options, fragment); + parquet_scan_options->io_context, parquet_scan_options->cache_options, options, + fragment); } return MakeVectorIterator(std::move(tasks)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 869857e4d34..3ecc06108d4 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -57,6 +57,8 @@ struct SchemaManifest; namespace arrow { namespace dataset { +constexpr char kParquetTypeName[] = "parquet"; + /// \brief A FileFormat implementation that reads from Parquet files class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { public: @@ -66,7 +68,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// memory_pool will be ignored. explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties); - std::string type_name() const override { return "parquet"; } + std::string type_name() const override { return kParquetTypeName; } bool splittable() const override { return true; } @@ -76,35 +78,20 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// \defgroup parquet-file-format-reader-properties properties which correspond to /// members of parquet::ReaderProperties. /// - /// We don't embed parquet::ReaderProperties directly because we get memory_pool from - /// ScanOptions at scan time and provide differing defaults. - /// /// @{ - bool use_buffered_stream = false; - int64_t buffer_size = 1 << 13; std::shared_ptr file_decryption_properties; /// @} /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond /// to members of parquet::ArrowReaderProperties. /// - /// We don't embed parquet::ReaderProperties directly because we get batch_size from - /// ScanOptions at scan time, and we will never pass use_threads == true (since we - /// defer parallelization of the scan). Additionally column names (rather than - /// indices) are used to indicate dictionary columns. + /// We don't embed parquet::ReaderProperties directly because column names (rather + /// than indices) are used to indicate dictionary columns, and other options are + /// deferred to scan time. 
/// /// @{ std::unordered_set dict_columns; - bool pre_buffer = false; - arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults(); - arrow::io::IOContext io_context; /// @} - - /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a - /// scan is already parallelized across input files to avoid thread contention. This - /// option will be removed after support is added for simultaneous parallelization - /// across files and columns. - bool enable_parallel_column_conversion = false; } reader_options; Result IsSupported(const FileSource& source) const override; @@ -206,6 +193,43 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { friend class ParquetDatasetFactory; }; +/// \brief Per-scan options for Parquet fragments +class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions { + public: + std::string type_name() const override { return kParquetTypeName; } + + /// \defgroup parquet-file-format-reader-properties properties which correspond to + /// members of parquet::ReaderProperties. + /// + /// We don't embed parquet::ReaderProperties directly because we get memory_pool from + /// ScanOptions at scan time and provide differing defaults. + /// + /// @{ + bool use_buffered_stream = false; + int64_t buffer_size = 1 << 13; + /// @} + + /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond + /// to members of parquet::ArrowReaderProperties. + /// + /// We don't embed parquet::ReaderProperties directly because we get batch_size from + /// ScanOptions at scan time, and we will never pass use_threads == true (since we + /// defer parallelization of the scan). Additionally column names (rather than + /// indices) are used to indicate dictionary columns. + /// + /// @{ + bool pre_buffer = false; + arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults(); + arrow::io::IOContext io_context; + /// @} + + /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a + /// scan is already parallelized across input files to avoid thread contention. This + /// option will be removed after support is added for simultaneous parallelization + /// across files and columns. 
+ bool enable_parallel_column_conversion = false; +}; + class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions { public: std::shared_ptr writer_properties; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index cf14a2b7caf..93f9d869420 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -272,8 +272,10 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderPreBuffer) { SetSchema(reader->schema()->fields()); SetFilter(literal(true)); - format_->reader_options.pre_buffer = true; ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + auto fragment_scan_options = std::make_shared(); + fragment_scan_options->pre_buffer = true; + opts_->fragment_scan_options = fragment_scan_options; ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); int64_t task_count = 0; @@ -636,6 +638,7 @@ TEST_F(TestParquetFileFormat, WriteRecordBatchReaderCustomOptions) { options->arrow_writer_properties = parquet::ArrowWriterProperties::Builder() .coerce_timestamps(coerce_timestamps_to) + ->allow_truncated_timestamps() ->build(); EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), options)); From 921029df1987c8ae76ad1cd03c0de9ffaee927de Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 24 Mar 2021 10:18:59 -0400 Subject: [PATCH 3/9] ARROW-11972: [Python][Dataset] Add ParquetFragmentScanOptions --- python/pyarrow/_dataset.pyx | 249 ++++++++++++++----- python/pyarrow/dataset.py | 1 + python/pyarrow/includes/libarrow_dataset.pxd | 11 +- python/pyarrow/parquet.py | 4 +- python/pyarrow/tests/test_dataset.py | 49 ++-- 5 files changed, 226 insertions(+), 88 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 17eedb38c7f..5ea92e9a914 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -23,6 +23,7 @@ from cpython.object cimport Py_LT, Py_EQ, Py_GT, Py_LE, Py_NE, Py_GE from cython.operator cimport dereference as deref import os +import warnings import pyarrow as pa from pyarrow.lib cimport * @@ -1097,10 +1098,14 @@ cdef class FragmentScanOptions(_Weakrefable): @staticmethod cdef wrap(const shared_ptr[CFragmentScanOptions]& sp): + if not sp: + return None + type_name = frombytes(sp.get().type_name()) classes = { 'csv': CsvFragmentScanOptions, + 'parquet': ParquetFragmentScanOptions, } class_ = classes.get(type_name, None) @@ -1271,56 +1276,20 @@ cdef class ParquetReadOptions(_Weakrefable): Parameters ---------- - use_buffered_stream : bool, default False - Read files through buffered input streams rather than loading entire - row groups at once. This may be enabled to reduce memory overhead. - Disabled by default. - buffer_size : int, default 8192 - Size of buffered stream, if enabled. Default is 8KB. dictionary_columns : list of string, default None Names of columns which should be dictionary encoded as they are read. - pre_buffer : bool, default False - If enabled, pre-buffer the raw Parquet data instead of issuing one - read per column chunk. This can improve performance on high-latency - filesystems. - enable_parallel_column_conversion : bool, default False - EXPERIMENTAL: Parallelize conversion across columns. This option is - ignored if a scan is already parallelized across input files to avoid - thread contention. This option will be removed after support is added - for simultaneous parallelization across files and columns. 
""" cdef public: - bint use_buffered_stream - uint32_t buffer_size set dictionary_columns - bint pre_buffer - bint enable_parallel_column_conversion - def __init__(self, bint use_buffered_stream=False, - buffer_size=8192, - dictionary_columns=None, - bint pre_buffer=False, - bint enable_parallel_column_conversion=False): - self.use_buffered_stream = use_buffered_stream - if buffer_size <= 0: - raise ValueError("Buffer size must be larger than zero") - self.buffer_size = buffer_size + # Also see _PARQUET_READ_OPTIONS + def __init__(self, dictionary_columns=None): self.dictionary_columns = set(dictionary_columns or set()) - self.pre_buffer = pre_buffer - self.enable_parallel_column_conversion = \ - enable_parallel_column_conversion def equals(self, ParquetReadOptions other): - return ( - self.use_buffered_stream == other.use_buffered_stream and - self.buffer_size == other.buffer_size and - self.dictionary_columns == other.dictionary_columns and - self.pre_buffer == other.pre_buffer and - self.enable_parallel_column_conversion == - other.enable_parallel_column_conversion - ) + return self.dictionary_columns == other.dictionary_columns def __eq__(self, other): try: @@ -1328,6 +1297,10 @@ cdef class ParquetReadOptions(_Weakrefable): except TypeError: return False + def __repr__(self): + return (f"") + cdef class ParquetFileWriteOptions(FileWriteOptions): @@ -1409,38 +1382,74 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): self._set_arrow_properties() +cdef set _PARQUET_READ_OPTIONS = {'dictionary_columns'} + + cdef class ParquetFileFormat(FileFormat): cdef: CParquetFileFormat* parquet_format - def __init__(self, read_options=None): + def __init__(self, read_options=None, + default_fragment_scan_options=None, **kwargs): cdef: shared_ptr[CParquetFileFormat] wrapped CParquetFileFormatReaderOptions* options - # Read options + # Read/scan options + read_options_args = {option: kwargs[option] for option in kwargs + if option in _PARQUET_READ_OPTIONS} + scan_args = {option: kwargs[option] for option in kwargs + if option not in _PARQUET_READ_OPTIONS} + if read_options and read_options_args: + duplicates = ', '.join(sorted(read_options_args)) + raise ValueError(f'If `read_options` is given, ' + f'cannot specify {duplicates}') + if default_fragment_scan_options and scan_args: + duplicates = ', '.join(sorted(scan_args)) + raise ValueError(f'If `default_fragment_scan_options` is given, ' + f'cannot specify {duplicates}') if read_options is None: - read_options = ParquetReadOptions() + read_options = ParquetReadOptions(**read_options_args) elif isinstance(read_options, dict): - read_options = ParquetReadOptions(**read_options) + # For backwards compatibility + duplicates = [] + for option, value in read_options.items(): + if option in _PARQUET_READ_OPTIONS: + read_options_args[option] = value + else: + duplicates.append(option) + scan_args[option] = value + if duplicates: + duplicates = ", ".join(duplicates) + warnings.warn(f'The scan options {duplicates} should be ' + 'specified directly as keyword arguments') + read_options = ParquetReadOptions(**read_options_args) elif not isinstance(read_options, ParquetReadOptions): raise TypeError('`read_options` must be either a dictionary or an ' 'instance of ParquetReadOptions') + if default_fragment_scan_options is None: + default_fragment_scan_options = ParquetFragmentScanOptions( + **scan_args) + elif isinstance(default_fragment_scan_options, dict): + default_fragment_scan_options = ParquetFragmentScanOptions( + **default_fragment_scan_options) + elif 
not isinstance(default_fragment_scan_options, + ParquetFragmentScanOptions): + raise TypeError('`default_fragment_scan_options` must be either a ' + 'dictionary or an instance of ' + 'ParquetFragmentScanOptions') + wrapped = make_shared[CParquetFileFormat]() options = &(wrapped.get().reader_options) - options.use_buffered_stream = read_options.use_buffered_stream - options.buffer_size = read_options.buffer_size - options.pre_buffer = read_options.pre_buffer - options.enable_parallel_column_conversion = \ - read_options.enable_parallel_column_conversion if read_options.dictionary_columns is not None: for column in read_options.dictionary_columns: options.dict_columns.insert(tobytes(column)) self.init( wrapped) + self.default_fragment_scan_options = default_fragment_scan_options cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1451,14 +1460,8 @@ cdef class ParquetFileFormat(FileFormat): cdef CParquetFileFormatReaderOptions* options options = &self.parquet_format.reader_options return ParquetReadOptions( - use_buffered_stream=options.use_buffered_stream, - buffer_size=options.buffer_size, dictionary_columns={frombytes(col) for col in options.dict_columns}, - pre_buffer=options.pre_buffer, - enable_parallel_column_conversion=( - options.enable_parallel_column_conversion - ) ) def make_write_options(self, **kwargs): @@ -1466,13 +1469,25 @@ cdef class ParquetFileFormat(FileFormat): ( opts).update(**kwargs) return opts + cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): + if options.type_name == 'parquet': + self.parquet_format.default_fragment_scan_options = options.wrapped + else: + super()._set_default_fragment_scan_options(options) + def equals(self, ParquetFileFormat other): return ( - self.read_options.equals(other.read_options) + self.read_options.equals(other.read_options) and + self.default_fragment_scan_options == + other.default_fragment_scan_options ) def __reduce__(self): - return ParquetFileFormat, (self.read_options, ) + return ParquetFileFormat, (self.read_options, + self.default_fragment_scan_options) + + def __repr__(self): + return f"" def make_fragment(self, file, filesystem=None, Expression partition_expression=None, row_groups=None): @@ -1497,6 +1512,100 @@ cdef class ParquetFileFormat(FileFormat): return Fragment.wrap(move(c_fragment)) +cdef class ParquetFragmentScanOptions(FragmentScanOptions): + """Scan-specific options for Parquet fragments. + + Parameters + ---------- + use_buffered_stream : bool, default False + Read files through buffered input streams rather than loading entire + row groups at once. This may be enabled to reduce memory overhead. + Disabled by default. + buffer_size : int, default 8192 + Size of buffered stream, if enabled. Default is 8KB. + pre_buffer : bool, default False + If enabled, pre-buffer the raw Parquet data instead of issuing one + read per column chunk. This can improve performance on high-latency + filesystems. + enable_parallel_column_conversion : bool, default False + EXPERIMENTAL: Parallelize conversion across columns. This option is + ignored if a scan is already parallelized across input files to avoid + thread contention. This option will be removed after support is added + for simultaneous parallelization across files and columns. 
+ """ + + cdef: + CParquetFragmentScanOptions* parquet_options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, bint use_buffered_stream=False, + buffer_size=8192, + bint pre_buffer=False, + bint enable_parallel_column_conversion=False): + self.init(shared_ptr[CFragmentScanOptions]( + new CParquetFragmentScanOptions())) + self.use_buffered_stream = use_buffered_stream + self.buffer_size = buffer_size + self.pre_buffer = pre_buffer + self.enable_parallel_column_conversion = \ + enable_parallel_column_conversion + + cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): + FragmentScanOptions.init(self, sp) + self.parquet_options = sp.get() + + @property + def use_buffered_stream(self): + return self.parquet_options.use_buffered_stream + + @use_buffered_stream.setter + def use_buffered_stream(self, bint use_buffered_stream): + self.parquet_options.use_buffered_stream = use_buffered_stream + + @property + def buffer_size(self): + return self.parquet_options.buffer_size + + @buffer_size.setter + def buffer_size(self, buffer_size): + if buffer_size <= 0: + raise ValueError("Buffer size must be larger than zero") + self.parquet_options.buffer_size = buffer_size + + @property + def pre_buffer(self): + return self.parquet_options.pre_buffer + + @pre_buffer.setter + def pre_buffer(self, bint pre_buffer): + self.parquet_options.pre_buffer = pre_buffer + + @property + def enable_parallel_column_conversion(self): + return self.parquet_options.enable_parallel_column_conversion + + @enable_parallel_column_conversion.setter + def enable_parallel_column_conversion( + self, bint enable_parallel_column_conversion): + self.parquet_options.enable_parallel_column_conversion = \ + enable_parallel_column_conversion + + def equals(self, ParquetFragmentScanOptions other): + return ( + self.use_buffered_stream == other.use_buffered_stream and + self.buffer_size == other.buffer_size and + self.pre_buffer == other.pre_buffer and + self.enable_parallel_column_conversion == + other.enable_parallel_column_conversion) + + def __reduce__(self): + return ParquetFragmentScanOptions, ( + self.use_buffered_stream, self.buffer_size, self.pre_buffer, + self.enable_parallel_column_conversion) + + cdef class IpcFileWriteOptions(FileWriteOptions): def __init__(self): @@ -1527,14 +1636,28 @@ cdef class CsvFileFormat(FileFormat): __slots__ = () def __init__(self, ParseOptions parse_options=None, + default_fragment_scan_options=None, ConvertOptions convert_options=None, ReadOptions read_options=None): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options if convert_options is not None or read_options is not None: + if default_fragment_scan_options: + raise ValueError('If `default_fragment_scan_options` is ' + 'given, cannot specify convert_options ' + 'or read_options') self.default_fragment_scan_options = CsvFragmentScanOptions( convert_options=convert_options, read_options=read_options) + elif isinstance(default_fragment_scan_options, dict): + self.default_fragment_scan_options = CsvFragmentScanOptions( + **default_fragment_scan_options) + elif isinstance(default_fragment_scan_options, CsvFragmentScanOptions): + self.default_fragment_scan_options = default_fragment_scan_options + elif default_fragment_scan_options is not None: + raise TypeError('`default_fragment_scan_options` must be either ' + 'a dictionary or an instance of ' + 'CsvFragmentScanOptions') cdef void init(self, const shared_ptr[CFileFormat]& sp): 
FileFormat.init(self, sp) @@ -1558,10 +1681,14 @@ cdef class CsvFileFormat(FileFormat): super()._set_default_fragment_scan_options(options) def equals(self, CsvFileFormat other): - return self.parse_options.equals(other.parse_options) + return ( + self.parse_options.equals(other.parse_options) and + self.default_fragment_scan_options == + other.default_fragment_scan_options) def __reduce__(self): - return CsvFileFormat, (self.parse_options,) + return CsvFileFormat, (self.parse_options, + self.default_fragment_scan_options) def __repr__(self): return f"" @@ -1606,8 +1733,10 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): self.csv_options.read_options = read_options.options def equals(self, CsvFragmentScanOptions other): - return (self.convert_options.equals(other.convert_options) and - self.read_options.equals(other.read_options)) + return ( + other and + self.convert_options.equals(other.convert_options) and + self.read_options.equals(other.read_options)) def __reduce__(self): return CsvFragmentScanOptions, (self.convert_options, diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 0c65070d872..615cb2516dc 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -43,6 +43,7 @@ ParquetFileFormat, ParquetFileFragment, ParquetFileWriteOptions, + ParquetFragmentScanOptions, ParquetReadOptions, Partitioning, PartitioningFactory, diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index db2e73acdff..a0231a29d38 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -237,11 +237,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CParquetFileFormatReaderOptions \ "arrow::dataset::ParquetFileFormat::ReaderOptions": - c_bool use_buffered_stream - int64_t buffer_size unordered_set[c_string] dict_columns - c_bool pre_buffer - c_bool enable_parallel_column_conversion cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"( CFileFormat): @@ -252,6 +248,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CSchema] physical_schema, vector[int] row_groups) + cdef cppclass CParquetFragmentScanOptions \ + "arrow::dataset::ParquetFragmentScanOptions"(CFragmentScanOptions): + c_bool use_buffered_stream + int64_t buffer_size + c_bool pre_buffer + c_bool enable_parallel_column_conversion + cdef cppclass CIpcFileWriteOptions \ "arrow::dataset::IpcFileWriteOptions"(CFileWriteOptions): pass diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 37bca4e35c2..2595aa883cd 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -1536,7 +1536,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None, self._enable_parallel_column_conversion = True read_options.update(enable_parallel_column_conversion=True) - parquet_format = ds.ParquetFileFormat(read_options=read_options) + parquet_format = ds.ParquetFileFormat(**read_options) fragment = parquet_format.make_fragment(single_file, filesystem) self._dataset = ds.FileSystemDataset( @@ -1548,7 +1548,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None, else: self._enable_parallel_column_conversion = False - parquet_format = ds.ParquetFileFormat(read_options=read_options) + parquet_format = ds.ParquetFileFormat(**read_options) # check partitioning to enable dictionary encoding if partitioning == "hive": diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py index a7dd1520168..2ca1028accb 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -502,32 +502,38 @@ def test_partition_keys(): def test_parquet_read_options(): opts1 = ds.ParquetReadOptions() - opts2 = ds.ParquetReadOptions(buffer_size=4096, - dictionary_columns=['a', 'b']) - opts3 = ds.ParquetReadOptions(buffer_size=2**13, use_buffered_stream=True, - dictionary_columns={'a', 'b'}) - opts4 = ds.ParquetReadOptions(buffer_size=2**13, pre_buffer=True, - dictionary_columns={'a', 'b'}) + opts2 = ds.ParquetReadOptions(dictionary_columns=['a', 'b']) + + assert opts1.dictionary_columns == set() + + assert opts2.dictionary_columns == {'a', 'b'} + + assert opts1 == opts1 + assert opts1 != opts2 + + +def test_parquet_scan_options(): + opts1 = ds.ParquetFragmentScanOptions() + opts2 = ds.ParquetFragmentScanOptions(buffer_size=4096) + opts3 = ds.ParquetFragmentScanOptions( + buffer_size=2**13, use_buffered_stream=True) + opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True) assert opts1.use_buffered_stream is False assert opts1.buffer_size == 2**13 assert opts1.pre_buffer is False - assert opts1.dictionary_columns == set() assert opts2.use_buffered_stream is False assert opts2.buffer_size == 2**12 assert opts2.pre_buffer is False - assert opts2.dictionary_columns == {'a', 'b'} assert opts3.use_buffered_stream is True assert opts3.buffer_size == 2**13 assert opts3.pre_buffer is False - assert opts3.dictionary_columns == {'a', 'b'} assert opts4.use_buffered_stream is False assert opts4.buffer_size == 2**13 assert opts4.pre_buffer is True - assert opts4.dictionary_columns == {'a', 'b'} assert opts1 == opts1 assert opts1 != opts2 @@ -546,14 +552,11 @@ def test_file_format_pickling(): ds.CsvFileFormat(read_options=pa.csv.ReadOptions( skip_rows=3, block_size=2**20)), ds.ParquetFileFormat(), + ds.ParquetFileFormat(dictionary_columns={'a'}), + ds.ParquetFileFormat(use_buffered_stream=True), ds.ParquetFileFormat( - read_options=ds.ParquetReadOptions(use_buffered_stream=True) - ), - ds.ParquetFileFormat( - read_options={ - 'use_buffered_stream': True, - 'buffer_size': 4096, - } + use_buffered_stream=True, + buffer_size=4096, ) ] for file_format in formats: @@ -567,6 +570,8 @@ def test_fragment_scan_options_pickling(): convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)), ds.CsvFragmentScanOptions( read_options=pa.csv.ReadOptions(block_size=2**16)), + ds.ParquetFragmentScanOptions(buffer_size=4096), + ds.ParquetFragmentScanOptions(pre_buffer=True), ] for option in options: assert pickle.loads(pickle.dumps(option)) == option @@ -582,8 +587,8 @@ def test_fragment_scan_options_pickling(): @pytest.mark.parametrize('pre_buffer', [False, True]) def test_filesystem_factory(mockfs, paths_or_selector, pre_buffer): format = ds.ParquetFileFormat( - read_options=ds.ParquetReadOptions(dictionary_columns={"str"}, - pre_buffer=pre_buffer) + read_options=ds.ParquetReadOptions(dictionary_columns={"str"}), + pre_buffer=pre_buffer ) options = ds.FileSystemFactoryOptions('subdir') @@ -702,10 +707,10 @@ def test_make_parquet_fragment_from_buffer(): ] dictionary_format = ds.ParquetFileFormat( read_options=ds.ParquetReadOptions( - use_buffered_stream=True, - buffer_size=4096, dictionary_columns=['alpha', 'animal'] - ) + ), + use_buffered_stream=True, + buffer_size=4096, ) cases = [ From 4aabd3b56c9780a5401b53803aeedf17db7b7d52 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 24 Mar 2021 11:04:26 -0400 Subject: [PATCH 
4/9] ARROW-11972: [R][Dataset] Add ParquetFragmentScanOptions --- cpp/src/arrow/dataset/type_fwd.h | 1 + r/R/arrowExports.R | 8 ++++++-- r/R/dataset-format.R | 33 +++++++++++++++++++++++++------- r/src/arrowExports.cpp | 31 +++++++++++++++++++++++------- r/src/dataset.cpp | 20 ++++++++++++++----- 5 files changed, 72 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index d148d4ee2d3..88bf44a0cd7 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -68,6 +68,7 @@ class IpcFileWriteOptions; class ParquetFileFormat; class ParquetFileFragment; +class ParquetFragmentScanOptions; class ParquetFileWriter; class ParquetFileWriteOptions; diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 2c7bf5c19f6..93084c45570 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -408,8 +408,8 @@ dataset___FileFormat__DefaultWriteOptions <- function(fmt){ .Call(`_arrow_dataset___FileFormat__DefaultWriteOptions`, fmt) } -dataset___ParquetFileFormat__Make <- function(use_buffered_stream, buffer_size, dict_columns){ - .Call(`_arrow_dataset___ParquetFileFormat__Make`, use_buffered_stream, buffer_size, dict_columns) +dataset___ParquetFileFormat__Make <- function(options, dict_columns){ + .Call(`_arrow_dataset___ParquetFileFormat__Make`, options, dict_columns) } dataset___FileWriteOptions__type_name <- function(options){ @@ -444,6 +444,10 @@ dataset___CsvFragmentScanOptions__Make <- function(convert_options, read_options .Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options, read_options) } +dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream, buffer_size, pre_buffer){ + .Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`, use_buffered_stream, buffer_size, pre_buffer) +} + dataset___DirectoryPartitioning <- function(schm){ .Call(`_arrow_dataset___DirectoryPartitioning`, schm) } diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index cd54a300606..854672b66a2 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -34,11 +34,8 @@ #' * `...`: Additional format-specific options #' #' `format = "parquet"``: -#' * `use_buffered_stream`: Read files through buffered input streams rather than -#' loading entire row groups at once. This may be enabled -#' to reduce memory overhead. Disabled by default. -#' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB. #' * `dict_columns`: Names of columns which should be read as dictionaries. +#' * Any Parquet options from [FragmentScanOptions]. #' #' `format = "text"`: see [CsvParseOptions]. Note that you can specify them either #' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the @@ -91,10 +88,10 @@ as.character.FileFormat <- function(x, ...) { #' @rdname FileFormat #' @export ParquetFileFormat <- R6Class("ParquetFileFormat", inherit = FileFormat) -ParquetFileFormat$create <- function(use_buffered_stream = FALSE, - buffer_size = 8196, +ParquetFileFormat$create <- function(..., dict_columns = character(0)) { - dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns) + options <- ParquetFragmentScanOptions$create(...) + dataset___ParquetFileFormat__Make(options, dict_columns) } #' @usage NULL @@ -217,9 +214,18 @@ csv_file_format_read_options <- function(...) { #' @section Factory: #' `FragmentScanOptions$create()` takes the following arguments: #' * `format`: A string identifier of the file format. 
Currently supported values: +#' * "parquet" #' * "csv"/"text", aliases for the same format. #' * `...`: Additional format-specific options #' +#' `format = "parquet"``: +#' * `use_buffered_stream`: Read files through buffered input streams rather than +#' loading entire row groups at once. This may be enabled +#' to reduce memory overhead. Disabled by default. +#' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB. +#' * `pre_buffer`: Pre-buffer the raw Parquet data. This can improve performance +#' on high-latency filesystems. Disabled by default. +# #' `format = "text"`: see [CsvConvertOptions]. Note that options can only be #' specified with the Arrow C++ library naming. Also, "block_size" from #' [CsvReadOptions] may be given. @@ -240,6 +246,8 @@ FragmentScanOptions$create <- function(format, ...) { opt_names <- names(list(...)) if (format %in% c("csv", "text", "tsv")) { CsvFragmentScanOptions$create(...) + } else if (format == "parquet") { + ParquetFragmentScanOptions$create(...) } else { stop("Unsupported file format: ", format, call. = FALSE) } @@ -261,6 +269,17 @@ CsvFragmentScanOptions$create <- function(..., dataset___CsvFragmentScanOptions__Make(convert_opts, read_opts) } +#' @usage NULL +#' @format NULL +#' @rdname FragmentScanOptions +#' @export +ParquetFragmentScanOptions <- R6Class("ParquetFragmentScanOptions", inherit = FragmentScanOptions) +ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE, + buffer_size = 8196, + pre_buffer = FALSE) { + dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer) +} + #' Format-specific write options #' #' @description diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index b06a2696e50..0e7ec3d7ca0 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1014,17 +1014,16 @@ extern "C" SEXP _arrow_dataset___FileFormat__DefaultWriteOptions(SEXP fmt_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___ParquetFileFormat__Make(bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns); -extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ +std::shared_ptr dataset___ParquetFileFormat__Make(const std::shared_ptr& options, cpp11::strings dict_columns); +extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP options_sexp, SEXP dict_columns_sexp){ BEGIN_CPP11 - arrow::r::Input::type use_buffered_stream(use_buffered_stream_sexp); - arrow::r::Input::type buffer_size(buffer_size_sexp); + arrow::r::Input&>::type options(options_sexp); arrow::r::Input::type dict_columns(dict_columns_sexp); - return cpp11::as_sexp(dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns)); + return cpp11::as_sexp(dataset___ParquetFileFormat__Make(options, dict_columns)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ +extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP options_sexp, SEXP dict_columns_sexp){ Rf_error("Cannot call dataset___ParquetFileFormat__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); } #endif @@ -1161,6 +1160,23 @@ extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_optio } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_DATASET) +std::shared_ptr dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size, bool pre_buffer); +extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp){ +BEGIN_CPP11 + arrow::r::Input::type use_buffered_stream(use_buffered_stream_sexp); + arrow::r::Input::type buffer_size(buffer_size_sexp); + arrow::r::Input::type pre_buffer(pre_buffer_sexp); + return cpp11::as_sexp(dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp){ + Rf_error("Cannot call dataset___ParquetFragmentScanOptions__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___DirectoryPartitioning(const std::shared_ptr& schm); @@ -4295,7 +4311,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___FileSystemDatasetFactory__Make3", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make3, 4}, { "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1}, { "_arrow_dataset___FileFormat__DefaultWriteOptions", (DL_FUNC) &_arrow_dataset___FileFormat__DefaultWriteOptions, 1}, - { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, + { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 2}, { "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, { "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, { "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, @@ -4304,6 +4320,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 3}, { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1}, { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2}, + { "_arrow_dataset___ParquetFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___ParquetFragmentScanOptions__Make, 3}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, { "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1}, { "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 2}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 89c3e4d56d8..1787e424557 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -216,11 +216,10 @@ std::shared_ptr dataset___FileFormat__DefaultWriteOptions( // [[dataset::export]] std::shared_ptr dataset___ParquetFileFormat__Make( - bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns) { + const std::shared_ptr& options, + cpp11::strings dict_columns) { auto fmt = std::make_shared(); - - fmt->reader_options.use_buffered_stream = 
use_buffered_stream; - fmt->reader_options.buffer_size = buffer_size; + fmt->default_fragment_scan_options = std::move(options); auto dict_columns_vector = cpp11::as_cpp>(dict_columns); auto& d = fmt->reader_options.dict_columns; @@ -284,7 +283,7 @@ std::shared_ptr dataset___CsvFileFormat__Make( return format; } -// FragmentScanOptions, CsvFragmentScanOptions +// FragmentScanOptions, CsvFragmentScanOptions, ParquetFragmentScanOptions // [[dataset::export]] std::string dataset___FragmentScanOptions__type_name( @@ -302,6 +301,17 @@ std::shared_ptr dataset___CsvFragmentScanOptions__Ma return options; } +// [[dataset::export]] +std::shared_ptr +dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size, + bool pre_buffer) { + auto options = std::make_shared(); + options->use_buffered_stream = use_buffered_stream; + options->buffer_size = buffer_size; + options->pre_buffer = pre_buffer; + return options; +} + // DirectoryPartitioning, HivePartitioning // [[dataset::export]] From 74c0192f640e7ae71fa1d5d0659b3056dcf16ea3 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 24 Mar 2021 13:49:54 -0400 Subject: [PATCH 5/9] Try to fix Windows tests by using less memory --- cpp/src/arrow/dataset/file_ipc_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 97c45aea863..b5d873e831d 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -136,7 +136,8 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { TEST_F(TestIpcFileFormat, FragmentScanOptions) { auto reader = GetRecordBatchReader( - schema({field("list", list(float64())), field("f64", float64())})); + schema({field("list", list(float64()), key_value_metadata({{"max_length", "1"}})), + field("f64", float64())})); auto source = GetFileSource(reader.get()); SetSchema(reader->schema()->fields()); From 3cc2a3ba21a29199a9718e8e30793c1b9aa3f664 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 24 Mar 2021 16:18:47 -0400 Subject: [PATCH 6/9] Try to fix Windows tests --- cpp/src/arrow/dataset/file_ipc_test.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index b5d873e831d..dba4f52a459 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -136,7 +136,9 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { TEST_F(TestIpcFileFormat, FragmentScanOptions) { auto reader = GetRecordBatchReader( - schema({field("list", list(float64()), key_value_metadata({{"max_length", "1"}})), + // ARROW-12077: on Windows/mimalloc/release, nullable list column leads to crash + schema({field("list", list(float64()), false, + key_value_metadata({{"max_length", "1"}})), field("f64", float64())})); auto source = GetFileSource(reader.get()); From ed2c4dcbfb029d663a85d1f16f0f1642bdd993dd Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 25 Mar 2021 14:37:37 -0400 Subject: [PATCH 7/9] Directly embed Parquet properties --- cpp/src/arrow/dataset/dataset_internal.h | 2 +- cpp/src/arrow/dataset/file_ipc.cc | 9 +++-- cpp/src/arrow/dataset/file_parquet.cc | 29 +++++++++----- cpp/src/arrow/dataset/file_parquet.h | 41 +++++--------------- cpp/src/arrow/dataset/file_parquet_test.cc | 2 +- cpp/src/arrow/dataset/type_fwd.h | 1 + python/pyarrow/_dataset.pyx | 21 +++++++--- python/pyarrow/_parquet.pxd | 3 ++ python/pyarrow/includes/libarrow_dataset.pxd | 5 +-- 
r/src/dataset.cpp | 11 ++++-- 10 files changed, 64 insertions(+), 60 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index b28bf7a14a4..a5ac474754b 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -191,7 +191,7 @@ inline bool operator==(const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded /// but of the wrong type, an error is returned. template arrow::Result> GetFragmentScanOptions( - const std::string& type_name, ScanOptions* scan_options, + const std::string& type_name, const ScanOptions* scan_options, const std::shared_ptr& default_options) { auto source = default_options; if (scan_options && scan_options->fragment_scan_options) { diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 178fc9766cb..a53f4dd2cb4 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -83,7 +83,7 @@ class IpcScanTask : public ScanTask { struct Impl { static Result Make(const FileSource& source, FileFormat* format, - ScanOptions* scan_options) { + const ScanOptions* scan_options) { ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source)); ARROW_ASSIGN_OR_RAISE( @@ -95,9 +95,10 @@ class IpcScanTask : public ScanTask { options.memory_pool = scan_options->pool; options.use_threads = false; if (!options.included_fields.empty()) { - // Cannot set them ehre - return Status::Invalid( - "Cannot set included_fields in scan options for IPC fragments"); + // Cannot set them here + ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set " + "but will be ignored; included_fields are derived from " + "fields referenced by the scan"; } ARROW_ASSIGN_OR_RAISE( options.included_fields, diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 998200a295e..8caae949784 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -130,14 +130,16 @@ class ParquetScanTask : public ScanTask { static parquet::ReaderProperties MakeReaderProperties( const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options, MemoryPool* pool = default_memory_pool()) { + // Can't mutate pool after construction parquet::ReaderProperties properties(pool); - if (parquet_scan_options->use_buffered_stream) { + if (parquet_scan_options->reader_properties->is_buffered_stream_enabled()) { properties.enable_buffered_stream(); } else { properties.disable_buffered_stream(); } - properties.set_buffer_size(parquet_scan_options->buffer_size); - properties.file_decryption_properties(format.reader_options.file_decryption_properties); + properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size()); + properties.file_decryption_properties( + parquet_scan_options->reader_properties->file_decryption_properties()); return properties; } @@ -255,12 +257,8 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const { ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) { auto parquet_scan_options = std::make_shared(); - parquet_scan_options->use_buffered_stream = - reader_properties.is_buffered_stream_enabled(); - parquet_scan_options->buffer_size = reader_properties.buffer_size(); + *parquet_scan_options->reader_properties = reader_properties; default_fragment_scan_options = std::move(parquet_scan_options); - reader_options.file_decryption_properties = - reader_properties.file_decryption_properties(); } 
 
 Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
@@ -369,14 +367,15 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
       GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(), default_fragment_scan_options));
   std::shared_ptr<std::once_flag> pre_buffer_once = nullptr;
-  if (parquet_scan_options->pre_buffer) {
+  if (parquet_scan_options->arrow_reader_properties->pre_buffer()) {
     pre_buffer_once = std::make_shared<std::once_flag>();
   }

   for (size_t i = 0; i < row_groups.size(); ++i) {
     tasks[i] = std::make_shared<ParquetScanTask>(
         row_groups[i], column_projection, reader, pre_buffer_once, row_groups,
-        parquet_scan_options->io_context, parquet_scan_options->cache_options, options,
+        parquet_scan_options->arrow_reader_properties->io_context(),
+        parquet_scan_options->arrow_reader_properties->cache_options(), options,
         fragment);
   }

@@ -599,6 +598,16 @@ Result<std::vector<int>> ParquetFileFragment::FilterRowGroups(Expression predica
   return row_groups;
 }

+//
+// ParquetFragmentScanOptions
+//
+
+ParquetFragmentScanOptions::ParquetFragmentScanOptions() {
+  reader_properties = std::make_shared<parquet::ReaderProperties>();
+  arrow_reader_properties =
+      std::make_shared<parquet::ArrowReaderProperties>(/*use_threads=*/false);
+}
+
 //
 // ParquetDatasetFactory
 //
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 3ecc06108d4..fa0d7dea843 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -75,13 +75,6 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   bool Equals(const FileFormat& other) const override;

   struct ReaderOptions {
-    /// \defgroup parquet-file-format-reader-properties properties which correspond to
-    /// members of parquet::ReaderProperties.
-    ///
-    /// @{
-    std::shared_ptr<parquet::FileDecryptionProperties> file_decryption_properties;
-    /// @}
-
     /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
     /// to members of parquet::ArrowReaderProperties.
     ///
@@ -196,33 +189,17 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
 /// \brief Per-scan options for Parquet fragments
 class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
  public:
+  ParquetFragmentScanOptions();
   std::string type_name() const override { return kParquetTypeName; }

-  /// \defgroup parquet-file-format-reader-properties properties which correspond to
-  /// members of parquet::ReaderProperties.
-  ///
-  /// We don't embed parquet::ReaderProperties directly because we get memory_pool from
-  /// ScanOptions at scan time and provide differing defaults.
-  ///
-  /// @{
-  bool use_buffered_stream = false;
-  int64_t buffer_size = 1 << 13;
-  /// @}
-
-  /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
-  /// to members of parquet::ArrowReaderProperties.
-  ///
-  /// We don't embed parquet::ReaderProperties directly because we get batch_size from
-  /// ScanOptions at scan time, and we will never pass use_threads == true (since we
-  /// defer parallelization of the scan). Additionally column names (rather than
-  /// indices) are used to indicate dictionary columns.
-  ///
-  /// @{
-  bool pre_buffer = false;
-  arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults();
-  arrow::io::IOContext io_context;
-  /// @}
-
+  /// Reader properties. Not all properties are respected: memory_pool comes from
+  /// ScanOptions.
+  std::shared_ptr<parquet::ReaderProperties> reader_properties;
+  /// Arrow reader properties. Not all properties are respected: batch_size comes from
+  /// ScanOptions, and use_threads will be overridden based on
+  /// enable_parallel_column_conversion. Additionally, dictionary columns come from
+  /// ParquetFileFormat::ReaderOptions::dict_columns.
+  std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
   /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
   /// scan is already parallelized across input files to avoid thread contention. This
   /// option will be removed after support is added for simultaneous parallelization
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 93f9d869420..bb06e7f2b63 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -274,7 +274,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderPreBuffer) {
   ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

   auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
-  fragment_scan_options->pre_buffer = true;
+  fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
   opts_->fragment_scan_options = fragment_scan_options;
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));

diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index 88bf44a0cd7..6ba65a63afd 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -65,6 +65,7 @@ struct CsvFragmentScanOptions;
 class IpcFileFormat;
 class IpcFileWriter;
 class IpcFileWriteOptions;
+class IpcFragmentScanOptions;

 class ParquetFileFormat;
 class ParquetFileFragment;
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 5ea92e9a914..adf1b36f6be 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1556,31 +1556,40 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         FragmentScanOptions.init(self, sp)
         self.parquet_options = <CParquetFragmentScanOptions*> sp.get()

+    cdef CReaderProperties* reader_properties(self):
+        return self.parquet_options.reader_properties.get()
+
+    cdef ArrowReaderProperties* arrow_reader_properties(self):
+        return self.parquet_options.arrow_reader_properties.get()
+
     @property
     def use_buffered_stream(self):
-        return self.parquet_options.use_buffered_stream
+        return self.reader_properties().is_buffered_stream_enabled()

     @use_buffered_stream.setter
     def use_buffered_stream(self, bint use_buffered_stream):
-        self.parquet_options.use_buffered_stream = use_buffered_stream
+        if use_buffered_stream:
+            self.reader_properties().enable_buffered_stream()
+        else:
+            self.reader_properties().disable_buffered_stream()

     @property
     def buffer_size(self):
-        return self.parquet_options.buffer_size
+        return self.reader_properties().buffer_size()

     @buffer_size.setter
     def buffer_size(self, buffer_size):
         if buffer_size <= 0:
             raise ValueError("Buffer size must be larger than zero")
-        self.parquet_options.buffer_size = buffer_size
+        self.reader_properties().set_buffer_size(buffer_size)

     @property
     def pre_buffer(self):
-        return self.parquet_options.pre_buffer
+        return self.arrow_reader_properties().pre_buffer()

     @pre_buffer.setter
     def pre_buffer(self, bint pre_buffer):
-        self.parquet_options.pre_buffer = pre_buffer
+        self.arrow_reader_properties().set_pre_buffer(pre_buffer)

     @property
     def enable_parallel_column_conversion(self):
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 8fa1c855b3e..96bfd77552e 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -329,6 +329,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
                             uint32_t* metadata_len)

     cdef cppclass CReaderProperties" parquet::ReaderProperties":
+        c_bool is_buffered_stream_enabled() const
        void enable_buffered_stream()
        void disable_buffered_stream()
        void set_buffer_size(int64_t buf_size)
@@ -342,6 +343,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
        c_bool read_dictionary()
        void set_batch_size(int64_t batch_size)
        int64_t batch_size()
+        void set_pre_buffer(c_bool pre_buffer)
+        c_bool pre_buffer() const

    ArrowReaderProperties default_arrow_reader_properties()

diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index a0231a29d38..06ec69c8b80 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -250,9 +250,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CParquetFragmentScanOptions \
             "arrow::dataset::ParquetFragmentScanOptions"(CFragmentScanOptions):
-        c_bool use_buffered_stream
-        int64_t buffer_size
-        c_bool pre_buffer
+        shared_ptr[CReaderProperties] reader_properties
+        shared_ptr[ArrowReaderProperties] arrow_reader_properties
        c_bool enable_parallel_column_conversion

    cdef cppclass CIpcFileWriteOptions \
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 1787e424557..b6983451835 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include

 namespace ds = ::arrow::dataset;
 namespace fs = ::arrow::fs;
@@ -306,9 +307,13 @@
 std::shared_ptr<ds::ParquetFragmentScanOptions> dataset___ParquetFragmentScanOptions__Make(
     bool use_buffered_stream, int64_t buffer_size, bool pre_buffer) {
   auto options = std::make_shared<ds::ParquetFragmentScanOptions>();
-  options->use_buffered_stream = use_buffered_stream;
-  options->buffer_size = buffer_size;
-  options->pre_buffer = pre_buffer;
+  if (use_buffered_stream) {
+    options->reader_properties->enable_buffered_stream();
+  } else {
+    options->reader_properties->disable_buffered_stream();
+  }
+  options->reader_properties->set_buffer_size(buffer_size);
+  options->arrow_reader_properties->set_pre_buffer(pre_buffer);
   return options;
 }

From 987aec1bfab1023227f6d6d0c21e06f7e63fda51 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 25 Mar 2021 16:34:17 -0400
Subject: [PATCH 8/9] Remove unnecessary test

---
 cpp/src/arrow/dataset/file_ipc_test.cc | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index dba4f52a459..502b61ca645 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -154,13 +154,6 @@ TEST_F(TestIpcFileFormat, FragmentScanOptions) {
   ASSERT_OK_AND_ASSIGN(auto scan_task, scan_tasks.Next());
   ASSERT_OK_AND_ASSIGN(auto batches, scan_task->Execute());
   ASSERT_RAISES(Invalid, batches.Next());
-
-  // Ensure included_fields cannot be set
-  fragment_scan_options->options = std::make_shared<ipc::IpcReadOptions>();
-  fragment_scan_options->options->included_fields = {1};
-  ASSERT_OK_AND_ASSIGN(scan_tasks, fragment->Scan(opts_));
-  ASSERT_OK_AND_ASSIGN(scan_task, scan_tasks.Next());
-  ASSERT_RAISES(Invalid, scan_task->Execute());
 }

 TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) {

From ebc7c60ee589a75569abf1a9e67cd5b73a755f1a Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 25 Mar 2021 18:27:00 -0400
Subject: [PATCH 9/9] Add missing include

---
 cpp/src/arrow/dataset/file_ipc.cc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index a53f4dd2cb4..24ea6e36ff2 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -29,6 +29,7 @@
 #include "arrow/ipc/writer.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"

 namespace arrow {
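Usage sketch for the IPC side (likewise not part of the patches): with the warning added earlier in this series, setting included_fields on the embedded ipc::IpcReadOptions no longer fails the scan; the columns actually read are still derived from the fields the scan references. The helper name ConfigureIpcScan and the caller-provided ScanOptions are assumptions; the option values mirror the IPC test above.

    // Sketch only: pass ipc::IpcReadOptions to IPC fragments through the new
    // IpcFragmentScanOptions. included_fields, memory_pool, and use_threads
    // set here are ignored or overridden by the scanner.
    #include <memory>

    #include "arrow/dataset/file_ipc.h"
    #include "arrow/dataset/scanner.h"
    #include "arrow/ipc/options.h"

    void ConfigureIpcScan(
        const std::shared_ptr<arrow::dataset::ScanOptions>& scan_options) {
      auto ipc_options = std::make_shared<arrow::dataset::IpcFragmentScanOptions>();
      ipc_options->options = std::make_shared<arrow::ipc::IpcReadOptions>();
      // Respected: guards against deeply nested input.
      ipc_options->options->max_recursion_depth = 16;
      // Ignored with a warning; projection comes from the scan itself.
      ipc_options->options->included_fields = {1};
      scan_options->fragment_scan_options = std::move(ipc_options);
    }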