From 25b08174acbe83ad8deda9438d7c9b9debf340c1 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 16 Mar 2021 10:08:51 -0400 Subject: [PATCH 1/7] ARROW-8631: [C++][Dataset] Expose csv::ReadOptions --- cpp/src/arrow/dataset/dataset_internal.h | 9 +++++++++ cpp/src/arrow/dataset/file_csv.cc | 19 +++++++++++------- cpp/src/arrow/dataset/file_csv.h | 8 ++++++++ cpp/src/arrow/dataset/file_csv_test.cc | 25 ++++++++++++++++++++++++ cpp/src/arrow/dataset/scanner.cc | 6 ++++++ cpp/src/arrow/dataset/scanner.h | 3 +++ 6 files changed, 63 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 331ad3d81c6..831f84de434 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -185,5 +185,14 @@ inline bool operator==(const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded l.partition_expression == r.partition_expression; } +template +std::shared_ptr DowncastFragmentScanOptions( + const std::shared_ptr& scan_options, const std::string& type_name) { + if (!scan_options) return nullptr; + if (!scan_options->fragment_scan_options) return nullptr; + if (scan_options->fragment_scan_options->type_name() != type_name) return nullptr; + return internal::checked_pointer_cast(scan_options->fragment_scan_options); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index b7c7f3290da..b5e601e55a3 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -83,10 +83,9 @@ static inline Result GetConvertOptions( GetColumnNames(format.parse_options, first_block, pool)); auto convert_options = csv::ConvertOptions::Defaults(); - if (scan_options && scan_options->fragment_scan_options && - scan_options->fragment_scan_options->type_name() == kCsvTypeName) { - auto csv_scan_options = internal::checked_pointer_cast( - scan_options->fragment_scan_options); + auto csv_scan_options = + DowncastFragmentScanOptions(scan_options, kCsvTypeName); + if (csv_scan_options) { convert_options = csv_scan_options->convert_options; } @@ -99,12 +98,18 @@ static inline Result GetConvertOptions( return convert_options; } -static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) { - auto read_options = csv::ReadOptions::Defaults(); +static inline csv::ReadOptions GetReadOptions( + const CsvFileFormat& format, const std::shared_ptr& scan_options) { + auto read_options = format.read_options; // Multithreaded conversion of individual files would lead to excessive thread // contention when ScanTasks are also executed in multiple threads, so we disable it // here. read_options.use_threads = false; + auto csv_scan_options = + DowncastFragmentScanOptions(scan_options, kCsvTypeName); + if (csv_scan_options) { + read_options.block_size = csv_scan_options->block_size; + } return read_options; } @@ -112,7 +117,7 @@ static inline Result> OpenReader( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options = nullptr, MemoryPool* pool = default_memory_pool()) { - auto reader_options = GetReadOptions(format); + auto reader_options = GetReadOptions(format, scan_options); util::string_view first_block; ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 4c66e291a47..24d825c4b65 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -38,6 +38,11 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { public: /// Options affecting the parsing of CSV files csv::ParseOptions parse_options = csv::ParseOptions::Defaults(); + /// Options affecting how CSV files are read. + /// + /// Note use_threads is ignored (it is always considered false) and block_size + /// should be set on CsvFragmentScanOptions. + csv::ReadOptions read_options = csv::ReadOptions::Defaults(); std::string type_name() const override { return kCsvTypeName; } @@ -67,6 +72,9 @@ class ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { std::string type_name() const override { return kCsvTypeName; } csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); + + /// Block size for reading (arrow::csv::ReadOptions::block_size) + int32_t block_size = 1 << 20; // 1 MB }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index c3c8796e17e..1e0742a5a92 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -134,6 +134,31 @@ bar)"); ASSERT_EQ(null_count, 1); } +TEST_P(TestCsvFileFormat, CustomReadOptions) { + auto source = GetFileSource(R"(header_skipped +str +foo +MYNULL +N/A +bar)"); + SetSchema({field("str", utf8())}); + format_->read_options.skip_rows = 1; + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + auto fragment_scan_options = std::make_shared(); + fragment_scan_options->block_size = 1 << 22; + opts_->fragment_scan_options = fragment_scan_options; + + ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema()); + AssertSchemaEqual(opts_->dataset_schema, physical_schema); + + int64_t rows = 0; + for (auto maybe_batch : Batches(fragment.get())) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + rows += batch->GetColumnByName("str")->length(); + } + ASSERT_EQ(rows, 4); +} + TEST_P(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) { auto source = GetFileSource(R"(f64 1.0 diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 1aca9fa4882..dee96ceb836 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -149,6 +149,12 @@ Status ScannerBuilder::Pool(MemoryPool* pool) { return Status::OK(); } +Status ScannerBuilder::FragmentScanOptions( + std::shared_ptr fragment_scan_options) { + scan_options_->fragment_scan_options = std::move(fragment_scan_options); + return Status::OK(); +} + Result> ScannerBuilder::Finish() { if (!scan_options_->projection.IsBound()) { RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names())); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 6e06af06066..df5f7954afe 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -240,6 +240,9 @@ class ARROW_DS_EXPORT ScannerBuilder { /// \brief Set the pool from which materialized and scanned arrays will be allocated. Status Pool(MemoryPool* pool); + /// \brief Set fragment-specific scan options. + Status FragmentScanOptions(std::shared_ptr fragment_scan_options); + /// \brief Return the constructed now-immutable Scanner object Result> Finish(); From ac53ac9fe9005f02e65997a700807740d80334cd Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 16 Mar 2021 11:20:27 -0400 Subject: [PATCH 2/7] ARROW-8631: [Python][Dataset] Expose CsvFragmentScanOptions --- python/pyarrow/_csv.pxd | 17 +++++ python/pyarrow/_csv.pyx | 20 +++-- python/pyarrow/_dataset.pyx | 78 ++++++++++++++++++-- python/pyarrow/dataset.py | 1 + python/pyarrow/includes/libarrow_dataset.pxd | 11 +++ python/pyarrow/tests/test_dataset.py | 27 +++++++ 6 files changed, 140 insertions(+), 14 deletions(-) diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd index 2d9d24aea57..f8e12f16bc8 100644 --- a/python/pyarrow/_csv.pxd +++ b/python/pyarrow/_csv.pxd @@ -21,9 +21,26 @@ from pyarrow.includes.libarrow cimport * from pyarrow.lib cimport _Weakrefable +cdef class ConvertOptions(_Weakrefable): + cdef: + CCSVConvertOptions options + + @staticmethod + cdef ConvertOptions wrap(CCSVConvertOptions options) + + cdef class ParseOptions(_Weakrefable): cdef: CCSVParseOptions options @staticmethod cdef ParseOptions wrap(CCSVParseOptions options) + + +cdef class ReadOptions(_Weakrefable): + cdef: + CCSVReadOptions options + public object encoding + + @staticmethod + cdef ReadOptions wrap(CCSVReadOptions options) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index cce44d1d8c8..6e9b7fb1b36 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -73,10 +73,6 @@ cdef class ReadOptions(_Weakrefable): The character encoding of the CSV data. Columns that cannot decode using this encoding can still be read as Binary. """ - cdef: - CCSVReadOptions options - public object encoding - # Avoid mistakingly creating attributes __slots__ = () @@ -161,6 +157,13 @@ cdef class ReadOptions(_Weakrefable): def autogenerate_column_names(self, value): self.options.autogenerate_column_names = value + @staticmethod + cdef ReadOptions wrap(CCSVReadOptions options): + out = ReadOptions() + out.options = options + out.encoding = 'utf8' + return out + cdef class ParseOptions(_Weakrefable): """ @@ -391,9 +394,6 @@ cdef class ConvertOptions(_Weakrefable): `column_types`, or null by default). This option is ignored if `include_columns` is empty. """ - cdef: - CCSVConvertOptions options - # Avoid mistakingly creating attributes __slots__ = () @@ -603,6 +603,12 @@ cdef class ConvertOptions(_Weakrefable): self.options.timestamp_parsers = move(c_parsers) + @staticmethod + cdef ConvertOptions wrap(CCSVConvertOptions options): + out = ConvertOptions() + out.options = options + return out + cdef _get_reader(input_file, ReadOptions read_options, shared_ptr[CInputStream]* out): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 4f559f21e4c..a647046043a 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -29,7 +29,7 @@ from pyarrow.lib cimport * from pyarrow.lib import frombytes, tobytes from pyarrow.includes.libarrow_dataset cimport * from pyarrow._fs cimport FileSystem, FileInfo, FileSelector -from pyarrow._csv cimport ParseOptions +from pyarrow._csv cimport ConvertOptions, ParseOptions, ReadOptions from pyarrow.util import _is_path_like, _stringify_path from pyarrow._parquet cimport ( @@ -377,6 +377,9 @@ cdef class Dataset(_Weakrefable): memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. + fragment_scan_options : FragmentScanOptions, default None + Options specific to a particular scan and fragment type, which + can change between different scans of the same dataset. Returns ------- @@ -816,6 +819,9 @@ cdef class Fragment(_Weakrefable): memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. + fragment_scan_options : FragmentScanOptions, default None + Options specific to a particular scan and fragment type, which + can change between different scans of the same dataset. Returns ------- @@ -966,6 +972,19 @@ class RowGroupInfo: return self.id == other.id +cdef class FragmentScanOptions(_Weakrefable): + """Scan options specific to a particular fragment and scan operation.""" + + cdef: + shared_ptr[CFragmentScanOptions] wrapped + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): + self.wrapped = sp + + cdef class ParquetFileFragment(FileFragment): """A Fragment representing a parquet file.""" @@ -1363,10 +1382,13 @@ cdef class CsvFileFormat(FileFormat): cdef: CCsvFileFormat* csv_format - def __init__(self, ParseOptions parse_options=None): + def __init__(self, ParseOptions parse_options=None, + ReadOptions read_options=None): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options + if read_options is not None: + self.read_options = read_options cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1383,6 +1405,14 @@ cdef class CsvFileFormat(FileFormat): def parse_options(self, ParseOptions parse_options not None): self.csv_format.parse_options = parse_options.options + @property + def read_options(self): + return ReadOptions.wrap(self.csv_format.read_options) + + @read_options.setter + def read_options(self, ReadOptions read_options not None): + self.csv_format.read_options = read_options.options + def equals(self, CsvFileFormat other): return self.parse_options.equals(other.parse_options) @@ -1390,6 +1420,31 @@ cdef class CsvFileFormat(FileFormat): return CsvFileFormat, (self.parse_options,) +cdef class CsvFragmentScanOptions(FragmentScanOptions): + """Scan-specific options for CSV fragments.""" + + cdef: + CCsvFragmentScanOptions* csv_options + + def __init__(self, ConvertOptions convert_options=None): + self.init(shared_ptr[CFragmentScanOptions]( + new CCsvFragmentScanOptions())) + if convert_options is not None: + self.convert_options = convert_options + + cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): + FragmentScanOptions.init(self, sp) + self.csv_options = sp.get() + + @property + def convert_options(self): + return ConvertOptions.wrap(self.csv_options.convert_options) + + @convert_options.setter + def convert_options(self, ConvertOptions convert_options not None): + self.csv_options.convert_options = convert_options.options + + cdef class Partitioning(_Weakrefable): cdef: @@ -2192,7 +2247,9 @@ cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr, list columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, bint use_threads=True, - MemoryPool memory_pool=None) except *: + MemoryPool memory_pool=None, + FragmentScanOptions fragment_scan_options=None)\ + except *: cdef: CScannerBuilder *builder builder = ptr.get() @@ -2207,6 +2264,9 @@ cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr, check_status(builder.UseThreads(use_threads)) if memory_pool: check_status(builder.Pool(maybe_unbox_memory_pool(memory_pool))) + if fragment_scan_options: + check_status( + builder.FragmentScanOptions(fragment_scan_options.wrapped)) cdef class Scanner(_Weakrefable): @@ -2269,7 +2329,8 @@ cdef class Scanner(_Weakrefable): def from_dataset(Dataset dataset not None, bint use_threads=True, MemoryPool memory_pool=None, list columns=None, Expression filter=None, - int batch_size=_DEFAULT_BATCH_SIZE): + int batch_size=_DEFAULT_BATCH_SIZE, + FragmentScanOptions fragment_scan_options=None): cdef: shared_ptr[CScanOptions] options = make_shared[CScanOptions]() shared_ptr[CScannerBuilder] builder @@ -2278,7 +2339,8 @@ cdef class Scanner(_Weakrefable): builder = make_shared[CScannerBuilder](dataset.unwrap(), options) _populate_builder(builder, columns=columns, filter=filter, batch_size=batch_size, use_threads=use_threads, - memory_pool=memory_pool) + memory_pool=memory_pool, + fragment_scan_options=fragment_scan_options) scanner = GetResultValue(builder.get().Finish()) return Scanner.wrap(scanner) @@ -2287,7 +2349,8 @@ cdef class Scanner(_Weakrefable): def from_fragment(Fragment fragment not None, Schema schema=None, bint use_threads=True, MemoryPool memory_pool=None, list columns=None, Expression filter=None, - int batch_size=_DEFAULT_BATCH_SIZE): + int batch_size=_DEFAULT_BATCH_SIZE, + FragmentScanOptions fragment_scan_options=None): cdef: shared_ptr[CScanOptions] options = make_shared[CScanOptions]() shared_ptr[CScannerBuilder] builder @@ -2299,7 +2362,8 @@ cdef class Scanner(_Weakrefable): fragment.unwrap(), options) _populate_builder(builder, columns=columns, filter=filter, batch_size=batch_size, use_threads=use_threads, - memory_pool=memory_pool) + memory_pool=memory_pool, + fragment_scan_options=fragment_scan_options) scanner = GetResultValue(builder.get().Finish()) return Scanner.wrap(scanner) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index a2cb87a1f7a..195d414b047 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -22,6 +22,7 @@ from pyarrow._dataset import ( # noqa CsvFileFormat, + CsvFragmentScanOptions, Expression, Dataset, DatasetFactory, diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index f37f49f463d..f004d6824d7 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -59,6 +59,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) + cdef cppclass CFragmentScanOptions "arrow::dataset::FragmentScanOptions": + pass + ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \ "arrow::dataset::ScanTaskIterator" @@ -101,6 +104,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CStatus UseThreads(c_bool use_threads) CStatus Pool(CMemoryPool* pool) CStatus BatchSize(int64_t batch_size) + CStatus FragmentScanOptions( + shared_ptr[CFragmentScanOptions] fragment_scan_options) CResult[shared_ptr[CScanner]] Finish() shared_ptr[CSchema] schema() const @@ -251,6 +256,12 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CCsvFileFormat "arrow::dataset::CsvFileFormat"( CFileFormat): CCSVParseOptions parse_options + CCSVReadOptions read_options + + cdef cppclass CCsvFragmentScanOptions \ + "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions): + CCSVConvertOptions convert_options + int32_t block_size cdef cppclass CPartitioning "arrow::dataset::Partitioning": c_string type_name() const diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index e34700838df..64e4cd8f6d9 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -2242,6 +2242,33 @@ def test_csv_format_compressed(tempdir, compression): assert result.equals(table) +def test_csv_format_options(tempdir): + path = str(tempdir / 'test.csv') + with open(path, 'w') as sink: + sink.write('skipped\ncol0\nfoo\nbar\n') + dataset = ds.dataset(path, format='csv') + result = dataset.to_table() + assert result.equals( + pa.table({'skipped': pa.array(['col0', 'foo', 'bar'])})) + + dataset = ds.dataset(path, format=ds.CsvFileFormat( + read_options=pyarrow.csv.ReadOptions(skip_rows=1))) + result = dataset.to_table() + assert result.equals(pa.table({'col0': pa.array(['foo', 'bar'])})) + + +def test_csv_fragment_options(tempdir): + path = str(tempdir / 'test.csv') + with open(path, 'w') as sink: + sink.write('col0\nfoo\nspam\nMYNULL\n') + dataset = ds.dataset(path, format='csv') + convert_options = pyarrow.csv.ConvertOptions(null_values=['MYNULL'], + strings_can_be_null=True) + options = ds.CsvFragmentScanOptions(convert_options=convert_options) + result = dataset.to_table(fragment_scan_options=options) + assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) + + def test_feather_format(tempdir): from pyarrow.feather import write_feather From 8df714fd6b6c51711113f12e7e1752a8339292c2 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 17 Mar 2021 13:40:44 -0400 Subject: [PATCH 3/7] ARROW-8631: [C++][Dataset] Don't expose csv::ReadOptions wholesale --- cpp/src/arrow/dataset/dataset_internal.h | 8 +- cpp/src/arrow/dataset/file_csv.cc | 19 +++-- cpp/src/arrow/dataset/file_csv.h | 14 ++-- cpp/src/arrow/dataset/file_csv_test.cc | 2 +- python/pyarrow/_csv.pxd | 9 --- python/pyarrow/_csv.pyx | 41 ++++++++-- python/pyarrow/_dataset.pyx | 80 +++++++++++++++++--- python/pyarrow/includes/libarrow_dataset.pxd | 4 +- python/pyarrow/tests/test_dataset.py | 23 +++++- 9 files changed, 152 insertions(+), 48 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 831f84de434..37c384194a9 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -186,11 +186,15 @@ inline bool operator==(const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded } template -std::shared_ptr DowncastFragmentScanOptions( +arrow::Result> DowncastFragmentScanOptions( const std::shared_ptr& scan_options, const std::string& type_name) { if (!scan_options) return nullptr; if (!scan_options->fragment_scan_options) return nullptr; - if (scan_options->fragment_scan_options->type_name() != type_name) return nullptr; + const auto actual_type_name = scan_options->fragment_scan_options->type_name(); + if (actual_type_name != type_name) { + return Status::Invalid("FragmentScanOptions of type ", actual_type_name, + " were provided for scanning a fragment of type ", type_name); + } return internal::checked_pointer_cast(scan_options->fragment_scan_options); } diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index b5e601e55a3..a3bc2483066 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -83,8 +83,9 @@ static inline Result GetConvertOptions( GetColumnNames(format.parse_options, first_block, pool)); auto convert_options = csv::ConvertOptions::Defaults(); - auto csv_scan_options = - DowncastFragmentScanOptions(scan_options, kCsvTypeName); + ARROW_ASSIGN_OR_RAISE( + auto csv_scan_options, + DowncastFragmentScanOptions(scan_options, kCsvTypeName)); if (csv_scan_options) { convert_options = csv_scan_options->convert_options; } @@ -98,15 +99,19 @@ static inline Result GetConvertOptions( return convert_options; } -static inline csv::ReadOptions GetReadOptions( +static inline Result GetReadOptions( const CsvFileFormat& format, const std::shared_ptr& scan_options) { - auto read_options = format.read_options; + auto read_options = csv::ReadOptions::Defaults(); // Multithreaded conversion of individual files would lead to excessive thread // contention when ScanTasks are also executed in multiple threads, so we disable it // here. read_options.use_threads = false; - auto csv_scan_options = - DowncastFragmentScanOptions(scan_options, kCsvTypeName); + read_options.skip_rows = format.skip_rows; + read_options.column_names = format.column_names; + read_options.autogenerate_column_names = format.autogenerate_column_names; + ARROW_ASSIGN_OR_RAISE( + auto csv_scan_options, + DowncastFragmentScanOptions(scan_options, kCsvTypeName)); if (csv_scan_options) { read_options.block_size = csv_scan_options->block_size; } @@ -117,7 +122,7 @@ static inline Result> OpenReader( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options = nullptr, MemoryPool* pool = default_memory_pool()) { - auto reader_options = GetReadOptions(format, scan_options); + ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); util::string_view first_block; ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 24d825c4b65..4b3db66986c 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -38,11 +38,13 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { public: /// Options affecting the parsing of CSV files csv::ParseOptions parse_options = csv::ParseOptions::Defaults(); - /// Options affecting how CSV files are read. - /// - /// Note use_threads is ignored (it is always considered false) and block_size - /// should be set on CsvFragmentScanOptions. - csv::ReadOptions read_options = csv::ReadOptions::Defaults(); + /// Number of header rows to skip (see arrow::csv::ReadOptions::skip_rows) + int32_t skip_rows = 0; + /// Column names for the target table (see arrow::csv::ReadOptions::column_names) + std::vector column_names; + /// Whether to generate column names or assume a header row (see + /// arrow::csv::ReadOptions::autogenerate_column_names) + bool autogenerate_column_names = false; std::string type_name() const override { return kCsvTypeName; } @@ -73,7 +75,7 @@ class ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); - /// Block size for reading (arrow::csv::ReadOptions::block_size) + /// Block size for reading (see arrow::csv::ReadOptions::block_size) int32_t block_size = 1 << 20; // 1 MB }; diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 1e0742a5a92..f38f3341fe0 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -142,7 +142,7 @@ MYNULL N/A bar)"); SetSchema({field("str", utf8())}); - format_->read_options.skip_rows = 1; + format_->skip_rows = 1; ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); auto fragment_scan_options = std::make_shared(); fragment_scan_options->block_size = 1 << 22; diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd index f8e12f16bc8..f748d3e6e31 100644 --- a/python/pyarrow/_csv.pxd +++ b/python/pyarrow/_csv.pxd @@ -35,12 +35,3 @@ cdef class ParseOptions(_Weakrefable): @staticmethod cdef ParseOptions wrap(CCSVParseOptions options) - - -cdef class ReadOptions(_Weakrefable): - cdef: - CCSVReadOptions options - public object encoding - - @staticmethod - cdef ReadOptions wrap(CCSVReadOptions options) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 6e9b7fb1b36..c5028d3f2c0 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -73,6 +73,10 @@ cdef class ReadOptions(_Weakrefable): The character encoding of the CSV data. Columns that cannot decode using this encoding can still be read as Binary. """ + cdef: + CCSVReadOptions options + public object encoding + # Avoid mistakingly creating attributes __slots__ = () @@ -157,13 +161,6 @@ cdef class ReadOptions(_Weakrefable): def autogenerate_column_names(self, value): self.options.autogenerate_column_names = value - @staticmethod - cdef ReadOptions wrap(CCSVReadOptions options): - out = ReadOptions() - out.options = options - out.encoding = 'utf8' - return out - cdef class ParseOptions(_Weakrefable): """ @@ -609,6 +606,36 @@ cdef class ConvertOptions(_Weakrefable): out.options = options return out + def equals(self, ConvertOptions other): + return ( + self.check_utf8 == other.check_utf8 and + self.column_types == other.column_types and + self.null_values == other.null_values and + self.true_values == other.true_values and + self.false_values == other.false_values and + self.timestamp_parsers == other.timestamp_parsers and + self.strings_can_be_null == other.strings_can_be_null and + self.auto_dict_encode == other.auto_dict_encode and + self.auto_dict_max_cardinality == + other.auto_dict_max_cardinality and + self.include_columns == other.include_columns and + self.include_missing_columns == other.include_missing_columns + ) + + def __getstate__(self): + return (self.check_utf8, self.column_types, self.null_values, + self.true_values, self.false_values, self.timestamp_parsers, + self.strings_can_be_null, self.auto_dict_encode, + self.auto_dict_max_cardinality, self.include_columns, + self.include_missing_columns) + + def __setstate__(self, state): + (self.check_utf8, self.column_types, self.null_values, + self.true_values, self.false_values, self.timestamp_parsers, + self.strings_can_be_null, self.auto_dict_encode, + self.auto_dict_max_cardinality, self.include_columns, + self.include_missing_columns) = state + cdef _get_reader(input_file, ReadOptions read_options, shared_ptr[CInputStream]* out): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index a647046043a..3e6c0ba3e52 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -29,7 +29,7 @@ from pyarrow.lib cimport * from pyarrow.lib import frombytes, tobytes from pyarrow.includes.libarrow_dataset cimport * from pyarrow._fs cimport FileSystem, FileInfo, FileSelector -from pyarrow._csv cimport ConvertOptions, ParseOptions, ReadOptions +from pyarrow._csv cimport ConvertOptions, ParseOptions from pyarrow.util import _is_path_like, _stringify_path from pyarrow._parquet cimport ( @@ -984,6 +984,12 @@ cdef class FragmentScanOptions(_Weakrefable): cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): self.wrapped = sp + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + cdef class ParquetFileFragment(FileFragment): """A Fragment representing a parquet file.""" @@ -1382,13 +1388,21 @@ cdef class CsvFileFormat(FileFormat): cdef: CCsvFileFormat* csv_format + # Avoid mistakingly creating attributes + __slots__ = () + def __init__(self, ParseOptions parse_options=None, - ReadOptions read_options=None): + skip_rows=None, column_names=None, + autogenerate_column_names=None): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options - if read_options is not None: - self.read_options = read_options + if skip_rows is not None: + self.skip_rows = skip_rows + if column_names is not None: + self.column_names = column_names + if autogenerate_column_names is not None: + self.autogenerate_column_names = autogenerate_column_names cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1406,18 +1420,40 @@ cdef class CsvFileFormat(FileFormat): self.csv_format.parse_options = parse_options.options @property - def read_options(self): - return ReadOptions.wrap(self.csv_format.read_options) + def skip_rows(self): + return self.csv_format.skip_rows + + @skip_rows.setter + def skip_rows(self, int skip_rows): + self.csv_format.skip_rows = skip_rows + + @property + def column_names(self): + return [frombytes(b) for b in self.csv_format.column_names] - @read_options.setter - def read_options(self, ReadOptions read_options not None): - self.csv_format.read_options = read_options.options + @column_names.setter + def column_names(self, list column_names): + self.csv_format.column_names = [tobytes(s) for s in column_names] + + @property + def autogenerate_column_names(self): + return self.csv_format.autogenerate_column_names + + @autogenerate_column_names.setter + def autogenerate_column_names(self, bint skip_rows): + self.csv_format.autogenerate_column_names = skip_rows def equals(self, CsvFileFormat other): - return self.parse_options.equals(other.parse_options) + return (self.parse_options.equals(other.parse_options) and + self.skip_rows == other.skip_rows and + self.column_names == other.column_names and + self.autogenerate_column_names == + other.autogenerate_column_names) def __reduce__(self): - return CsvFileFormat, (self.parse_options,) + return CsvFileFormat, (self.parse_options, self.skip_rows, + self.column_names, + self.autogenerate_column_names) cdef class CsvFragmentScanOptions(FragmentScanOptions): @@ -1426,11 +1462,16 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): cdef: CCsvFragmentScanOptions* csv_options - def __init__(self, ConvertOptions convert_options=None): + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, ConvertOptions convert_options=None, block_size=None): self.init(shared_ptr[CFragmentScanOptions]( new CCsvFragmentScanOptions())) if convert_options is not None: self.convert_options = convert_options + if block_size is not None: + self.block_size = block_size cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -1444,6 +1485,21 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): def convert_options(self, ConvertOptions convert_options not None): self.csv_options.convert_options = convert_options.options + @property + def block_size(self): + return self.csv_options.block_size + + @block_size.setter + def block_size(self, int block_size): + self.csv_options.block_size = block_size + + def equals(self, CsvFragmentScanOptions other): + return (self.convert_options.equals(other.convert_options) and + self.block_size == other.block_size) + + def __reduce__(self): + return CsvFragmentScanOptions, (self.convert_options, self.block_size) + cdef class Partitioning(_Weakrefable): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index f004d6824d7..65b7dc4f622 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -256,7 +256,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CCsvFileFormat "arrow::dataset::CsvFileFormat"( CFileFormat): CCSVParseOptions parse_options - CCSVReadOptions read_options + int32_t skip_rows + vector[c_string] column_names + c_bool autogenerate_column_names cdef cppclass CCsvFragmentScanOptions \ "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 64e4cd8f6d9..8e2838ff106 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -526,6 +526,7 @@ def test_file_format_pickling(): ds.CsvFileFormat(), ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t', ignore_empty_lines=True)), + ds.CsvFileFormat(skip_rows=3, column_names=['foo']), ds.ParquetFileFormat(), ds.ParquetFileFormat( read_options=ds.ParquetReadOptions(use_buffered_stream=True) @@ -541,6 +542,17 @@ def test_file_format_pickling(): assert pickle.loads(pickle.dumps(file_format)) == file_format +def test_fragment_scan_options_pickling(): + options = [ + ds.CsvFragmentScanOptions(), + ds.CsvFragmentScanOptions( + pa.csv.ConvertOptions(strings_can_be_null=True)), + ds.CsvFragmentScanOptions(block_size=2**16), + ] + for option in options: + assert pickle.loads(pickle.dumps(option)) == option + + @pytest.mark.parametrize('paths_or_selector', [ fs.FileSelector('subdir', recursive=True), [ @@ -2251,11 +2263,15 @@ def test_csv_format_options(tempdir): assert result.equals( pa.table({'skipped': pa.array(['col0', 'foo', 'bar'])})) - dataset = ds.dataset(path, format=ds.CsvFileFormat( - read_options=pyarrow.csv.ReadOptions(skip_rows=1))) + dataset = ds.dataset(path, format=ds.CsvFileFormat(skip_rows=1)) result = dataset.to_table() assert result.equals(pa.table({'col0': pa.array(['foo', 'bar'])})) + dataset = ds.dataset(path, format=ds.CsvFileFormat(column_names=['foo'])) + result = dataset.to_table() + assert result.equals( + pa.table({'foo': pa.array(['skipped', 'col0', 'foo', 'bar'])})) + def test_csv_fragment_options(tempdir): path = str(tempdir / 'test.csv') @@ -2264,7 +2280,8 @@ def test_csv_fragment_options(tempdir): dataset = ds.dataset(path, format='csv') convert_options = pyarrow.csv.ConvertOptions(null_values=['MYNULL'], strings_can_be_null=True) - options = ds.CsvFragmentScanOptions(convert_options=convert_options) + options = ds.CsvFragmentScanOptions( + convert_options=convert_options, block_size=2**16) result = dataset.to_table(fragment_scan_options=options) assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) From 4a895a6eea95893495904e5e616a5c67027709a5 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 18 Mar 2021 09:09:44 -0400 Subject: [PATCH 4/7] ARROW-8631: [R][Dataset] Expose csv format/scan options --- cpp/src/arrow/dataset/type_fwd.h | 3 ++ r/R/arrowExports.R | 16 +++++++- r/R/dataset-format.R | 66 ++++++++++++++++++++++++++++++-- r/R/dataset-scan.R | 10 +++++ r/src/arrowExports.cpp | 64 ++++++++++++++++++++++++++++--- r/src/dataset.cpp | 31 ++++++++++++++- r/tests/testthat/test-dataset.R | 53 +++++++++++++++++++++++++ 7 files changed, 231 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 62395ad1a6e..1ad19bdcab6 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -46,6 +46,8 @@ class Fragment; using FragmentIterator = Iterator>; using FragmentVector = std::vector>; +class FragmentScanOptions; + class FileSource; class FileFormat; class FileFragment; @@ -58,6 +60,7 @@ struct FileSystemDatasetWriteOptions; class InMemoryDataset; class CsvFileFormat; +class CsvFragmentScanOptions; class IpcFileFormat; class IpcFileWriter; diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 0d0d3d30f8d..71895a1eb6d 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -428,8 +428,16 @@ dataset___IpcFileFormat__Make <- function(){ .Call(`_arrow_dataset___IpcFileFormat__Make`) } -dataset___CsvFileFormat__Make <- function(parse_options){ - .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options) +dataset___CsvFileFormat__Make <- function(parse_options, skip_rows, column_names, autogenerate_column_names){ + .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options, skip_rows, column_names, autogenerate_column_names) +} + +dataset___FragmentScanOptions__type_name <- function(fragment_scan_options){ + .Call(`_arrow_dataset___FragmentScanOptions__type_name`, fragment_scan_options) +} + +dataset___CsvFragmentScanOptions__Make <- function(convert_options, block_size){ + .Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options, block_size) } dataset___DirectoryPartitioning <- function(schm){ @@ -468,6 +476,10 @@ dataset___ScannerBuilder__BatchSize <- function(sb, batch_size){ invisible(.Call(`_arrow_dataset___ScannerBuilder__BatchSize`, sb, batch_size)) } +dataset___ScannerBuilder__FragmentScanOptions <- function(sb, options){ + invisible(.Call(`_arrow_dataset___ScannerBuilder__FragmentScanOptions`, sb, options)) +} + dataset___ScannerBuilder__schema <- function(sb){ .Call(`_arrow_dataset___ScannerBuilder__schema`, sb) } diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index f1bf601c720..09d033696c4 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -40,11 +40,13 @@ #' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB. #' * `dict_columns`: Names of columns which should be read as dictionaries. #' -#' `format = "text"`: see [CsvReadOptions]. Note that you can specify them either +#' `format = "text"`: see [CsvParseOptions]. Note that you can specify them either #' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the #' `readr`-style naming used in [read_csv_arrow()] ("delim", "quote", etc.). #' Not all `readr` options are currently supported; please file an issue if -#' you encounter one that `arrow` should support. +#' you encounter one that `arrow` should support. Also, the following options of +#' [CsvReadOptions] can be passed (using the Arrow naming only): skip_rows, +#' column_names, and autogenerate_column_names. #' #' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`) #' @rdname FileFormat @@ -101,8 +103,11 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat) #' @rdname FileFormat #' @export CsvFileFormat <- R6Class("CsvFileFormat", inherit = FileFormat) -CsvFileFormat$create <- function(..., opts = csv_file_format_parse_options(...)) { - dataset___CsvFileFormat__Make(opts) +CsvFileFormat$create <- function(..., opts = csv_file_format_parse_options(...), + skip_rows = 0, + column_names = character(0), + autogenerate_column_names = FALSE) { + dataset___CsvFileFormat__Make(opts, skip_rows, column_names, autogenerate_column_names) } # Support both readr-style option names and Arrow C++ option names @@ -169,6 +174,59 @@ csv_file_format_parse_options <- function(...) { } } +#' Format-specific scan options +#' +#' @description +#' A `FragmentScanOptions` holds options specific to a `FileFormat` and a scan +#' operation. +#' +#' @section Factory: +#' `FragmentScanOptions$create()` takes the following arguments: +#' * `format`: A string identifier of the file format. Currently supported values: +#' * "csv"/"text", aliases for the same format. +#' * `...`: Additional format-specific options +#' +#' `format = "text"`: see [CsvConvertOptions]. Note that options can only be +#' specified with the Arrow C++ library naming. Also, "block_size" from +#' [CsvReadOptions] may be given. +#' +#' It returns the appropriate subclass of `FragmentScanOptions` +#' (e.g. `CsvFragmentScanOptions`). +#' @rdname FragmentScanOptions +#' @name FragmentScanOptions +#' @export +FragmentScanOptions <- R6Class("FragmentScanOptions", inherit = ArrowObject, + active = list( + # @description + # Return the `FragmentScanOptions`'s type + type = function() dataset___FragmentScanOptions__type_name(self) + ) +) +FragmentScanOptions$create <- function(format, ...) { + opt_names <- names(list(...)) + if (format %in% c("csv", "text", "tsv")) { + CsvFragmentScanOptions$create(...) + } else { + stop("Unsupported file format: ", format, call. = FALSE) + } +} + +#' @export +as.character.FragmentScanOptions <- function(x, ...) { + x$type +} + +#' @usage NULL +#' @format NULL +#' @rdname FragmentScanOptions +#' @export +CsvFragmentScanOptions <- R6Class("CsvFragmentScanOptions", inherit = FragmentScanOptions) +CsvFragmentScanOptions$create <- function(..., + opts = CsvConvertOptions$create(...), + block_size = 2**20) { + dataset___CsvFragmentScanOptions__Make(opts, block_size) +} + #' Format-specific write options #' #' @description diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 1c71bf481b5..7e148863226 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -67,6 +67,7 @@ Scanner$create <- function(dataset, filter = TRUE, use_threads = option_use_threads(), batch_size = NULL, + fragment_scan_options = NULL, ...) { if (inherits(dataset, "arrow_dplyr_query")) { if (inherits(dataset$.data, "ArrowTabular")) { @@ -78,6 +79,8 @@ Scanner$create <- function(dataset, dataset$selected_columns, dataset$filtered_rows, use_threads, + batch_size, + fragment_scan_options, ... )) } @@ -99,6 +102,9 @@ Scanner$create <- function(dataset, if (is_integerish(batch_size)) { scanner_builder$BatchSize(batch_size) } + if (!is.null(fragment_scan_options)) { + scanner_builder$FragmentScanOptions(fragment_scan_options) + } scanner_builder$Finish() } @@ -185,6 +191,10 @@ ScannerBuilder <- R6Class("ScannerBuilder", inherit = ArrowObject, dataset___ScannerBuilder__BatchSize(self, batch_size) self }, + FragmentScanOptions = function(options) { + dataset___ScannerBuilder__FragmentScanOptions(self, options) + self + }, Finish = function() dataset___ScannerBuilder__Finish(self) ), active = list( diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 7229e60b649..af3fd79336e 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1105,19 +1105,53 @@ extern "C" SEXP _arrow_dataset___IpcFileFormat__Make(){ // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___CsvFileFormat__Make(const std::shared_ptr& parse_options); -extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp){ +std::shared_ptr dataset___CsvFileFormat__Make(const std::shared_ptr& parse_options, int32_t skip_rows, cpp11::strings column_names, bool autogenerate_column_names); +extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP skip_rows_sexp, SEXP column_names_sexp, SEXP autogenerate_column_names_sexp){ BEGIN_CPP11 arrow::r::Input&>::type parse_options(parse_options_sexp); - return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options)); + arrow::r::Input::type skip_rows(skip_rows_sexp); + arrow::r::Input::type column_names(column_names_sexp); + arrow::r::Input::type autogenerate_column_names(autogenerate_column_names_sexp); + return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options, skip_rows, column_names, autogenerate_column_names)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp){ +extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP skip_rows_sexp, SEXP column_names_sexp, SEXP autogenerate_column_names_sexp){ Rf_error("Cannot call dataset___CsvFileFormat__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_DATASET) +std::string dataset___FragmentScanOptions__type_name(const std::shared_ptr& fragment_scan_options); +extern "C" SEXP _arrow_dataset___FragmentScanOptions__type_name(SEXP fragment_scan_options_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type fragment_scan_options(fragment_scan_options_sexp); + return cpp11::as_sexp(dataset___FragmentScanOptions__type_name(fragment_scan_options)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___FragmentScanOptions__type_name(SEXP fragment_scan_options_sexp){ + Rf_error("Cannot call dataset___FragmentScanOptions__type_name(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_DATASET) +std::shared_ptr dataset___CsvFragmentScanOptions__Make(const std::shared_ptr& convert_options, int32_t block_size); +extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_options_sexp, SEXP block_size_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type convert_options(convert_options_sexp); + arrow::r::Input::type block_size(block_size_sexp); + return cpp11::as_sexp(dataset___CsvFragmentScanOptions__Make(convert_options, block_size)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_options_sexp, SEXP block_size_sexp){ + Rf_error("Cannot call dataset___CsvFragmentScanOptions__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___DirectoryPartitioning(const std::shared_ptr& schm); @@ -1265,6 +1299,23 @@ extern "C" SEXP _arrow_dataset___ScannerBuilder__BatchSize(SEXP sb_sexp, SEXP ba } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_DATASET) +void dataset___ScannerBuilder__FragmentScanOptions(const std::shared_ptr& sb, const std::shared_ptr& options); +extern "C" SEXP _arrow_dataset___ScannerBuilder__FragmentScanOptions(SEXP sb_sexp, SEXP options_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type sb(sb_sexp); + arrow::r::Input&>::type options(options_sexp); + dataset___ScannerBuilder__FragmentScanOptions(sb, options); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___ScannerBuilder__FragmentScanOptions(SEXP sb_sexp, SEXP options_sexp){ + Rf_error("Cannot call dataset___ScannerBuilder__FragmentScanOptions(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___ScannerBuilder__schema(const std::shared_ptr& sb); @@ -4222,7 +4273,9 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, { "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update1, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, - { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, + { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 4}, + { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1}, + { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, { "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1}, { "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 2}, @@ -4232,6 +4285,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ScannerBuilder__Filter", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Filter, 2}, { "_arrow_dataset___ScannerBuilder__UseThreads", (DL_FUNC) &_arrow_dataset___ScannerBuilder__UseThreads, 2}, { "_arrow_dataset___ScannerBuilder__BatchSize", (DL_FUNC) &_arrow_dataset___ScannerBuilder__BatchSize, 2}, + { "_arrow_dataset___ScannerBuilder__FragmentScanOptions", (DL_FUNC) &_arrow_dataset___ScannerBuilder__FragmentScanOptions, 2}, { "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC) &_arrow_dataset___ScannerBuilder__schema, 1}, { "_arrow_dataset___ScannerBuilder__Finish", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Finish, 1}, { "_arrow_dataset___Scanner__ToTable", (DL_FUNC) &_arrow_dataset___Scanner__ToTable, 1}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 83c7cbb844c..af57c191e2e 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -272,12 +272,34 @@ std::shared_ptr dataset___IpcFileFormat__Make() { // [[dataset::export]] std::shared_ptr dataset___CsvFileFormat__Make( - const std::shared_ptr& parse_options) { + const std::shared_ptr& parse_options, int32_t skip_rows, + cpp11::strings column_names, bool autogenerate_column_names) { auto format = std::make_shared(); format->parse_options = *parse_options; + format->skip_rows = skip_rows; + format->column_names = cpp11::as_cpp>(column_names); + format->autogenerate_column_names = autogenerate_column_names; return format; } +// FragmentScanOptions, CsvFragmentScanOptions + +// [[dataset::export]] +std::string dataset___FragmentScanOptions__type_name( + const std::shared_ptr& fragment_scan_options) { + return fragment_scan_options->type_name(); +} + +// [[dataset::export]] +std::shared_ptr dataset___CsvFragmentScanOptions__Make( + const std::shared_ptr& convert_options, + int32_t block_size) { + auto options = std::make_shared(); + options->convert_options = *convert_options; + options->block_size = block_size; + return options; +} + // DirectoryPartitioning, HivePartitioning // [[dataset::export]] @@ -346,6 +368,13 @@ void dataset___ScannerBuilder__BatchSize(const std::shared_ptrBatchSize(batch_size)); } +// [[dataset::export]] +void dataset___ScannerBuilder__FragmentScanOptions( + const std::shared_ptr& sb, + const std::shared_ptr& options) { + StopIfNotOk(sb->FragmentScanOptions(options)); +} + // [[dataset::export]] std::shared_ptr dataset___ScannerBuilder__schema( const std::shared_ptr& sb) { diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 67fd5004320..e392641bd1b 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -295,6 +295,29 @@ test_that("CSV dataset", { ) }) +test_that("CSV scan options", { + options <- FragmentScanOptions$create("text") + expect_equal(options$type, "csv") + options <- FragmentScanOptions$create("csv", + null_values = c("mynull"), + strings_can_be_null = TRUE) + expect_equal(options$type, "csv") + + dst_dir <- make_temp_dir() + dst_file <- file.path(dst_dir, "data.csv") + df <- tibble(chr = c("foo", "mynull")) + write.csv(df, dst_file, row.names = FALSE, quote = FALSE) + + ds <- open_dataset(dst_dir, format = "csv") + expect_equivalent(ds %>% collect(), df) + + sb <- ds$NewScan() + sb$FragmentScanOptions(options) + + tab <- sb$Finish()$ToTable() + expect_equivalent(as.data.frame(tab), tibble(chr = c("foo", NA))) +}) + test_that("compressed CSV dataset", { skip_if_not_available("gzip") dst_dir <- make_temp_dir() @@ -318,6 +341,36 @@ test_that("compressed CSV dataset", { ) }) +test_that("CSV dataset options", { + dst_dir <- make_temp_dir() + dst_file <- file.path(dst_dir, "data.csv") + df <- tibble(chr = letters[1:10]) + write.csv(df, dst_file, row.names = FALSE, quote = FALSE) + + format <- FileFormat$create("csv", skip_rows = 1) + ds <- open_dataset(dst_dir, format = format) + + expect_equivalent( + ds %>% + select(string = a) %>% + collect(), + df1[-1,] %>% + select(string = chr) + ) + + format <- FileFormat$create("csv", column_names = c("foo")) + ds <- open_dataset(dst_dir, format = format) + expect_is(ds$format, "CsvFileFormat") + expect_is(ds$filesystem, "LocalFileSystem") + + expect_equivalent( + ds %>% + select(string = foo) %>% + collect(), + tibble(foo = c(c('chr'), letters[1:10])) + ) +}) + test_that("Other text delimited dataset", { ds1 <- open_dataset(tsv_dir, partitioning = "part", format = "tsv") expect_equivalent( From c2d6428dd2afc739f03b69c9ce5b7e7733e7d870 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 19 Mar 2021 17:01:00 -0400 Subject: [PATCH 5/7] ARROW-8631: [C++][Python][R] let user specify default scan options on the file format --- cpp/src/arrow/dataset/dataset_internal.h | 2 +- cpp/src/arrow/dataset/file_csv.cc | 85 +++++++++++++------- cpp/src/arrow/dataset/file_csv.h | 45 ++++++----- cpp/src/arrow/dataset/file_csv_test.cc | 23 +++++- cpp/src/arrow/dataset/type_fwd.h | 2 +- python/pyarrow/_csv.pyx | 12 +++ python/pyarrow/_dataset.pyx | 59 +++++++++++--- python/pyarrow/includes/libarrow_dataset.pxd | 17 ++-- python/pyarrow/tests/test_dataset.py | 11 +++ r/R/arrowExports.R | 4 +- r/R/dataset-format.R | 38 +++++++-- r/src/arrowExports.cpp | 12 +-- r/src/dataset.cpp | 15 ++-- r/tests/testthat/test-dataset.R | 16 ++++ 14 files changed, 255 insertions(+), 86 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 37c384194a9..19648fab2de 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -187,7 +187,7 @@ inline bool operator==(const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded template arrow::Result> DowncastFragmentScanOptions( - const std::shared_ptr& scan_options, const std::string& type_name) { + ScanOptions* scan_options, const std::string& type_name) { if (!scan_options) return nullptr; if (!scan_options->fragment_scan_options) return nullptr; const auto actual_type_name = scan_options->fragment_scan_options->type_name(); diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index a3bc2483066..b6119e92418 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -79,13 +79,14 @@ Result> GetColumnNames( static inline Result GetConvertOptions( const CsvFileFormat& format, const std::shared_ptr& scan_options, const util::string_view first_block, MemoryPool* pool) { - ARROW_ASSIGN_OR_RAISE(auto column_names, - GetColumnNames(format.parse_options, first_block, pool)); - - auto convert_options = csv::ConvertOptions::Defaults(); ARROW_ASSIGN_OR_RAISE( - auto csv_scan_options, - DowncastFragmentScanOptions(scan_options, kCsvTypeName)); + auto column_names, + GetColumnNames(format.reader_options.parse_options, first_block, pool)); + + auto convert_options = format.reader_options.convert_options; + ARROW_ASSIGN_OR_RAISE(auto csv_scan_options, + DowncastFragmentScanOptions( + scan_options.get(), kCsvTypeName)); if (csv_scan_options) { convert_options = csv_scan_options->convert_options; } @@ -106,15 +107,15 @@ static inline Result GetReadOptions( // contention when ScanTasks are also executed in multiple threads, so we disable it // here. read_options.use_threads = false; - read_options.skip_rows = format.skip_rows; - read_options.column_names = format.column_names; - read_options.autogenerate_column_names = format.autogenerate_column_names; - ARROW_ASSIGN_OR_RAISE( - auto csv_scan_options, - DowncastFragmentScanOptions(scan_options, kCsvTypeName)); - if (csv_scan_options) { - read_options.block_size = csv_scan_options->block_size; - } + read_options.skip_rows = format.reader_options.skip_rows; + read_options.column_names = format.reader_options.column_names; + read_options.autogenerate_column_names = + format.reader_options.autogenerate_column_names; + ARROW_ASSIGN_OR_RAISE(auto csv_scan_options, + DowncastFragmentScanOptions( + scan_options.get(), kCsvTypeName)); + read_options.block_size = + csv_scan_options ? csv_scan_options->block_size : format.reader_options.block_size; return read_options; } @@ -131,7 +132,7 @@ static inline Result> OpenReader( default_memory_pool(), std::move(input))); ARROW_ASSIGN_OR_RAISE(first_block, input->Peek(reader_options.block_size)); - const auto& parse_options = format.parse_options; + const auto& parse_options = format.reader_options.parse_options; auto convert_options = csv::ConvertOptions::Defaults(); if (scan_options != nullptr) { ARROW_ASSIGN_OR_RAISE(convert_options, @@ -173,17 +174,47 @@ class CsvScanTask : public ScanTask { bool CsvFileFormat::Equals(const FileFormat& format) const { if (type_name() != format.type_name()) return false; - const auto& other_parse_options = - checked_cast(format).parse_options; - - return parse_options.delimiter == other_parse_options.delimiter && - parse_options.quoting == other_parse_options.quoting && - parse_options.quote_char == other_parse_options.quote_char && - parse_options.double_quote == other_parse_options.double_quote && - parse_options.escaping == other_parse_options.escaping && - parse_options.escape_char == other_parse_options.escape_char && - parse_options.newlines_in_values == other_parse_options.newlines_in_values && - parse_options.ignore_empty_lines == other_parse_options.ignore_empty_lines; + const auto other_format = checked_cast(format); + const auto& other_convert_options = other_format.reader_options.convert_options; + const auto& other_parse_options = other_format.reader_options.parse_options; + + return reader_options.convert_options.check_utf8 == other_convert_options.check_utf8 && + reader_options.convert_options.column_types == + other_convert_options.column_types && + reader_options.convert_options.null_values == + other_convert_options.null_values && + reader_options.convert_options.true_values == + other_convert_options.true_values && + reader_options.convert_options.false_values == + other_convert_options.false_values && + reader_options.convert_options.strings_can_be_null == + other_convert_options.strings_can_be_null && + reader_options.convert_options.auto_dict_encode == + other_convert_options.auto_dict_encode && + reader_options.convert_options.auto_dict_max_cardinality == + other_convert_options.auto_dict_max_cardinality && + reader_options.convert_options.include_columns == + other_convert_options.include_columns && + reader_options.convert_options.include_missing_columns == + other_convert_options.include_missing_columns && + // N.B. values are not comparable + reader_options.convert_options.timestamp_parsers.size() == + other_convert_options.timestamp_parsers.size() && + reader_options.block_size == other_format.reader_options.block_size && + reader_options.parse_options.delimiter == other_parse_options.delimiter && + reader_options.parse_options.quoting == other_parse_options.quoting && + reader_options.parse_options.quote_char == other_parse_options.quote_char && + reader_options.parse_options.double_quote == other_parse_options.double_quote && + reader_options.parse_options.escaping == other_parse_options.escaping && + reader_options.parse_options.escape_char == other_parse_options.escape_char && + reader_options.parse_options.newlines_in_values == + other_parse_options.newlines_in_values && + reader_options.parse_options.ignore_empty_lines == + other_parse_options.ignore_empty_lines && + reader_options.skip_rows == other_format.reader_options.skip_rows && + reader_options.column_names == other_format.reader_options.column_names && + reader_options.autogenerate_column_names == + other_format.reader_options.autogenerate_column_names; } Result CsvFileFormat::IsSupported(const FileSource& source) const { diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 4b3db66986c..937995f9756 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -33,18 +33,35 @@ namespace dataset { constexpr char kCsvTypeName[] = "csv"; +/// \brief Per-scan options for CSV fragments +struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { + std::string type_name() const override { return kCsvTypeName; } + + /// CSV conversion options + csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); + + /// Block size for reading (see arrow::csv::ReadOptions::block_size) + int32_t block_size = 1 << 20; // 1 MB +}; + /// \brief A FileFormat implementation that reads from and writes to Csv files class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { public: - /// Options affecting the parsing of CSV files - csv::ParseOptions parse_options = csv::ParseOptions::Defaults(); - /// Number of header rows to skip (see arrow::csv::ReadOptions::skip_rows) - int32_t skip_rows = 0; - /// Column names for the target table (see arrow::csv::ReadOptions::column_names) - std::vector column_names; - /// Whether to generate column names or assume a header row (see - /// arrow::csv::ReadOptions::autogenerate_column_names) - bool autogenerate_column_names = false; + /// \brief Dataset-wide options for CSV. For convenience (both for users, and + /// for authors of language bindings), these include fields for per-scan + /// options, however, if per-scan options are provided, then those fields will + /// be overridden. + struct ReaderOptions : CsvFragmentScanOptions { + /// Options affecting the parsing of CSV files + csv::ParseOptions parse_options = csv::ParseOptions::Defaults(); + /// Number of header rows to skip (see arrow::csv::ReadOptions::skip_rows) + int32_t skip_rows = 0; + /// Column names for the target table (see arrow::csv::ReadOptions::column_names) + std::vector column_names; + /// Whether to generate column names or assume a header row (see + /// arrow::csv::ReadOptions::autogenerate_column_names) + bool autogenerate_column_names = false; + } reader_options; std::string type_name() const override { return kCsvTypeName; } @@ -69,15 +86,5 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { std::shared_ptr DefaultWriteOptions() override { return NULLPTR; } }; -class ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { - public: - std::string type_name() const override { return kCsvTypeName; } - - csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); - - /// Block size for reading (see arrow::csv::ReadOptions::block_size) - int32_t block_size = 1 << 20; // 1 MB -}; - } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index f38f3341fe0..11ff685560f 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -93,6 +93,27 @@ class TestCsvFileFormat : public testing::TestWithParam { std::shared_ptr opts_ = std::make_shared(); }; +TEST_P(TestCsvFileFormat, Equality) { + CsvFileFormat options1; + CsvFileFormat options2; + options2.reader_options.skip_rows = 3; + CsvFileFormat options3; + options3.reader_options.parse_options.delimiter = '\t'; + CsvFileFormat options4; + options4.reader_options.block_size = 1 << 30; + + ASSERT_TRUE(options1.Equals(options1)); + ASSERT_TRUE(options2.Equals(options2)); + ASSERT_TRUE(options3.Equals(options3)); + ASSERT_TRUE(options4.Equals(options4)); + ASSERT_FALSE(options1.Equals(options2)); + ASSERT_FALSE(options1.Equals(options3)); + ASSERT_FALSE(options1.Equals(options4)); + ASSERT_FALSE(options2.Equals(options3)); + ASSERT_FALSE(options2.Equals(options4)); + ASSERT_FALSE(options3.Equals(options4)); +} + TEST_P(TestCsvFileFormat, ScanRecordBatchReader) { auto source = GetFileSource(R"(f64 1.0 @@ -142,7 +163,7 @@ MYNULL N/A bar)"); SetSchema({field("str", utf8())}); - format_->skip_rows = 1; + format_->reader_options.skip_rows = 1; ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); auto fragment_scan_options = std::make_shared(); fragment_scan_options->block_size = 1 << 22; diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 1ad19bdcab6..d148d4ee2d3 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -60,7 +60,7 @@ struct FileSystemDatasetWriteOptions; class InMemoryDataset; class CsvFileFormat; -class CsvFragmentScanOptions; +struct CsvFragmentScanOptions; class IpcFileFormat; class IpcFileWriter; diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index c5028d3f2c0..a5c755697b8 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -320,6 +320,12 @@ cdef class ParseOptions(_Weakrefable): self.escape_char, self.newlines_in_values, self.ignore_empty_lines) = state + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + cdef class _ISO8601(_Weakrefable): """ @@ -636,6 +642,12 @@ cdef class ConvertOptions(_Weakrefable): self.auto_dict_max_cardinality, self.include_columns, self.include_missing_columns) = state + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + cdef _get_reader(input_file, ReadOptions read_options, shared_ptr[CInputStream]* out): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 3e6c0ba3e52..2df69e4412c 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1393,7 +1393,9 @@ cdef class CsvFileFormat(FileFormat): def __init__(self, ParseOptions parse_options=None, skip_rows=None, column_names=None, - autogenerate_column_names=None): + autogenerate_column_names=None, + ConvertOptions convert_options=None, + block_size=None): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options @@ -1403,6 +1405,10 @@ cdef class CsvFileFormat(FileFormat): self.column_names = column_names if autogenerate_column_names is not None: self.autogenerate_column_names = autogenerate_column_names + if convert_options is not None: + self.convert_options = convert_options + if block_size is not None: + self.block_size = block_size cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1413,47 +1419,78 @@ cdef class CsvFileFormat(FileFormat): @property def parse_options(self): - return ParseOptions.wrap(self.csv_format.parse_options) + return ParseOptions.wrap(self.csv_format.reader_options.parse_options) @parse_options.setter def parse_options(self, ParseOptions parse_options not None): - self.csv_format.parse_options = parse_options.options + self.csv_format.reader_options.parse_options = parse_options.options @property def skip_rows(self): - return self.csv_format.skip_rows + return self.csv_format.reader_options.skip_rows @skip_rows.setter def skip_rows(self, int skip_rows): - self.csv_format.skip_rows = skip_rows + self.csv_format.reader_options.skip_rows = skip_rows @property def column_names(self): - return [frombytes(b) for b in self.csv_format.column_names] + return [frombytes(b) + for b in self.csv_format.reader_options.column_names] @column_names.setter def column_names(self, list column_names): - self.csv_format.column_names = [tobytes(s) for s in column_names] + self.csv_format.reader_options.column_names = [ + tobytes(s) for s in column_names] @property def autogenerate_column_names(self): - return self.csv_format.autogenerate_column_names + return self.csv_format.reader_options.autogenerate_column_names @autogenerate_column_names.setter def autogenerate_column_names(self, bint skip_rows): - self.csv_format.autogenerate_column_names = skip_rows + self.csv_format.reader_options.autogenerate_column_names = skip_rows + + @property + def convert_options(self): + return ConvertOptions.wrap( + self.csv_format.reader_options.convert_options) + + @convert_options.setter + def convert_options(self, ConvertOptions convert_options not None): + self.csv_format.reader_options.convert_options = \ + convert_options.options + + @property + def block_size(self): + return self.csv_format.reader_options.block_size + + @block_size.setter + def block_size(self, int block_size): + self.csv_format.reader_options.block_size = block_size def equals(self, CsvFileFormat other): return (self.parse_options.equals(other.parse_options) and self.skip_rows == other.skip_rows and self.column_names == other.column_names and self.autogenerate_column_names == - other.autogenerate_column_names) + other.autogenerate_column_names and + self.convert_options == other.convert_options and + self.block_size == other.block_size) def __reduce__(self): return CsvFileFormat, (self.parse_options, self.skip_rows, self.column_names, - self.autogenerate_column_names) + self.autogenerate_column_names, + self.convert_options, self.block_size) + + def __repr__(self): + return (f"") cdef class CsvFragmentScanOptions(FragmentScanOptions): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 65b7dc4f622..171ea905e42 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -253,17 +253,22 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CFileFormat): pass - cdef cppclass CCsvFileFormat "arrow::dataset::CsvFileFormat"( - CFileFormat): + cdef cppclass CCsvFragmentScanOptions \ + "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions): + CCSVConvertOptions convert_options + int32_t block_size + + cdef cppclass CCsvFileFormatReaderOptions \ + "arrow::dataset::ParquetFileFormat::ReaderOptions"( + CCsvFragmentScanOptions): CCSVParseOptions parse_options int32_t skip_rows vector[c_string] column_names c_bool autogenerate_column_names - cdef cppclass CCsvFragmentScanOptions \ - "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions): - CCSVConvertOptions convert_options - int32_t block_size + cdef cppclass CCsvFileFormat "arrow::dataset::CsvFileFormat"( + CFileFormat): + CCsvFileFormatReaderOptions reader_options cdef cppclass CPartitioning "arrow::dataset::Partitioning": c_string type_name() const diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 8e2838ff106..aa3bee33d22 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -527,6 +527,7 @@ def test_file_format_pickling(): ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t', ignore_empty_lines=True)), ds.CsvFileFormat(skip_rows=3, column_names=['foo']), + ds.CsvFileFormat(skip_rows=3, block_size=2**20), ds.ParquetFileFormat(), ds.ParquetFileFormat( read_options=ds.ParquetReadOptions(use_buffered_stream=True) @@ -2285,6 +2286,16 @@ def test_csv_fragment_options(tempdir): result = dataset.to_table(fragment_scan_options=options) assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) + csv_format = ds.CsvFileFormat(convert_options=convert_options) + dataset = ds.dataset(path, format=csv_format) + result = dataset.to_table() + assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) + + options = ds.CsvFragmentScanOptions() + result = dataset.to_table(fragment_scan_options=options) + assert result.equals( + pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])})) + def test_feather_format(tempdir): from pyarrow.feather import write_feather diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 71895a1eb6d..7d57bdc7241 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -428,8 +428,8 @@ dataset___IpcFileFormat__Make <- function(){ .Call(`_arrow_dataset___IpcFileFormat__Make`) } -dataset___CsvFileFormat__Make <- function(parse_options, skip_rows, column_names, autogenerate_column_names){ - .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options, skip_rows, column_names, autogenerate_column_names) +dataset___CsvFileFormat__Make <- function(parse_options, skip_rows, column_names, autogenerate_column_names, convert_options, block_size){ + .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options, skip_rows, column_names, autogenerate_column_names, convert_options, block_size) } dataset___FragmentScanOptions__type_name <- function(fragment_scan_options){ diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index 09d033696c4..93482b74168 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -44,9 +44,14 @@ #' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the #' `readr`-style naming used in [read_csv_arrow()] ("delim", "quote", etc.). #' Not all `readr` options are currently supported; please file an issue if -#' you encounter one that `arrow` should support. Also, the following options of -#' [CsvReadOptions] can be passed (using the Arrow naming only): skip_rows, -#' column_names, and autogenerate_column_names. +#' you encounter one that `arrow` should support. Also, the following options are +#' supported. From [CsvReadOptions]: +#' * `skip_rows` +#' * `column_names` +#' * `autogenerate_column_names` +#' From [CsvFragmentScanOptions] (these values can be overridden at scan time): +#' * `convert_options`: a [CsvConvertOptions] +#' * `block_size` #' #' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`) #' @rdname FileFormat @@ -106,13 +111,20 @@ CsvFileFormat <- R6Class("CsvFileFormat", inherit = FileFormat) CsvFileFormat$create <- function(..., opts = csv_file_format_parse_options(...), skip_rows = 0, column_names = character(0), - autogenerate_column_names = FALSE) { - dataset___CsvFileFormat__Make(opts, skip_rows, column_names, autogenerate_column_names) + autogenerate_column_names = FALSE, + convert_options = csv_file_format_convert_options(...), + block_size = 2**20) { + dataset___CsvFileFormat__Make(opts, skip_rows, column_names, + autogenerate_column_names, convert_options, block_size) } # Support both readr-style option names and Arrow C++ option names csv_file_format_parse_options <- function(...) { - opt_names <- names(list(...)) + opts <- list(...) + # Filter out arguments meant for CsvConvertOptions + convert_opts <- names(formals(CsvConvertOptions$create)) + opts[convert_opts] <- NULL + opt_names <- names(opts) # Catch any readr-style options specified with full option names that are # supported by read_delim_arrow() (and its wrappers) but are not yet # supported here @@ -168,12 +180,22 @@ csv_file_format_parse_options <- function(...) { stop("Use either Arrow parse options or readr parse options, not both", call. = FALSE) } - readr_to_csv_parse_options(...) # all options have readr-style names + do.call(readr_to_csv_parse_options, opts) # all options have readr-style names } else { - CsvParseOptions$create(...) # all options have Arrow C++ names + do.call(CsvParseOptions$create, opts) # all options have Arrow C++ names } } +csv_file_format_convert_options <- function(...) { + opts <- list(...) + # Filter out arguments meant for CsvParseOptions + arrow_opts <- names(formals(CsvParseOptions$create)) + readr_opts <- names(formals(readr_to_csv_parse_options)) + opts[arrow_opts] <- NULL + opts[readr_opts] <- NULL + do.call(CsvConvertOptions$create, opts) +} + #' Format-specific scan options #' #' @description diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index af3fd79336e..98f9099a4ce 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1105,18 +1105,20 @@ extern "C" SEXP _arrow_dataset___IpcFileFormat__Make(){ // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___CsvFileFormat__Make(const std::shared_ptr& parse_options, int32_t skip_rows, cpp11::strings column_names, bool autogenerate_column_names); -extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP skip_rows_sexp, SEXP column_names_sexp, SEXP autogenerate_column_names_sexp){ +std::shared_ptr dataset___CsvFileFormat__Make(const std::shared_ptr& parse_options, int32_t skip_rows, cpp11::strings column_names, bool autogenerate_column_names, const std::shared_ptr& convert_options, int32_t block_size); +extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP skip_rows_sexp, SEXP column_names_sexp, SEXP autogenerate_column_names_sexp, SEXP convert_options_sexp, SEXP block_size_sexp){ BEGIN_CPP11 arrow::r::Input&>::type parse_options(parse_options_sexp); arrow::r::Input::type skip_rows(skip_rows_sexp); arrow::r::Input::type column_names(column_names_sexp); arrow::r::Input::type autogenerate_column_names(autogenerate_column_names_sexp); - return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options, skip_rows, column_names, autogenerate_column_names)); + arrow::r::Input&>::type convert_options(convert_options_sexp); + arrow::r::Input::type block_size(block_size_sexp); + return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options, skip_rows, column_names, autogenerate_column_names, convert_options, block_size)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP skip_rows_sexp, SEXP column_names_sexp, SEXP autogenerate_column_names_sexp){ +extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP skip_rows_sexp, SEXP column_names_sexp, SEXP autogenerate_column_names_sexp, SEXP convert_options_sexp, SEXP block_size_sexp){ Rf_error("Cannot call dataset___CsvFileFormat__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -4273,7 +4275,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, { "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update1, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, - { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 4}, + { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 6}, { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1}, { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index af57c191e2e..7f945fd7561 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -273,12 +273,17 @@ std::shared_ptr dataset___IpcFileFormat__Make() { // [[dataset::export]] std::shared_ptr dataset___CsvFileFormat__Make( const std::shared_ptr& parse_options, int32_t skip_rows, - cpp11::strings column_names, bool autogenerate_column_names) { + cpp11::strings column_names, bool autogenerate_column_names, + const std::shared_ptr& convert_options, + int32_t block_size) { auto format = std::make_shared(); - format->parse_options = *parse_options; - format->skip_rows = skip_rows; - format->column_names = cpp11::as_cpp>(column_names); - format->autogenerate_column_names = autogenerate_column_names; + format->reader_options.parse_options = *parse_options; + format->reader_options.skip_rows = skip_rows; + format->reader_options.column_names = + cpp11::as_cpp>(column_names); + format->reader_options.autogenerate_column_names = autogenerate_column_names; + format->reader_options.convert_options = *convert_options; + format->reader_options.block_size = block_size; return format; } diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index e392641bd1b..de96ba17e83 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -316,6 +316,22 @@ test_that("CSV scan options", { tab <- sb$Finish()$ToTable() expect_equivalent(as.data.frame(tab), tibble(chr = c("foo", NA))) + + # Set default convert options in CsvFileFormat + csv_format <- CsvFileFormat$create(null_values = c("mynull"), + strings_can_be_null = TRUE) + ds <- open_dataset(dst_dir, format = csv_format) + expect_equivalent(ds %>% collect(), tibble(chr = c("foo", NA))) + + # Set both parse and convert options + df <- tibble(chr = c("foo", "mynull"), chr2 = c("bar", "baz")) + write.table(df, dst_file, row.names = FALSE, quote = FALSE, sep = "\t") + ds <- open_dataset(dst_dir, format = "csv", + delimiter="\t", + null_values = c("mynull"), + strings_can_be_null = TRUE) + expect_equivalent(ds %>% collect(), tibble(chr = c("foo", NA), + chr2 = c("bar", "baz"))) }) test_that("compressed CSV dataset", { From e372f59619095cbc3cdea46b29119d574b97b89b Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 22 Mar 2021 11:28:33 -0400 Subject: [PATCH 6/7] Update r/tests/testthat/test-dataset.R Co-authored-by: Neal Richardson --- r/tests/testthat/test-dataset.R | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index de96ba17e83..e6db0bcea17 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -374,10 +374,7 @@ test_that("CSV dataset options", { select(string = chr) ) - format <- FileFormat$create("csv", column_names = c("foo")) - ds <- open_dataset(dst_dir, format = format) - expect_is(ds$format, "CsvFileFormat") - expect_is(ds$filesystem, "LocalFileSystem") + ds <- open_dataset(dst_dir, format = "csv", column_names = c("foo")) expect_equivalent( ds %>% From 40f1b83dac808b40e3deb1c36c7b20e588fc9e70 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 22 Mar 2021 14:32:19 -0400 Subject: [PATCH 7/7] ARROW-8631: [C++][Python][R] don't inline options structs --- cpp/src/arrow/dataset/dataset_internal.h | 25 +++- cpp/src/arrow/dataset/file_base.h | 5 + cpp/src/arrow/dataset/file_csv.cc | 87 +++-------- cpp/src/arrow/dataset/file_csv.h | 41 ++--- cpp/src/arrow/dataset/file_csv_test.cc | 53 +++---- python/pyarrow/_csv.pxd | 9 ++ python/pyarrow/_csv.pyx | 37 ++++- python/pyarrow/_dataset.pyx | 150 ++++++++----------- python/pyarrow/includes/libarrow_dataset.pxd | 21 +-- python/pyarrow/tests/test_dataset.py | 20 ++- r/R/arrowExports.R | 8 +- r/R/dataset-format.R | 34 +++-- r/src/arrowExports.cpp | 25 ++-- r/src/dataset.cpp | 21 ++- 14 files changed, 257 insertions(+), 279 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 19648fab2de..b28bf7a14a4 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -185,17 +185,26 @@ inline bool operator==(const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded l.partition_expression == r.partition_expression; } +/// Get fragment scan options of the expected type. +/// \return Fragment scan options if provided on the scan options, else the default +/// options if set, else a default-constructed value. If options are provided +/// but of the wrong type, an error is returned. template -arrow::Result> DowncastFragmentScanOptions( - ScanOptions* scan_options, const std::string& type_name) { - if (!scan_options) return nullptr; - if (!scan_options->fragment_scan_options) return nullptr; - const auto actual_type_name = scan_options->fragment_scan_options->type_name(); - if (actual_type_name != type_name) { - return Status::Invalid("FragmentScanOptions of type ", actual_type_name, +arrow::Result> GetFragmentScanOptions( + const std::string& type_name, ScanOptions* scan_options, + const std::shared_ptr& default_options) { + auto source = default_options; + if (scan_options && scan_options->fragment_scan_options) { + source = scan_options->fragment_scan_options; + } + if (!source) { + return std::make_shared(); + } + if (source->type_name() != type_name) { + return Status::Invalid("FragmentScanOptions of type ", source->type_name(), " were provided for scanning a fragment of type ", type_name); } - return internal::checked_pointer_cast(scan_options->fragment_scan_options); + return internal::checked_pointer_cast(source); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index ca77f43eded..9c613c00aff 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -124,6 +124,11 @@ class ARROW_DS_EXPORT FileSource { /// \brief Base class for file format implementation class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this { public: + /// Options affecting how this format is scanned. + /// + /// The options here can be overridden at scan time. + std::shared_ptr default_fragment_scan_options; + virtual ~FileFormat() = default; /// \brief The name identifying the kind of file format diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index b6119e92418..e736d06753b 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -79,18 +79,14 @@ Result> GetColumnNames( static inline Result GetConvertOptions( const CsvFileFormat& format, const std::shared_ptr& scan_options, const util::string_view first_block, MemoryPool* pool) { - ARROW_ASSIGN_OR_RAISE( - auto column_names, - GetColumnNames(format.reader_options.parse_options, first_block, pool)); - - auto convert_options = format.reader_options.convert_options; - ARROW_ASSIGN_OR_RAISE(auto csv_scan_options, - DowncastFragmentScanOptions( - scan_options.get(), kCsvTypeName)); - if (csv_scan_options) { - convert_options = csv_scan_options->convert_options; - } + ARROW_ASSIGN_OR_RAISE(auto column_names, + GetColumnNames(format.parse_options, first_block, pool)); + ARROW_ASSIGN_OR_RAISE( + auto csv_scan_options, + GetFragmentScanOptions( + kCsvTypeName, scan_options.get(), format.default_fragment_scan_options)); + auto convert_options = csv_scan_options->convert_options; for (FieldRef ref : scan_options->MaterializedFields()) { ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOne(*scan_options->dataset_schema)); @@ -102,20 +98,15 @@ static inline Result GetConvertOptions( static inline Result GetReadOptions( const CsvFileFormat& format, const std::shared_ptr& scan_options) { - auto read_options = csv::ReadOptions::Defaults(); + ARROW_ASSIGN_OR_RAISE( + auto csv_scan_options, + GetFragmentScanOptions( + kCsvTypeName, scan_options.get(), format.default_fragment_scan_options)); + auto read_options = csv_scan_options->read_options; // Multithreaded conversion of individual files would lead to excessive thread // contention when ScanTasks are also executed in multiple threads, so we disable it // here. read_options.use_threads = false; - read_options.skip_rows = format.reader_options.skip_rows; - read_options.column_names = format.reader_options.column_names; - read_options.autogenerate_column_names = - format.reader_options.autogenerate_column_names; - ARROW_ASSIGN_OR_RAISE(auto csv_scan_options, - DowncastFragmentScanOptions( - scan_options.get(), kCsvTypeName)); - read_options.block_size = - csv_scan_options ? csv_scan_options->block_size : format.reader_options.block_size; return read_options; } @@ -132,7 +123,7 @@ static inline Result> OpenReader( default_memory_pool(), std::move(input))); ARROW_ASSIGN_OR_RAISE(first_block, input->Peek(reader_options.block_size)); - const auto& parse_options = format.reader_options.parse_options; + const auto& parse_options = format.parse_options; auto convert_options = csv::ConvertOptions::Defaults(); if (scan_options != nullptr) { ARROW_ASSIGN_OR_RAISE(convert_options, @@ -174,47 +165,17 @@ class CsvScanTask : public ScanTask { bool CsvFileFormat::Equals(const FileFormat& format) const { if (type_name() != format.type_name()) return false; - const auto other_format = checked_cast(format); - const auto& other_convert_options = other_format.reader_options.convert_options; - const auto& other_parse_options = other_format.reader_options.parse_options; - - return reader_options.convert_options.check_utf8 == other_convert_options.check_utf8 && - reader_options.convert_options.column_types == - other_convert_options.column_types && - reader_options.convert_options.null_values == - other_convert_options.null_values && - reader_options.convert_options.true_values == - other_convert_options.true_values && - reader_options.convert_options.false_values == - other_convert_options.false_values && - reader_options.convert_options.strings_can_be_null == - other_convert_options.strings_can_be_null && - reader_options.convert_options.auto_dict_encode == - other_convert_options.auto_dict_encode && - reader_options.convert_options.auto_dict_max_cardinality == - other_convert_options.auto_dict_max_cardinality && - reader_options.convert_options.include_columns == - other_convert_options.include_columns && - reader_options.convert_options.include_missing_columns == - other_convert_options.include_missing_columns && - // N.B. values are not comparable - reader_options.convert_options.timestamp_parsers.size() == - other_convert_options.timestamp_parsers.size() && - reader_options.block_size == other_format.reader_options.block_size && - reader_options.parse_options.delimiter == other_parse_options.delimiter && - reader_options.parse_options.quoting == other_parse_options.quoting && - reader_options.parse_options.quote_char == other_parse_options.quote_char && - reader_options.parse_options.double_quote == other_parse_options.double_quote && - reader_options.parse_options.escaping == other_parse_options.escaping && - reader_options.parse_options.escape_char == other_parse_options.escape_char && - reader_options.parse_options.newlines_in_values == - other_parse_options.newlines_in_values && - reader_options.parse_options.ignore_empty_lines == - other_parse_options.ignore_empty_lines && - reader_options.skip_rows == other_format.reader_options.skip_rows && - reader_options.column_names == other_format.reader_options.column_names && - reader_options.autogenerate_column_names == - other_format.reader_options.autogenerate_column_names; + const auto& other_parse_options = + checked_cast(format).parse_options; + + return parse_options.delimiter == other_parse_options.delimiter && + parse_options.quoting == other_parse_options.quoting && + parse_options.quote_char == other_parse_options.quote_char && + parse_options.double_quote == other_parse_options.double_quote && + parse_options.escaping == other_parse_options.escaping && + parse_options.escape_char == other_parse_options.escape_char && + parse_options.newlines_in_values == other_parse_options.newlines_in_values && + parse_options.ignore_empty_lines == other_parse_options.ignore_empty_lines; } Result CsvFileFormat::IsSupported(const FileSource& source) const { diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 937995f9756..b235195c5e3 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -33,35 +33,11 @@ namespace dataset { constexpr char kCsvTypeName[] = "csv"; -/// \brief Per-scan options for CSV fragments -struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { - std::string type_name() const override { return kCsvTypeName; } - - /// CSV conversion options - csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); - - /// Block size for reading (see arrow::csv::ReadOptions::block_size) - int32_t block_size = 1 << 20; // 1 MB -}; - /// \brief A FileFormat implementation that reads from and writes to Csv files class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { public: - /// \brief Dataset-wide options for CSV. For convenience (both for users, and - /// for authors of language bindings), these include fields for per-scan - /// options, however, if per-scan options are provided, then those fields will - /// be overridden. - struct ReaderOptions : CsvFragmentScanOptions { - /// Options affecting the parsing of CSV files - csv::ParseOptions parse_options = csv::ParseOptions::Defaults(); - /// Number of header rows to skip (see arrow::csv::ReadOptions::skip_rows) - int32_t skip_rows = 0; - /// Column names for the target table (see arrow::csv::ReadOptions::column_names) - std::vector column_names; - /// Whether to generate column names or assume a header row (see - /// arrow::csv::ReadOptions::autogenerate_column_names) - bool autogenerate_column_names = false; - } reader_options; + /// Options affecting the parsing of CSV files + csv::ParseOptions parse_options = csv::ParseOptions::Defaults(); std::string type_name() const override { return kCsvTypeName; } @@ -86,5 +62,18 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { std::shared_ptr DefaultWriteOptions() override { return NULLPTR; } }; +/// \brief Per-scan options for CSV fragments +struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { + std::string type_name() const override { return kCsvTypeName; } + + /// CSV conversion options + csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); + + /// CSV reading options + /// + /// Note that use_threads is always ignored. + csv::ReadOptions read_options = csv::ReadOptions::Defaults(); +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 11ff685560f..99ca7cc0f42 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -93,27 +93,6 @@ class TestCsvFileFormat : public testing::TestWithParam { std::shared_ptr opts_ = std::make_shared(); }; -TEST_P(TestCsvFileFormat, Equality) { - CsvFileFormat options1; - CsvFileFormat options2; - options2.reader_options.skip_rows = 3; - CsvFileFormat options3; - options3.reader_options.parse_options.delimiter = '\t'; - CsvFileFormat options4; - options4.reader_options.block_size = 1 << 30; - - ASSERT_TRUE(options1.Equals(options1)); - ASSERT_TRUE(options2.Equals(options2)); - ASSERT_TRUE(options3.Equals(options3)); - ASSERT_TRUE(options4.Equals(options4)); - ASSERT_FALSE(options1.Equals(options2)); - ASSERT_FALSE(options1.Equals(options3)); - ASSERT_FALSE(options1.Equals(options4)); - ASSERT_FALSE(options2.Equals(options3)); - ASSERT_FALSE(options2.Equals(options4)); - ASSERT_FALSE(options3.Equals(options4)); -} - TEST_P(TestCsvFileFormat, ScanRecordBatchReader) { auto source = GetFileSource(R"(f64 1.0 @@ -163,21 +142,33 @@ MYNULL N/A bar)"); SetSchema({field("str", utf8())}); - format_->reader_options.skip_rows = 1; + auto defaults = std::make_shared(); + defaults->read_options.skip_rows = 1; + format_->default_fragment_scan_options = defaults; ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); - auto fragment_scan_options = std::make_shared(); - fragment_scan_options->block_size = 1 << 22; - opts_->fragment_scan_options = fragment_scan_options; - ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema()); AssertSchemaEqual(opts_->dataset_schema, physical_schema); - int64_t rows = 0; - for (auto maybe_batch : Batches(fragment.get())) { - ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); - rows += batch->GetColumnByName("str")->length(); + { + int64_t rows = 0; + for (auto maybe_batch : Batches(fragment.get())) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + rows += batch->GetColumnByName("str")->length(); + } + ASSERT_EQ(rows, 4); + } + { + // These options completely override the default ones + auto fragment_scan_options = std::make_shared(); + fragment_scan_options->read_options.block_size = 1 << 22; + opts_->fragment_scan_options = fragment_scan_options; + int64_t rows = 0; + for (auto maybe_batch : Batches(fragment.get())) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + rows += batch->GetColumnByName("header_skipped")->length(); + } + ASSERT_EQ(rows, 5); } - ASSERT_EQ(rows, 4); } TEST_P(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) { diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd index f748d3e6e31..f8e12f16bc8 100644 --- a/python/pyarrow/_csv.pxd +++ b/python/pyarrow/_csv.pxd @@ -35,3 +35,12 @@ cdef class ParseOptions(_Weakrefable): @staticmethod cdef ParseOptions wrap(CCSVParseOptions options) + + +cdef class ReadOptions(_Weakrefable): + cdef: + CCSVReadOptions options + public object encoding + + @staticmethod + cdef ReadOptions wrap(CCSVReadOptions options) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index a5c755697b8..a98160cfa99 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -73,9 +73,6 @@ cdef class ReadOptions(_Weakrefable): The character encoding of the CSV data. Columns that cannot decode using this encoding can still be read as Binary. """ - cdef: - CCSVReadOptions options - public object encoding # Avoid mistakingly creating attributes __slots__ = () @@ -161,6 +158,40 @@ cdef class ReadOptions(_Weakrefable): def autogenerate_column_names(self, value): self.options.autogenerate_column_names = value + def equals(self, ReadOptions other): + return ( + self.use_threads == other.use_threads and + self.block_size == other.block_size and + self.skip_rows == other.skip_rows and + self.column_names == other.column_names and + self.autogenerate_column_names == + other.autogenerate_column_names and + self.encoding == other.encoding + ) + + @staticmethod + cdef ReadOptions wrap(CCSVReadOptions options): + out = ReadOptions() + out.options = options + out.encoding = 'utf8' # No way to know this + return out + + def __getstate__(self): + return (self.use_threads, self.block_size, self.skip_rows, + self.column_names, self.autogenerate_column_names, + self.encoding) + + def __setstate__(self, state): + (self.use_threads, self.block_size, self.skip_rows, + self.column_names, self.autogenerate_column_names, + self.encoding) = state + + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + cdef class ParseOptions(_Weakrefable): """ diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 2df69e4412c..169c6e17353 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -29,7 +29,7 @@ from pyarrow.lib cimport * from pyarrow.lib import frombytes, tobytes from pyarrow.includes.libarrow_dataset cimport * from pyarrow._fs cimport FileSystem, FileInfo, FileSelector -from pyarrow._csv cimport ConvertOptions, ParseOptions +from pyarrow._csv cimport ConvertOptions, ParseOptions, ReadOptions from pyarrow.util import _is_path_like, _stringify_path from pyarrow._parquet cimport ( @@ -717,6 +717,23 @@ cdef class FileFormat(_Weakrefable): def default_extname(self): return frombytes(self.format.type_name()) + @property + def default_fragment_scan_options(self): + return FragmentScanOptions.wrap( + self.wrapped.get().default_fragment_scan_options) + + @default_fragment_scan_options.setter + def default_fragment_scan_options(self, FragmentScanOptions options): + if options is None: + self.wrapped.get().default_fragment_scan_options =\ + nullptr + else: + self._set_default_fragment_scan_options(options) + + cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): + raise ValueError(f"Cannot set fragment scan options for " + f"'{options.type_name}' on {self.__class__.__name__}") + def __eq__(self, other): try: return self.equals(other) @@ -984,6 +1001,26 @@ cdef class FragmentScanOptions(_Weakrefable): cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): self.wrapped = sp + @staticmethod + cdef wrap(const shared_ptr[CFragmentScanOptions]& sp): + type_name = frombytes(sp.get().type_name()) + + classes = { + 'csv': CsvFragmentScanOptions, + } + + class_ = classes.get(type_name, None) + if class_ is None: + raise TypeError(type_name) + + cdef FragmentScanOptions self = class_.__new__(class_) + self.init(sp) + return self + + @property + def type_name(self): + return frombytes(self.wrapped.get().type_name()) + def __eq__(self, other): try: return self.equals(other) @@ -1392,23 +1429,14 @@ cdef class CsvFileFormat(FileFormat): __slots__ = () def __init__(self, ParseOptions parse_options=None, - skip_rows=None, column_names=None, - autogenerate_column_names=None, ConvertOptions convert_options=None, - block_size=None): + ReadOptions read_options=None): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options - if skip_rows is not None: - self.skip_rows = skip_rows - if column_names is not None: - self.column_names = column_names - if autogenerate_column_names is not None: - self.autogenerate_column_names = autogenerate_column_names - if convert_options is not None: - self.convert_options = convert_options - if block_size is not None: - self.block_size = block_size + if convert_options is not None or read_options is not None: + self.default_fragment_scan_options = CsvFragmentScanOptions( + convert_options=convert_options, read_options=read_options) cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1419,78 +1447,26 @@ cdef class CsvFileFormat(FileFormat): @property def parse_options(self): - return ParseOptions.wrap(self.csv_format.reader_options.parse_options) + return ParseOptions.wrap(self.csv_format.parse_options) @parse_options.setter def parse_options(self, ParseOptions parse_options not None): - self.csv_format.reader_options.parse_options = parse_options.options - - @property - def skip_rows(self): - return self.csv_format.reader_options.skip_rows - - @skip_rows.setter - def skip_rows(self, int skip_rows): - self.csv_format.reader_options.skip_rows = skip_rows - - @property - def column_names(self): - return [frombytes(b) - for b in self.csv_format.reader_options.column_names] - - @column_names.setter - def column_names(self, list column_names): - self.csv_format.reader_options.column_names = [ - tobytes(s) for s in column_names] - - @property - def autogenerate_column_names(self): - return self.csv_format.reader_options.autogenerate_column_names - - @autogenerate_column_names.setter - def autogenerate_column_names(self, bint skip_rows): - self.csv_format.reader_options.autogenerate_column_names = skip_rows + self.csv_format.parse_options = parse_options.options - @property - def convert_options(self): - return ConvertOptions.wrap( - self.csv_format.reader_options.convert_options) - - @convert_options.setter - def convert_options(self, ConvertOptions convert_options not None): - self.csv_format.reader_options.convert_options = \ - convert_options.options - - @property - def block_size(self): - return self.csv_format.reader_options.block_size - - @block_size.setter - def block_size(self, int block_size): - self.csv_format.reader_options.block_size = block_size + cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): + if options.type_name == 'csv': + self.csv_format.default_fragment_scan_options = options.wrapped + else: + super()._set_default_fragment_scan_options(options) def equals(self, CsvFileFormat other): - return (self.parse_options.equals(other.parse_options) and - self.skip_rows == other.skip_rows and - self.column_names == other.column_names and - self.autogenerate_column_names == - other.autogenerate_column_names and - self.convert_options == other.convert_options and - self.block_size == other.block_size) + return self.parse_options.equals(other.parse_options) def __reduce__(self): - return CsvFileFormat, (self.parse_options, self.skip_rows, - self.column_names, - self.autogenerate_column_names, - self.convert_options, self.block_size) + return CsvFileFormat, (self.parse_options,) def __repr__(self): - return (f"") + return f"" cdef class CsvFragmentScanOptions(FragmentScanOptions): @@ -1502,13 +1478,14 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): # Avoid mistakingly creating attributes __slots__ = () - def __init__(self, ConvertOptions convert_options=None, block_size=None): + def __init__(self, ConvertOptions convert_options=None, + ReadOptions read_options=None): self.init(shared_ptr[CFragmentScanOptions]( new CCsvFragmentScanOptions())) if convert_options is not None: self.convert_options = convert_options - if block_size is not None: - self.block_size = block_size + if read_options is not None: + self.read_options = read_options cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -1523,19 +1500,20 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): self.csv_options.convert_options = convert_options.options @property - def block_size(self): - return self.csv_options.block_size + def read_options(self): + return ReadOptions.wrap(self.csv_options.read_options) - @block_size.setter - def block_size(self, int block_size): - self.csv_options.block_size = block_size + @read_options.setter + def read_options(self, ReadOptions read_options not None): + self.csv_options.read_options = read_options.options def equals(self, CsvFragmentScanOptions other): return (self.convert_options.equals(other.convert_options) and - self.block_size == other.block_size) + self.read_options.equals(other.read_options)) def __reduce__(self): - return CsvFragmentScanOptions, (self.convert_options, self.block_size) + return CsvFragmentScanOptions, (self.convert_options, + self.read_options) cdef class Partitioning(_Weakrefable): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 171ea905e42..309d3530eec 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -60,7 +60,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) cdef cppclass CFragmentScanOptions "arrow::dataset::FragmentScanOptions": - pass + c_string type_name() const ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \ "arrow::dataset::ScanTaskIterator" @@ -169,6 +169,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: c_string type_name() const cdef cppclass CFileFormat "arrow::dataset::FileFormat": + shared_ptr[CFragmentScanOptions] default_fragment_scan_options c_string type_name() const CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const CResult[shared_ptr[CFileFragment]] MakeFragment( @@ -253,22 +254,14 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CFileFormat): pass + cdef cppclass CCsvFileFormat "arrow::dataset::CsvFileFormat"( + CFileFormat): + CCSVParseOptions parse_options + cdef cppclass CCsvFragmentScanOptions \ "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions): CCSVConvertOptions convert_options - int32_t block_size - - cdef cppclass CCsvFileFormatReaderOptions \ - "arrow::dataset::ParquetFileFormat::ReaderOptions"( - CCsvFragmentScanOptions): - CCSVParseOptions parse_options - int32_t skip_rows - vector[c_string] column_names - c_bool autogenerate_column_names - - cdef cppclass CCsvFileFormat "arrow::dataset::CsvFileFormat"( - CFileFormat): - CCsvFileFormatReaderOptions reader_options + CCSVReadOptions read_options cdef cppclass CPartitioning "arrow::dataset::Partitioning": c_string type_name() const diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index aa3bee33d22..8e6bf9c3217 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -526,8 +526,10 @@ def test_file_format_pickling(): ds.CsvFileFormat(), ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t', ignore_empty_lines=True)), - ds.CsvFileFormat(skip_rows=3, column_names=['foo']), - ds.CsvFileFormat(skip_rows=3, block_size=2**20), + ds.CsvFileFormat(read_options=pa.csv.ReadOptions( + skip_rows=3, column_names=['foo'])), + ds.CsvFileFormat(read_options=pa.csv.ReadOptions( + skip_rows=3, block_size=2**20)), ds.ParquetFileFormat(), ds.ParquetFileFormat( read_options=ds.ParquetReadOptions(use_buffered_stream=True) @@ -547,8 +549,9 @@ def test_fragment_scan_options_pickling(): options = [ ds.CsvFragmentScanOptions(), ds.CsvFragmentScanOptions( - pa.csv.ConvertOptions(strings_can_be_null=True)), - ds.CsvFragmentScanOptions(block_size=2**16), + convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)), + ds.CsvFragmentScanOptions( + read_options=pa.csv.ReadOptions(block_size=2**16)), ] for option in options: assert pickle.loads(pickle.dumps(option)) == option @@ -2264,11 +2267,13 @@ def test_csv_format_options(tempdir): assert result.equals( pa.table({'skipped': pa.array(['col0', 'foo', 'bar'])})) - dataset = ds.dataset(path, format=ds.CsvFileFormat(skip_rows=1)) + dataset = ds.dataset(path, format=ds.CsvFileFormat( + read_options=pa.csv.ReadOptions(skip_rows=1))) result = dataset.to_table() assert result.equals(pa.table({'col0': pa.array(['foo', 'bar'])})) - dataset = ds.dataset(path, format=ds.CsvFileFormat(column_names=['foo'])) + dataset = ds.dataset(path, format=ds.CsvFileFormat( + read_options=pa.csv.ReadOptions(column_names=['foo']))) result = dataset.to_table() assert result.equals( pa.table({'foo': pa.array(['skipped', 'col0', 'foo', 'bar'])})) @@ -2282,7 +2287,8 @@ def test_csv_fragment_options(tempdir): convert_options = pyarrow.csv.ConvertOptions(null_values=['MYNULL'], strings_can_be_null=True) options = ds.CsvFragmentScanOptions( - convert_options=convert_options, block_size=2**16) + convert_options=convert_options, + read_options=pa.csv.ReadOptions(block_size=2**16)) result = dataset.to_table(fragment_scan_options=options) assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 7d57bdc7241..2c13537f0c0 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -428,16 +428,16 @@ dataset___IpcFileFormat__Make <- function(){ .Call(`_arrow_dataset___IpcFileFormat__Make`) } -dataset___CsvFileFormat__Make <- function(parse_options, skip_rows, column_names, autogenerate_column_names, convert_options, block_size){ - .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options, skip_rows, column_names, autogenerate_column_names, convert_options, block_size) +dataset___CsvFileFormat__Make <- function(parse_options, convert_options, read_options){ + .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options, convert_options, read_options) } dataset___FragmentScanOptions__type_name <- function(fragment_scan_options){ .Call(`_arrow_dataset___FragmentScanOptions__type_name`, fragment_scan_options) } -dataset___CsvFragmentScanOptions__Make <- function(convert_options, block_size){ - .Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options, block_size) +dataset___CsvFragmentScanOptions__Make <- function(convert_options, read_options){ + .Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options, read_options) } dataset___DirectoryPartitioning <- function(schm){ diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index 93482b74168..cd54a300606 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -109,21 +109,19 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat) #' @export CsvFileFormat <- R6Class("CsvFileFormat", inherit = FileFormat) CsvFileFormat$create <- function(..., opts = csv_file_format_parse_options(...), - skip_rows = 0, - column_names = character(0), - autogenerate_column_names = FALSE, convert_options = csv_file_format_convert_options(...), - block_size = 2**20) { - dataset___CsvFileFormat__Make(opts, skip_rows, column_names, - autogenerate_column_names, convert_options, block_size) + read_options = csv_file_format_read_options(...)) { + dataset___CsvFileFormat__Make(opts, convert_options, read_options) } # Support both readr-style option names and Arrow C++ option names csv_file_format_parse_options <- function(...) { opts <- list(...) - # Filter out arguments meant for CsvConvertOptions + # Filter out arguments meant for CsvConvertOptions/CsvReadOptions convert_opts <- names(formals(CsvConvertOptions$create)) + read_opts <- names(formals(CsvReadOptions$create)) opts[convert_opts] <- NULL + opts[read_opts] <- NULL opt_names <- names(opts) # Catch any readr-style options specified with full option names that are # supported by read_delim_arrow() (and its wrappers) but are not yet @@ -188,14 +186,28 @@ csv_file_format_parse_options <- function(...) { csv_file_format_convert_options <- function(...) { opts <- list(...) - # Filter out arguments meant for CsvParseOptions + # Filter out arguments meant for CsvParseOptions/CsvReadOptions arrow_opts <- names(formals(CsvParseOptions$create)) readr_opts <- names(formals(readr_to_csv_parse_options)) + read_opts <- names(formals(CsvReadOptions$create)) opts[arrow_opts] <- NULL opts[readr_opts] <- NULL + opts[read_opts] <- NULL do.call(CsvConvertOptions$create, opts) } +csv_file_format_read_options <- function(...) { + opts <- list(...) + # Filter out arguments meant for CsvParseOptions/CsvConvertOptions + arrow_opts <- names(formals(CsvParseOptions$create)) + readr_opts <- names(formals(readr_to_csv_parse_options)) + convert_opts <- names(formals(CsvConvertOptions$create)) + opts[arrow_opts] <- NULL + opts[readr_opts] <- NULL + opts[convert_opts] <- NULL + do.call(CsvReadOptions$create, opts) +} + #' Format-specific scan options #' #' @description @@ -244,9 +256,9 @@ as.character.FragmentScanOptions <- function(x, ...) { #' @export CsvFragmentScanOptions <- R6Class("CsvFragmentScanOptions", inherit = FragmentScanOptions) CsvFragmentScanOptions$create <- function(..., - opts = CsvConvertOptions$create(...), - block_size = 2**20) { - dataset___CsvFragmentScanOptions__Make(opts, block_size) + convert_opts = csv_file_format_convert_options(...), + read_opts = csv_file_format_read_options(...)) { + dataset___CsvFragmentScanOptions__Make(convert_opts, read_opts) } #' Format-specific write options diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 98f9099a4ce..697451d0dd9 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1105,20 +1105,17 @@ extern "C" SEXP _arrow_dataset___IpcFileFormat__Make(){ // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___CsvFileFormat__Make(const std::shared_ptr& parse_options, int32_t skip_rows, cpp11::strings column_names, bool autogenerate_column_names, const std::shared_ptr& convert_options, int32_t block_size); -extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP skip_rows_sexp, SEXP column_names_sexp, SEXP autogenerate_column_names_sexp, SEXP convert_options_sexp, SEXP block_size_sexp){ +std::shared_ptr dataset___CsvFileFormat__Make(const std::shared_ptr& parse_options, const std::shared_ptr& convert_options, const std::shared_ptr& read_options); +extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP convert_options_sexp, SEXP read_options_sexp){ BEGIN_CPP11 arrow::r::Input&>::type parse_options(parse_options_sexp); - arrow::r::Input::type skip_rows(skip_rows_sexp); - arrow::r::Input::type column_names(column_names_sexp); - arrow::r::Input::type autogenerate_column_names(autogenerate_column_names_sexp); arrow::r::Input&>::type convert_options(convert_options_sexp); - arrow::r::Input::type block_size(block_size_sexp); - return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options, skip_rows, column_names, autogenerate_column_names, convert_options, block_size)); + arrow::r::Input&>::type read_options(read_options_sexp); + return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options, convert_options, read_options)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP skip_rows_sexp, SEXP column_names_sexp, SEXP autogenerate_column_names_sexp, SEXP convert_options_sexp, SEXP block_size_sexp){ +extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SEXP convert_options_sexp, SEXP read_options_sexp){ Rf_error("Cannot call dataset___CsvFileFormat__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -1140,16 +1137,16 @@ extern "C" SEXP _arrow_dataset___FragmentScanOptions__type_name(SEXP fragment_sc // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___CsvFragmentScanOptions__Make(const std::shared_ptr& convert_options, int32_t block_size); -extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_options_sexp, SEXP block_size_sexp){ +std::shared_ptr dataset___CsvFragmentScanOptions__Make(const std::shared_ptr& convert_options, const std::shared_ptr& read_options); +extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_options_sexp, SEXP read_options_sexp){ BEGIN_CPP11 arrow::r::Input&>::type convert_options(convert_options_sexp); - arrow::r::Input::type block_size(block_size_sexp); - return cpp11::as_sexp(dataset___CsvFragmentScanOptions__Make(convert_options, block_size)); + arrow::r::Input&>::type read_options(read_options_sexp); + return cpp11::as_sexp(dataset___CsvFragmentScanOptions__Make(convert_options, read_options)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_options_sexp, SEXP block_size_sexp){ +extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_options_sexp, SEXP read_options_sexp){ Rf_error("Cannot call dataset___CsvFragmentScanOptions__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -4275,7 +4272,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, { "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update1, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, - { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 6}, + { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 3}, { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1}, { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 7f945fd7561..89c3e4d56d8 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -272,18 +272,15 @@ std::shared_ptr dataset___IpcFileFormat__Make() { // [[dataset::export]] std::shared_ptr dataset___CsvFileFormat__Make( - const std::shared_ptr& parse_options, int32_t skip_rows, - cpp11::strings column_names, bool autogenerate_column_names, + const std::shared_ptr& parse_options, const std::shared_ptr& convert_options, - int32_t block_size) { + const std::shared_ptr& read_options) { auto format = std::make_shared(); - format->reader_options.parse_options = *parse_options; - format->reader_options.skip_rows = skip_rows; - format->reader_options.column_names = - cpp11::as_cpp>(column_names); - format->reader_options.autogenerate_column_names = autogenerate_column_names; - format->reader_options.convert_options = *convert_options; - format->reader_options.block_size = block_size; + format->parse_options = *parse_options; + auto scan_options = std::make_shared(); + if (convert_options) scan_options->convert_options = *convert_options; + if (read_options) scan_options->read_options = *read_options; + format->default_fragment_scan_options = std::move(scan_options); return format; } @@ -298,10 +295,10 @@ std::string dataset___FragmentScanOptions__type_name( // [[dataset::export]] std::shared_ptr dataset___CsvFragmentScanOptions__Make( const std::shared_ptr& convert_options, - int32_t block_size) { + const std::shared_ptr& read_options) { auto options = std::make_shared(); options->convert_options = *convert_options; - options->block_size = block_size; + options->read_options = *read_options; return options; }