diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h
index 540904fbcab..2bb3a1ec1da 100644
--- a/cpp/src/arrow/csv/options.h
+++ b/cpp/src/arrow/csv/options.h
@@ -80,7 +80,7 @@ struct ARROW_EXPORT ConvertOptions {
   /// Whether to try to automatically dict-encode string / binary data.
   /// If true, then when type inference detects a string or binary column,
-  /// it it dict-encoded up to `auto_dict_max_cardinality` distinct values
+  /// it is dict-encoded up to `auto_dict_max_cardinality` distinct values
   /// (per chunk), after which it switches to regular encoding.
   ///
   /// This setting is ignored for non-inferred columns (those in `column_types`).
diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt
index 129d000bd49..b693c48049c 100644
--- a/cpp/src/arrow/dataset/CMakeLists.txt
+++ b/cpp/src/arrow/dataset/CMakeLists.txt
@@ -32,6 +32,10 @@ set(ARROW_DATASET_SRCS
 set(ARROW_DATASET_LINK_STATIC arrow_static)
 set(ARROW_DATASET_LINK_SHARED arrow_shared)
 
+if(ARROW_CSV)
+  set(ARROW_DATASET_SRCS ${ARROW_DATASET_SRCS} file_csv.cc)
+endif()
+
 if(ARROW_PARQUET)
   set(ARROW_DATASET_LINK_STATIC ${ARROW_DATASET_LINK_STATIC} parquet_static)
   set(ARROW_DATASET_LINK_SHARED ${ARROW_DATASET_LINK_SHARED} parquet_shared)
@@ -108,6 +112,10 @@ add_arrow_dataset_test(filter_test)
 add_arrow_dataset_test(partition_test)
 add_arrow_dataset_test(scanner_test)
 
+if(ARROW_CSV)
+  add_arrow_dataset_test(file_csv_test)
+endif()
+
 if(ARROW_PARQUET)
   add_arrow_dataset_test(file_parquet_test)
 endif()
diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h
index 4cad72e5f5b..82fbce6d704 100644
--- a/cpp/src/arrow/dataset/api.h
+++ b/cpp/src/arrow/dataset/api.h
@@ -20,6 +20,7 @@
 #include "arrow/dataset/dataset.h"
 #include "arrow/dataset/discovery.h"
 #include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_csv.h"
 #include "arrow/dataset/file_ipc.h"
 #include "arrow/dataset/file_parquet.h"
 #include "arrow/dataset/filter.h"
diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h
index f642462e3b1..b1d3dcf1694 100644
--- a/cpp/src/arrow/dataset/dataset_internal.h
+++ b/cpp/src/arrow/dataset/dataset_internal.h
@@ -34,8 +34,8 @@ namespace dataset {
 
 /// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
 /// flattened FragmentIterator.
-static inline FragmentIterator GetFragmentsFromDatasets(
-    const DatasetVector& datasets, std::shared_ptr<Expression> predicate) {
+inline FragmentIterator GetFragmentsFromDatasets(const DatasetVector& datasets,
+                                                 std::shared_ptr<Expression> predicate) {
   // Iterator<Dataset>
   auto datasets_it = MakeVectorIterator(datasets);
 
@@ -51,12 +51,11 @@ static inline FragmentIterator GetFragmentsFromDatasets(
   return MakeFlattenIterator(std::move(fragments_it));
 }
 
-static inline RecordBatchIterator IteratorFromReader(
-    std::shared_ptr<RecordBatchReader> reader) {
+inline RecordBatchIterator IteratorFromReader(std::shared_ptr<RecordBatchReader> reader) {
   return MakeFunctionIterator([reader] { return reader->Next(); });
 }
 
-static inline std::shared_ptr<Schema> SchemaFromColumnNames(
+inline std::shared_ptr<Schema> SchemaFromColumnNames(
     const std::shared_ptr<Schema>& input, const std::vector<std::string>& column_names) {
   std::vector<std::shared_ptr<Field>> columns;
   for (FieldRef ref : column_names) {
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
new file mode 100644
index 00000000000..c6fbf93495d
--- /dev/null
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;
+
+    for (const auto& field : scan_options->schema()->fields()) {
+      options.column_types[field->name()] = field->type();
+      options.include_columns.push_back(field->name());
+    }
+
+    // FIXME(bkietz) also acquire types of fields materialized but not projected.
+    for (auto&& name : FieldsInExpression(scan_options->filter)) {
+      ARROW_ASSIGN_OR_RAISE(auto match,
+                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
+      if (match.indices().empty()) {
+        options.include_columns.push_back(std::move(name));
+      }
+    }
+  }
+  return options;
+}
+
+static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
+  auto options = csv::ReadOptions::Defaults();
+  // Multithreaded conversion of individual files would lead to excessive thread
+  // contention when ScanTasks are also executed in multiple threads, so we disable it
+  // here.
+  options.use_threads = false;
+  return options;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format,
+    const std::shared_ptr<ScanOptions>& options = nullptr,
+    MemoryPool* pool = default_memory_pool()) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto reader_options = GetReadOptions(format);
+  const auto& parse_options = format.parse_options;
+  ARROW_ASSIGN_OR_RAISE(auto convert_options, GetConvertOptions(format, options));
+  auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options,
+                                                 parse_options, convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open CSV input source '",
+                                             source.path(),
+                                             "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by a CSV file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader,
+                          OpenReader(source_, *format_, options(), context()->pool));
+    return IteratorFromReader(std::move(reader));
+  }
+
+ private:
+  std::shared_ptr<const CsvFileFormat> format_;
+  FileSource source_;
+};
+
+Result<bool> CsvFileFormat::IsSupported(const FileSource& source) const {
+  RETURN_NOT_OK(source.Open().status());
+  return OpenReader(source, *this).ok();
+}
+
+Result<std::shared_ptr<Schema>> CsvFileFormat::Inspect(const FileSource& source) const {
+  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, *this));
+  return reader->schema();
+}
+
+Result<ScanTaskIterator> CsvFileFormat::ScanFile(
+    const FileSource& source, std::shared_ptr<ScanOptions> options,
+    std::shared_ptr<ScanContext> context) const {
+  auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
+  auto task = std::make_shared<CsvScanTask>(std::move(this_), source, std::move(options),
+                                            std::move(context));
+
+  return MakeVectorIterator<std::shared_ptr<ScanTask>>({std::move(task)});
+}
+
+}  // namespace dataset
+}  // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h
new file mode 100644
index 00000000000..9c2bd088cdf
--- /dev/null
+++ b/cpp/src/arrow/dataset/file_csv.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/csv/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+
+namespace arrow {
+namespace dataset {
+
+/// \brief A FileFormat implementation that reads from and writes to CSV files
+class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
+ public:
+  /// Options affecting the parsing of CSV files
+  csv::ParseOptions parse_options = csv::ParseOptions::Defaults();
+
+  std::string type_name() const override { return "csv"; }
+
+  Result<bool> IsSupported(const FileSource& source) const override;
+
+  /// \brief Return the schema of the file if possible.
+  Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
+
+  /// \brief Open a file for scanning
+  Result<ScanTaskIterator> ScanFile(const FileSource& source,
+                                    std::shared_ptr<ScanOptions> options,
+                                    std::shared_ptr<ScanContext> context) const override;
+};
+
+}  // namespace dataset
+}  // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
new file mode 100644
index 00000000000..0cd2d907fd6
--- /dev/null
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+
+namespace arrow {
+namespace dataset {
+
+class TestCsvFileFormat : public testing::Test {
+ public:
+  std::unique_ptr<FileSource> GetFileSource() {
+    return GetFileSource(R"(f64
+1.0
+
+N/A
+2)");
+  }
+
+  std::unique_ptr<FileSource> GetFileSource(std::string csv) {
+    return internal::make_unique<FileSource>(Buffer::FromString(std::move(csv)));
+  }
+
+  RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
+    return MakeFlattenIterator(MakeMaybeMapIterator(
+        [](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
+        std::move(scan_task_it)));
+  }
+
+  RecordBatchIterator Batches(Fragment* fragment) {
+    EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_));
+    return Batches(std::move(scan_task_it));
+  }
+
+ protected:
+  std::shared_ptr<CsvFileFormat> format_ = std::make_shared<CsvFileFormat>();
+  std::shared_ptr<ScanOptions> opts_;
+  std::shared_ptr<ScanContext> ctx_ = std::make_shared<ScanContext>();
+  std::shared_ptr<Schema> schema_ = schema({field("f64", float64())});
+};
+
+TEST_F(TestCsvFileFormat, ScanRecordBatchReader) {
+  auto source = GetFileSource();
+
+  opts_ = ScanOptions::Make(schema_);
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  int64_t row_count = 0;
+
+  for (auto maybe_batch : Batches(fragment.get())) {
+    ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
+    row_count += batch->num_rows();
+  }
+
+  ASSERT_EQ(row_count, 3);
+}
+
+TEST_F(TestCsvFileFormat, OpenFailureWithRelevantError) {
+  auto source = GetFileSource("");
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("<Buffer>"),
+                                  format_->Inspect(*source).status());
+
+  constexpr auto file_name = "herp/derp";
+  ASSERT_OK_AND_ASSIGN(
+      auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr(file_name),
+                                  format_->Inspect({file_name, fs.get()}).status());
+}
+
+TEST_F(TestCsvFileFormat, Inspect) {
+  auto source = GetFileSource();
+
+  ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get()));
+  EXPECT_EQ(*actual, *schema_);
+}
+
+TEST_F(TestCsvFileFormat, IsSupported) {
+  bool supported;
+
+  auto source = GetFileSource("");
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source));
+  ASSERT_EQ(supported, false);
+
+  source = GetFileSource(R"(declare,two
+  1,2,3)");
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source));
+  ASSERT_EQ(supported, false);
+
+  source = GetFileSource();
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source));
+  EXPECT_EQ(supported, true);
+}
+
+TEST_F(TestCsvFileFormat, DISABLED_NonMaterializedFieldWithDifferingTypeFromInferred) {
+  auto source = GetFileSource(R"(f64,str
+1.0,foo
+,
+N/A,bar
+2,baz)");
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  // a valid schema for source:
+  schema_ = schema({field("f64", utf8()), field("str", utf8())});
+  ScannerBuilder builder(schema_, fragment, ctx_);
+  // filter expression validated against declared schema
+  ASSERT_OK(builder.Filter("f64"_ == "str"_));
+  // project only "str"
+  ASSERT_OK(builder.Project({"str"}));
+  ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+
+  ASSERT_OK_AND_ASSIGN(auto scan_task_it, scanner->Scan());
+  for (auto maybe_scan_task : scan_task_it) {
+    ASSERT_OK_AND_ASSIGN(auto scan_task, std::move(maybe_scan_task));
+    ASSERT_OK_AND_ASSIGN(auto batch_it, scan_task->Execute());
+    for (auto maybe_batch : batch_it) {
+      // ERROR: "f64" is not projected and reverts to inferred type,
+      // breaking the comparison expression
+      ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
+    }
+  }
+}
+
+}  // namespace dataset
+}  // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index 0ce312eaed5..6a509cd13b9 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -32,13 +32,13 @@
 namespace arrow {
 namespace dataset {
 
-static ipc::IpcReadOptions default_read_options() {
+static inline ipc::IpcReadOptions default_read_options() {
   auto options = ipc::IpcReadOptions::Defaults();
   options.use_threads = false;
   return options;
 }
 
-Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
+static inline Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
     const FileSource& source,
     const ipc::IpcReadOptions& options = default_read_options()) {
   ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
@@ -54,7 +54,7 @@ Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
   return reader;
 }
 
-Result<std::vector<int>> GetIncludedFields(
+static inline Result<std::vector<int>> GetIncludedFields(
     const Schema& schema, const std::vector<std::string>& materialized_fields) {
   std::vector<int> included_fields;
 
@@ -83,11 +83,12 @@ class IpcScanTask : public ScanTask {
       ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));
 
       auto options = default_read_options();
+      options.memory_pool = pool;
       ARROW_ASSIGN_OR_RAISE(options.included_fields,
                             GetIncludedFields(*reader->schema(), materialized_fields));
 
       ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options));
-      return RecordBatchIterator(Impl{std::move(reader), pool, 0});
+      return RecordBatchIterator(Impl{std::move(reader), 0});
     }
 
     Result<std::shared_ptr<RecordBatch>> Next() {
@@ -99,7 +100,6 @@ class IpcScanTask : public ScanTask {
     }
 
     std::shared_ptr<ipc::RecordBatchFileReader> reader_;
-    MemoryPool* pool_;
     int i_;
   };
 
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index 118b94de4f0..ed306dccafa 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -53,11 +53,13 @@ class FileFormat;
 class FileFragment;
 class FileSystemDataset;
 
-class ParquetFileFormat;
-class ParquetFileFragment;
+class CsvFileFormat;
 
 class IpcFileFormat;
 
+class ParquetFileFormat;
+class ParquetFileFragment;
+
 class Expression;
 using ExpressionVector = std::vector<std::shared_ptr<Expression>>;
diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd
new file mode 100644
index 00000000000..48865089da2
--- /dev/null
+++ b/python/pyarrow/_csv.pxd
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow.includes.libarrow cimport *
+
+
+cdef class ParseOptions:
+    cdef:
+        CCSVParseOptions options
+
+    @staticmethod
+    cdef ParseOptions wrap(CCSVParseOptions options)
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 5991a7bce7c..91f06a78609 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -174,9 +174,6 @@ cdef class ParseOptions:
         If False, an empty line is interpreted as containing a single empty
         value (assuming a one-column CSV file).
     """
-    cdef:
-        CCSVParseOptions options
-
     __slots__ = ()
 
     def __init__(self, delimiter=None, quote_char=None, double_quote=None,
@@ -283,6 +280,27 @@ cdef class ParseOptions:
     def ignore_empty_lines(self, value):
         self.options.ignore_empty_lines = value
 
+    def equals(self, ParseOptions other):
+        return (
+            self.delimiter == other.delimiter and
+            self.quote_char == other.quote_char and
+            self.double_quote == other.double_quote and
+            self.escape_char == other.escape_char and
+            self.newlines_in_values == other.newlines_in_values and
+            self.ignore_empty_lines == other.ignore_empty_lines
+        )
+
+    @staticmethod
+    cdef ParseOptions wrap(CCSVParseOptions options):
+        out = ParseOptions()
+        out.options = options
+        return out
+
+    def __reduce__(self):
+        return ParseOptions, (self.delimiter, self.quote_char,
+                              self.double_quote, self.escape_char,
+                              self.newlines_in_values, self.ignore_empty_lines)
+
 
 cdef class ConvertOptions:
     """
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 8477d5e1722..23b81a00eaf 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -27,6 +27,7 @@ from pyarrow.lib cimport *
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.compat import frombytes, tobytes
 from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
+from pyarrow._csv cimport ParseOptions
 
 
 def _forbid_instantiation(klass, subclasses_instead=True):
@@ -257,7 +258,7 @@ cdef class FileSystemDataset(Dataset):
         The top-level schema of the DataDataset.
     format : FileFormat
         File format to create fragments from, currently only
-        ParquetFileFormat and IpcFileFormat are supported.
+        ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
     filesystem : FileSystem
         The filesystem which files are from.
     partitions : List[Expression], optional
@@ -375,6 +376,8 @@ cdef class FileFormat:
             self = ParquetFileFormat.__new__(ParquetFileFormat)
         elif typ == 'ipc':
             self = IpcFileFormat.__new__(IpcFileFormat)
+        elif typ == 'csv':
+            self = CsvFileFormat.__new__(CsvFileFormat)
         else:
             raise TypeError(typ)
 
@@ -764,6 +767,34 @@ cdef class IpcFileFormat(FileFormat):
         return IpcFileFormat, tuple()
 
 
+cdef class CsvFileFormat(FileFormat):
+    cdef:
+        CCsvFileFormat* csv_format
+
+    def __init__(self, ParseOptions parse_options=None):
+        self.init(shared_ptr[CFileFormat](new CCsvFileFormat()))
+        if parse_options is not None:
+            self.parse_options = parse_options
+
+    cdef void init(self, const shared_ptr[CFileFormat]& sp):
+        FileFormat.init(self, sp)
+        self.csv_format = <CCsvFileFormat*> sp.get()
+
+    @property
+    def parse_options(self):
+        return ParseOptions.wrap(self.csv_format.parse_options)
+
+    @parse_options.setter
+    def parse_options(self, ParseOptions parse_options not None):
+        self.csv_format.parse_options = parse_options.options
+
+    def equals(self, CsvFileFormat other):
+        return self.parse_options.equals(other.parse_options)
+
+    def __reduce__(self):
+        return CsvFileFormat, (self.parse_options,)
+
+
 cdef class Partitioning:
 
     cdef:
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 37acf239aec..f7b99fa5c8b 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -26,6 +26,7 @@
     CastExpression,
     CompareOperator,
     ComparisonExpression,
+    CsvFileFormat,
     Dataset,
     DatasetFactory,
     DirectoryPartitioning,
@@ -215,6 +216,8 @@ def _ensure_format(obj):
         return ParquetFileFormat()
     elif obj in {"ipc", "arrow", "feather"}:
         return IpcFileFormat()
+    elif obj == "csv":
+        return CsvFileFormat()
     else:
         raise ValueError("format '{}' is not supported".format(obj))
 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index bc2ef32659c..86b24079337 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -288,6 +288,10 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
                                      CFileFormat):
         pass
 
+    cdef cppclass CCsvFileFormat "arrow::dataset::CsvFileFormat"(
+            CFileFormat):
+        CCSVParseOptions parse_options
+
     cdef cppclass CPartitioning "arrow::dataset::Partitioning":
         c_string type_name() const
         CResult[shared_ptr[CExpression]] Parse(const c_string & path) const
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 951de2adfb6..30e815b1cc7 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -25,6 +25,7 @@
 import pytest
 
 import pyarrow as pa
+import pyarrow.csv
 import pyarrow.fs as fs
 
 try:
@@ -537,6 +538,9 @@ def test_parquet_read_options():
 def test_file_format_pickling():
     formats = [
         ds.IpcFileFormat(),
+        ds.CsvFileFormat(),
+        ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t',
+                                             ignore_empty_lines=True)),
         ds.ParquetFileFormat(),
         ds.ParquetFileFormat(
             read_options=ds.ParquetReadOptions(use_buffered_stream=True)
@@ -1458,6 +1462,23 @@ def test_ipc_format(tempdir):
     assert result.equals(table)
 
 
+@pytest.mark.pandas
+def test_csv_format(tempdir):
+    table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+                      'b': pa.array([.1, .2, .3], type="float64")})
+
+    path = str(tempdir / 'test.csv')
+    table.to_pandas().to_csv(path, index=False)
+
+    dataset = ds.dataset(path, format=ds.CsvFileFormat())
+    result = dataset.to_table()
+    assert result.equals(table)
+
+    dataset = ds.dataset(path, format='csv')
+    result = dataset.to_table()
+    assert result.equals(table)
+
+
 def test_feather_format(tempdir):
     from pyarrow.feather import write_feather
 
diff --git a/r/Makefile b/r/Makefile
index 9fe95ad4581..ed2e8665a66 100644
--- a/r/Makefile
+++ b/r/Makefile
@@ -24,7 +24,7 @@ doc:
 
 test:
 	export ARROW_R_DEV=$(ARROW_R_DEV) && R CMD INSTALL --install-tests --no-test-load --no-docs --no-help --no-byte-compile .
-	export NOT_CRAN=true && export ARROW_R_DEV=$(ARROW_R_DEV) && R --slave -e 'library(testthat); setwd(file.path(.libPaths()[1], "arrow", "tests")); system.time(test_check("arrow", filter="${file}", reporter=ifelse(nchar("${r}"), "${r}", "summary")))'
+	export NOT_CRAN=true && export ARROW_R_DEV=$(ARROW_R_DEV) && export AWS_EC2_METADATA_DISABLED=TRUE && R --slave -e 'library(testthat); setwd(file.path(.libPaths()[1], "arrow", "tests")); system.time(test_check("arrow", filter="${file}", reporter=ifelse(nchar("${r}"), "${r}", "summary")))'
 
 deps:
 	R --slave -e 'lib <- Sys.getenv("R_LIB", .libPaths()[1]); install.packages("devtools", repo="https://cloud.r-project.org", lib=lib); devtools::install_dev_deps(lib=lib)'
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 568c72f0300..6921959f720 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -68,6 +68,7 @@ export(CompressedInputStream)
 export(CompressedOutputStream)
 export(CompressionType)
 export(CsvConvertOptions)
+export(CsvFileFormat)
 export(CsvParseOptions)
 export(CsvReadOptions)
 export(CsvTableReader)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 581d9768543..92e1b4aa274 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -416,6 +416,10 @@ dataset___IpcFileFormat__Make <- function(){
     .Call(`_arrow_dataset___IpcFileFormat__Make` )
 }
 
+dataset___CsvFileFormat__Make <- function(parse_options){
+    .Call(`_arrow_dataset___CsvFileFormat__Make` , parse_options)
+}
+
 dataset___DirectoryPartitioning <- function(schm){
     .Call(`_arrow_dataset___DirectoryPartitioning` , schm)
 }
diff --git a/r/R/dataset.R b/r/R/dataset.R
index 1baec6cd5d8..eb229d7abc7 100644
--- a/r/R/dataset.R
+++ b/r/R/dataset.R
@@ -52,7 +52,8 @@
 #' is a list of `Dataset`s (because there should be few `Dataset`s in the list
 #' and their `Schema`s are already in memory).
 #' @param ... additional arguments passed to `dataset_factory()` when
-#' `sources` is a file path, otherwise ignored.
+#' `sources` is a file path, otherwise ignored. These may include "format" to
+#' indicate the file format, or other format-specific options.
 #' @return A [Dataset] R6 object. Use `dplyr` methods on it to query the data,
 #' or call [`$NewScan()`][Scanner] to construct a query directly.
 #' @export
@@ -122,9 +123,8 @@ open_dataset <- function(sources,
 #' takes the following arguments:
 #' * `filesystem`: A [FileSystem]
 #' * `selector`: A [FileSelector]
-#' * `format`: A string identifier of the format of the files in `path`.
-#'   Currently "parquet" and "ipc"/"arrow"/"feather" (aliases for each other)
-#'   are supported. For Feather, only version 2 files are supported.
+#' * `format`: A [FileFormat]
+#' * `partitioning`: Either `Partitioning`, `PartitioningFactory`, or `NULL`
 #' @section Methods:
 #'
 #' A `Dataset` has the following methods:
@@ -269,7 +269,7 @@ DatasetFactory <- R6Class("DatasetFactory", inherit = ArrowObject,
 )
 DatasetFactory$create <- function(x,
                                   filesystem = NULL,
-                                  format = c("parquet", "arrow", "ipc", "feather"),
+                                  format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"),
                                   partitioning = NULL,
                                   ...) {
   if (is_list_of(x, "DatasetFactory")) {
@@ -291,7 +291,7 @@ DatasetFactory$create <- function(x,
   selector <- FileSelector$create(x, allow_not_found = FALSE, recursive = TRUE)
 
   if (is.character(format)) {
-    format <- FileFormat$create(match.arg(format))
+    format <- FileFormat$create(match.arg(format), ...)
   } else {
     assert_is(format, "FileFormat")
   }
@@ -325,9 +325,17 @@ DatasetFactory$create <- function(x,
 #' `UnionDatasetFactory` and other arguments will be ignored.
 #' @param filesystem A [FileSystem] object; if omitted, the `FileSystem` will
 #' be detected from `x`
-#' @param format A string identifier of the format of the files in `x`.
-#' Currently "parquet" and "ipc"/"arrow"/"feather" (aliases for each other)
-#' are supported. For Feather, only version 2 files are supported.
+#' @param format A [FileFormat] object, or a string identifier of the format of
+#' the files in `x`. Currently supported values:
+#' * "parquet"
+#' * "ipc"/"arrow"/"feather", all aliases for each other; for Feather, note that
+#'   only version 2 files are supported
+#' * "csv"/"text", aliases for the same thing (because comma is the default
+#'   delimiter for text files)
+#' * "tsv", equivalent to passing `format = "text", delimiter = "\t"`
+#'
+#' Default is "parquet", unless a `delimiter` is also specified, in which case
+#' it is assumed to be "text".
 #' @param partitioning One of
 #' * A `Schema`, in which case the file paths relative to `sources` will be
 #'   parsed, and path segments will be matched with the schema fields. For
@@ -340,7 +348,10 @@ DatasetFactory$create <- function(x,
 #'   by [hive_partition()] which parses explicit or autodetected fields from
 #'   Hive-style path segments
 #' * `NULL` for no partitioning
-#' @param ... Additional arguments, currently ignored
+#' @param ... Additional format-specific options, passed to
+#' `FileFormat$create()`. For CSV options, note that you can specify them either
+#' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
+#' `readr`-style naming used in [read_csv_arrow()] ("delim", "quote", etc.)
 #' @return A `DatasetFactory` object. Pass this to [open_dataset()],
 #' in a list potentially with other `DatasetFactory` objects, to create
 #' a `Dataset`.
@@ -387,17 +398,26 @@ FileSystemDatasetFactory$create <- function(filesystem,
 #'
 #' @section Factory:
 #' `FileFormat$create()` takes the following arguments:
-#' * `format`: A string identifier of the format of the files in `path`.
-#'   Currently "parquet" and "ipc"/"arrow"/"feather" (aliases for each other)
-#'   are supported. For Feather, only version 2 files are supported.
+#' * `format`: A string identifier of the file format. Currently supported values:
+#'   * "parquet"
+#'   * "ipc"/"arrow"/"feather", all aliases for each other; for Feather, note that
+#'     only version 2 files are supported
+#'   * "csv"/"text", aliases for the same thing (because comma is the default
+#'     delimiter for text files)
+#'   * "tsv", equivalent to passing `format = "text", delimiter = "\t"`
 #' * `...`: Additional format-specific options
+#'
+#' `format = "parquet"`:
 #' * `use_buffered_stream`: Read files through buffered input streams rather than
 #'   loading entire row groups at once. This may be enabled
 #'   to reduce memory overhead. Disabled by default.
 #' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB.
 #' * `dict_columns`: Names of columns which should be read as dictionaries.
 #'
+#' `format = "text"`: see [CsvReadOptions]. Note that you can specify them either
+#' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
+#' `readr`-style naming used in [read_csv_arrow()] ("delim", "quote", etc.)
+#'
 #' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`)
 #' @rdname FileFormat
 #' @name FileFormat
@@ -410,6 +430,8 @@ FileFormat <- R6Class("FileFormat", inherit = ArrowObject,
       shared_ptr(ParquetFileFormat, self$pointer())
     } else if (type == "ipc") {
       shared_ptr(IpcFileFormat, self$pointer())
+    } else if (type == "csv") {
+      shared_ptr(CsvFileFormat, self$pointer())
     } else {
       self
     }
@@ -422,7 +444,12 @@ FileFormat <- R6Class("FileFormat", inherit = ArrowObject,
   )
 )
 FileFormat$create <- function(format, ...) {
-  if (format == "parquet") {
+  opt_names <- names(list(...))
+  if (format %in% c("csv", "text") || any(opt_names %in% c("delim", "delimiter"))) {
+    CsvFileFormat$create(...)
+  } else if (format == c("tsv")) {
+    CsvFileFormat$create(delimiter = "\t", ...)
+  } else if (format == "parquet") {
     ParquetFileFormat$create(...)
   } else if (format %in% c("ipc", "arrow", "feather")) { # These are aliases for the same thing
     shared_ptr(IpcFileFormat, dataset___IpcFileFormat__Make())
@@ -449,6 +476,25 @@ ParquetFileFormat$create <- function(use_buffered_stream = FALSE,
 #' @export
 IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
 
+#' @usage NULL
+#' @format NULL
+#' @rdname FileFormat
+#' @export
+CsvFileFormat <- R6Class("CsvFileFormat", inherit = FileFormat)
+CsvFileFormat$create <- function(..., opts = csv_file_format_parse_options(...)) {
+  shared_ptr(CsvFileFormat, dataset___CsvFileFormat__Make(opts))
+}
+
+csv_file_format_parse_options <- function(...) {
+  # Support both the readr spelling of options and the arrow spelling
+  readr_opts <- c("delim", "quote", "escape_double", "escape_backslash", "skip_empty_rows")
+  if (any(readr_opts %in% names(list(...)))) {
+    readr_to_csv_parse_options(...)
+  } else {
+    CsvParseOptions$create(...)
+  }
+}
+
 #' Scan the contents of a dataset
 #'
 #' @description
diff --git a/r/man/Dataset.Rd b/r/man/Dataset.Rd
index 5f66aef6b61..686611ca67d 100644
--- a/r/man/Dataset.Rd
+++ b/r/man/Dataset.Rd
@@ -45,9 +45,8 @@ takes the following arguments:
 \itemize{
 \item \code{filesystem}: A \link{FileSystem}
 \item \code{selector}: A \link{FileSelector}
-\item \code{format}: A string identifier of the format of the files in \code{path}.
-Currently "parquet" and "ipc"/"arrow"/"feather" (aliases for each other)
-are supported. For Feather, only version 2 files are supported.
+\item \code{format}: A \link{FileFormat}
+\item \code{partitioning}: Either \code{Partitioning}, \code{PartitioningFactory}, or \code{NULL}
 }
 }
 
diff --git a/r/man/FileFormat.Rd b/r/man/FileFormat.Rd
index 395e1a4df35..b8959bcc89b 100644
--- a/r/man/FileFormat.Rd
+++ b/r/man/FileFormat.Rd
@@ -4,6 +4,7 @@
 \alias{FileFormat}
 \alias{ParquetFileFormat}
 \alias{IpcFileFormat}
+\alias{CsvFileFormat}
 \title{Dataset file formats}
 \description{
 A \code{FileFormat} holds information about how to read and parse the files
@@ -14,11 +15,18 @@ file formats (\code{ParquetFileFormat} and \code{IpcFileFormat}).
 \code{FileFormat$create()} takes the following arguments:
 \itemize{
-\item \code{format}: A string identifier of the format of the files in \code{path}.
-Currently "parquet" and "ipc"/"arrow"/"feather" (aliases for each other)
-are supported. For Feather, only version 2 files are supported.
+\item \code{format}: A string identifier of the file format. Currently supported values:
+\itemize{
+\item "parquet"
+\item "ipc"/"arrow"/"feather", all aliases for each other; for Feather, note that
+only version 2 files are supported
+\item "csv"/"text", aliases for the same thing (because comma is the default
+delimiter for text files)
+\item "tsv", equivalent to passing \verb{format = "text", delimiter = "\\t"}
+}
 \item \code{...}: Additional format-specific options
-format="parquet":
+
+\verb{format = "parquet"}:
 \itemize{
 \item \code{use_buffered_stream}: Read files through buffered input streams rather than
 loading entire row groups at once. This may be enabled
@@ -26,6 +34,10 @@ to reduce memory overhead. Disabled by default.
 \item \code{buffer_size}: Size of buffered stream, if enabled. Default is 8KB.
 \item \code{dict_columns}: Names of columns which should be read as dictionaries.
 }
+
+\code{format = "text"}: see \link{CsvReadOptions}. Note that you can specify them either
+with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
+\code{readr}-style naming used in \code{\link[=read_csv_arrow]{read_csv_arrow()}} ("delim", "quote", etc.)
 }
 
 It returns the appropriate subclass of \code{FileFormat} (e.g. \code{ParquetFileFormat})
diff --git a/r/man/dataset_factory.Rd b/r/man/dataset_factory.Rd
index c9950531aa7..efc967b36e2 100644
--- a/r/man/dataset_factory.Rd
+++ b/r/man/dataset_factory.Rd
@@ -7,7 +7,7 @@
 dataset_factory(
   x,
   filesystem = NULL,
-  format = c("parquet", "arrow", "ipc", "feather"),
+  format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"),
   partitioning = NULL,
   ...
 )
@@ -21,9 +21,19 @@ grouped. If this argument is specified it will be used to construct a
 \item{filesystem}{A \link{FileSystem} object; if omitted, the \code{FileSystem} will
 be detected from \code{x}}
 
-\item{format}{A string identifier of the format of the files in \code{x}.
-Currently "parquet" and "ipc"/"arrow"/"feather" (aliases for each other)
-are supported. For Feather, only version 2 files are supported.}
+\item{format}{A \link{FileFormat} object, or a string identifier of the format of
+the files in \code{x}. Currently supported values:
+\itemize{
+\item "parquet"
+\item "ipc"/"arrow"/"feather", all aliases for each other; for Feather, note that
+only version 2 files are supported
+\item "csv"/"text", aliases for the same thing (because comma is the default
+delimiter for text files)
+\item "tsv", equivalent to passing \verb{format = "text", delimiter = "\\t"}
+}
+
+Default is "parquet", unless a \code{delimiter} is also specified, in which case
+it is assumed to be "text".}
 
 \item{partitioning}{One of
 \itemize{
@@ -40,7 +50,10 @@ Hive-style path segments
 \item \code{NULL} for no partitioning
 }}
 
-\item{...}{Additional arguments, currently ignored}
+\item{...}{Additional format-specific options, passed to
+\code{FileFormat$create()}. For CSV options, note that you can specify them either
+with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
+\code{readr}-style naming used in \code{\link[=read_csv_arrow]{read_csv_arrow()}} ("delim", "quote", etc.)}
 }
 \value{
 A \code{DatasetFactory} object. Pass this to \code{\link[=open_dataset]{open_dataset()}},
diff --git a/r/man/open_dataset.Rd b/r/man/open_dataset.Rd
index f42ceccc799..379ace415ba 100644
--- a/r/man/open_dataset.Rd
+++ b/r/man/open_dataset.Rd
@@ -50,7 +50,8 @@ is a list of \code{Dataset}s (because there should be few \code{Dataset}s in the
 and their \code{Schema}s are already in memory).}
 
 \item{...}{additional arguments passed to \code{dataset_factory()} when
-\code{sources} is a file path, otherwise ignored.}
+\code{sources} is a file path, otherwise ignored. These may include "format" to
+indicate the file format, or other format-specific options.}
 }
 \value{
 A \link{Dataset} R6 object. Use \code{dplyr} methods on it to query the data,
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index f05cc950998..0fcfb19a83d 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1645,6 +1645,21 @@ RcppExport SEXP _arrow_dataset___IpcFileFormat__Make(){
 }
 #endif
 
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<ds::FileFormat> dataset___CsvFileFormat__Make(const std::shared_ptr<arrow::csv::ParseOptions>& parse_options);
+RcppExport SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::csv::ParseOptions>&>::type parse_options(parse_options_sexp);
+	return Rcpp::wrap(dataset___CsvFileFormat__Make(parse_options));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp){
+	Rf_error("Cannot call dataset___CsvFileFormat__Make(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
 // dataset.cpp
 #if defined(ARROW_R_WITH_ARROW)
 std::shared_ptr<ds::DirectoryPartitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm);
@@ -5986,6 +6001,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1},
 		{ "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3},
 		{ "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0},
+		{ "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1},
 		{ "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1},
 		{ "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1},
 		{ "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 1},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index d5c9d836dd8..70247b1d3a5 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -174,6 +174,14 @@ std::shared_ptr<ds::FileFormat> dataset___IpcFileFormat__Make() {
   return std::make_shared<ds::IpcFileFormat>();
 }
 
+// [[arrow::export]]
+std::shared_ptr<ds::FileFormat> dataset___CsvFileFormat__Make(
+    const std::shared_ptr<arrow::csv::ParseOptions>& parse_options) {
+  auto format = std::make_shared<ds::CsvFileFormat>();
+  format->parse_options = *parse_options;
+  return format;
+}
+
 // DirectoryPartitioning, HivePartitioning
 
 // [[arrow::export]]
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index acf4d73d86d..2f9f787d52f 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -28,6 +28,8 @@ make_temp_dir <- function() {
 dataset_dir <- make_temp_dir()
 hive_dir <- make_temp_dir()
 ipc_dir <- make_temp_dir()
+csv_dir <- make_temp_dir()
+tsv_dir <- make_temp_dir()
 
 first_date <- lubridate::ymd_hms("2015-04-29 03:12:39")
 df1 <- tibble(
@@ -68,6 +70,20 @@ test_that("Setup (putting data in the dir)", {
   write_feather(df1, file.path(ipc_dir, 3, "file1.arrow"))
"file1.arrow")) write_feather(df2, file.path(ipc_dir, 4, "file2.arrow")) expect_length(dir(ipc_dir, recursive = TRUE), 2) + + # Now, CSV + dir.create(file.path(csv_dir, 5)) + dir.create(file.path(csv_dir, 6)) + write.csv(df1, file.path(csv_dir, 5, "file1.csv"), row.names = FALSE) + write.csv(df2, file.path(csv_dir, 6, "file2.csv"), row.names = FALSE) + expect_length(dir(csv_dir, recursive = TRUE), 2) + + # Now, tab-delimited + dir.create(file.path(tsv_dir, 5)) + dir.create(file.path(tsv_dir, 6)) + write.table(df1, file.path(tsv_dir, 5, "file1.tsv"), row.names = FALSE, sep = "\t") + write.table(df2, file.path(tsv_dir, 6, "file2.tsv"), row.names = FALSE, sep = "\t") + expect_length(dir(tsv_dir, recursive = TRUE), 2) }) test_that("Simple interface for datasets", { @@ -205,6 +221,68 @@ test_that("IPC/Feather format data", { ) }) +test_that("CSV dataset", { + ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") + expect_identical(names(ds), c(names(df1), "part")) + expect_warning( + dim(ds), + "Number of rows unknown; returning NA" + ) + expect_equivalent( + ds %>% + select(string = chr, integer = int, part) %>% + filter(integer > 6 & part == 5) %>% + collect() %>% + summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64 + df1 %>% + select(string = chr, integer = int) %>% + filter(integer > 6) %>% + summarize(mean = mean(integer)) + ) +}) + +test_that("Other text delimited dataset", { + ds1 <- open_dataset(tsv_dir, partitioning = "part", format = "tsv") + expect_equivalent( + ds1 %>% + select(string = chr, integer = int, part) %>% + filter(integer > 6 & part == 5) %>% + collect() %>% + summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64 + df1 %>% + select(string = chr, integer = int) %>% + filter(integer > 6) %>% + summarize(mean = mean(integer)) + ) + + ds2 <- open_dataset(tsv_dir, partitioning = "part", format = "text", delimiter = "\t") + expect_equivalent( + ds2 %>% + select(string = chr, integer = int, part) %>% + filter(integer > 6 & part == 5) %>% + collect() %>% + summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64 + df1 %>% + select(string = chr, integer = int) %>% + filter(integer > 6) %>% + summarize(mean = mean(integer)) + ) + + # Now with readr option spelling (and omitting format = "text") + ds3 <- open_dataset(tsv_dir, partitioning = "part", delim = "\t") + expect_equivalent( + ds3 %>% + select(string = chr, integer = int, part) %>% + filter(integer > 6 & part == 5) %>% + collect() %>% + summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64 + df1 %>% + select(string = chr, integer = int) %>% + filter(integer > 6) %>% + summarize(mean = mean(integer)) + ) +}) + test_that("Dataset with multiple file formats", { skip("https://issues.apache.org/jira/browse/ARROW-7653") ds <- open_dataset(list(