From 74bd283a1ff7ca78b3d0b9a4bbbb67400f74efde Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 4 Jun 2019 17:01:28 -0500 Subject: [PATCH 1/7] Begin API drafting --- cpp/src/arrow/dataset/CMakeLists.txt | 19 ++++++ cpp/src/arrow/dataset/README.md | 27 +++++++++ cpp/src/arrow/dataset/api.h | 23 ++++++++ cpp/src/arrow/dataset/dataset.h | 76 ++++++++++++++++++++++++ cpp/src/arrow/dataset/disk_store.h | 57 ++++++++++++++++++ cpp/src/arrow/dataset/file_base.h | 88 ++++++++++++++++++++++++++++ cpp/src/arrow/dataset/file_csv.h | 71 ++++++++++++++++++++++ cpp/src/arrow/dataset/file_feather.h | 56 ++++++++++++++++++ cpp/src/arrow/dataset/file_json.h | 64 ++++++++++++++++++++ cpp/src/arrow/dataset/file_parquet.h | 65 ++++++++++++++++++++ cpp/src/arrow/dataset/filter.h | 40 +++++++++++++ cpp/src/arrow/dataset/scan.h | 60 +++++++++++++++++++ cpp/src/arrow/dataset/transaction.h | 18 ++++++ cpp/src/arrow/dataset/type_fwd.h | 51 ++++++++++++++++ cpp/src/arrow/util/interfaces.h | 32 ++++++++++ 15 files changed, 747 insertions(+) create mode 100644 cpp/src/arrow/dataset/CMakeLists.txt create mode 100644 cpp/src/arrow/dataset/README.md create mode 100644 cpp/src/arrow/dataset/api.h create mode 100644 cpp/src/arrow/dataset/dataset.h create mode 100644 cpp/src/arrow/dataset/disk_store.h create mode 100644 cpp/src/arrow/dataset/file_base.h create mode 100644 cpp/src/arrow/dataset/file_csv.h create mode 100644 cpp/src/arrow/dataset/file_feather.h create mode 100644 cpp/src/arrow/dataset/file_json.h create mode 100644 cpp/src/arrow/dataset/file_parquet.h create mode 100644 cpp/src/arrow/dataset/filter.h create mode 100644 cpp/src/arrow/dataset/scan.h create mode 100644 cpp/src/arrow/dataset/transaction.h create mode 100644 cpp/src/arrow/dataset/type_fwd.h create mode 100644 cpp/src/arrow/util/interfaces.h diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt new file mode 100644 index 00000000000..3db824839aa --- /dev/null +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Headers: top level +arrow_install_all_headers("arrow/dataset") diff --git a/cpp/src/arrow/dataset/README.md b/cpp/src/arrow/dataset/README.md new file mode 100644 index 00000000000..cbbe1251e14 --- /dev/null +++ b/cpp/src/arrow/dataset/README.md @@ -0,0 +1,27 @@ + + +# Arrow C++ Datasets + +The `arrow::dataset` subcomponent provides an API to read and write +semantic datasets stored in different locations and formats. It +facilitates parallel processing of datasets spread across different +physical files and serialization formats. Other concerns such as +partitioning, filtering (partition- and column-level), and schema +normalization are also addressed. \ No newline at end of file diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h new file mode 100644 index 00000000000..408d604eb80 --- /dev/null +++ b/cpp/src/arrow/dataset/api.h @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/dataset/format_base.h" +#include "arrow/dataset/format_csv.h" +#include "arrow/dataset/format_parquet.h" +#include "arrow/dataset/reader.h" diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h new file mode 100644 index 00000000000..ecdacf7720c --- /dev/null +++ b/cpp/src/arrow/dataset/dataset.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/dataset/file_base.h" +#include "arrow/util/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +namespace fs { + +class FileSystem; + +} // namespace fs + +namespace dataset { + +class DataSource; +class Filter; +class ScanBuilder; +class ScanTask; +using ScanTaskIterator = Iterator>; + +/// \brief A granular piece of a Dataset, such as an individual file, +/// which can be scanned separately from other fragments +class ARROW_EXPORT DataFragment { + public: + /// \brief Return true if the fragment can benefit from parallel + /// scanning + virtual bool splittable() const = 0; +}; + +/// \brief A basic component of a Dataset which yields zero or more +/// DataFragments +class ARROW_EXPORT DataSource { + public: + virtual ~DataSource() = default; +}; + +/// \brief Top-level interface for a muti-source +class ARROW_EXPORT Dataset : public std::enable_shared_from_this { + public: + explicit Dataset(const std::shared_ptr& source); + explicit Dataset(const std::vector>& sources); + + virtual ~Dataset() = default; + + /// \brief Begin to build a new Scan operation against this Dataset + ScannerBuilder NewScan() const; + + const std::vector>& sources() const { return sources_; } + + protected: + std::vector> sources_; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/disk_store.h b/cpp/src/arrow/dataset/disk_store.h new file mode 100644 index 00000000000..58718283b1b --- /dev/null +++ b/cpp/src/arrow/dataset/disk_store.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +namespace arrow { + +namespace fs { + +class FileSystem; + +} // namespace fs + +namespace dataset { + +/// \brief Loads a previously-written collection of Arrow protocol +/// files and exposes them in a way that can be consumed as a Dataset +/// source +class ARROW_EXPORT DiskStoreReader : public DatasetSource { + public: + DiskStoreReader(const std::string& path, fs::FileSystem* filesystem); + + private: + class DiskStoreReaderImpl; + std::unique_ptr impl_; + + std::string path_; + fs::FileSystem* filesystem_; + + DiskStoreReader() {} +}; + +/// \brief +class ARROW_EXPORT DiskStoreWriter { + public: + Status Write(const RecordBatch& batch); + + private: + DiskStoreWriter() {} +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h new file mode 100644 index 00000000000..0173e803df0 --- /dev/null +++ b/cpp/src/arrow/dataset/file_base.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/dataset/type_fwd.h" +#include "arrow/util/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace dataset { + +class ARROW_EXPORT FileLocation { + public: + enum LocationType { REMOTE, IN_MEMORY }; + + FileLocation(std::string path, fs::FileSystem* filesystem) + : FileLocation(FileLocation::REMOTE), + path_(std::move(path)), + filesystem_(filesystem) {} + + FileLocation(std::shared_ptr buffer) + : FileLocation(FileLocation::IN_MEMORY), buffer_(std::move(buffer)) {} + + private: + FileLocation(LocationType type) : type_(type) {} + + FileLocation::type type_; + + // + std::string path_; + fs::FileSystem* filesystem_; + + std::shared_ptr buffer_; +}; + +/// \brief Base class for file scanning options +class ARROW_EXPORT FileScanOptions { + public: + virtual ~FileScanOptions() = default; + + /// \brief The name of the file format this options corresponds to + virtual std::string file_type() const = 0; +}; + +/// \brief Base class for file writing options +class ARROW_EXPORT FileWriteOptions { + public: + virtual ~FileWriteOptions() = default; + + /// \brief The name of the file format this options corresponds to + virtual std::string file_type() const = 0; +}; + +/// \brief Base class for file writing options +class ARROW_EXPORT FileFormat { + public: + virtual ~FileFormat() = default; + + virtual std::string name() const = 0; + + /// \brief Return true if the given file extension + virtual bool IsValidExtension(const std::string& ext) const = 0; + + /// \brief Open a file for scanning + virtual Status ScanFile(const std::string& path, const FileScanOptions& options, + fs::FileSystem* filesystem, + std::unique_ptr* out) const = 0; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h new file mode 100644 index 00000000000..1a15ad46591 --- /dev/null +++ b/cpp/src/arrow/dataset/file_csv.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/csv/options.h" +#include "arrow/dataset/file_base.h" +#include "arrow/util/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +namespace fs { + +class FileSystem; + +} // namespace fs + +namespace dataset { + +class ARROW_EXPORT CsvScanOptions : public FileScanOptions { + public: + /// + std::string file_type() const override; + + private: + csv::ParseOptions parse_options_; + csv::ConvertOptions convert_options_; + csv::ReadOptions read_options_; +}; + +class ARROW_EXPORT CsvWriteOptions : public FileWriteOptions { + public: + virtual ~FileWriteOptions() = default; + + /// + virtual file_type() const = 0; +}; + +/// \brief A FileFormat implementation that reads from CSV files +class ARROW_EXPORT CsvFileFormat : public FileFormat { + public: + std::string name() const override; + + /// \brief Return true if the given file extension + bool IsKnownExtension(const std::string& ext) const override; + + /// \brief Open a file for scanning + Status ScanFile(const std::string& path, const FileScanOptions& options, + fs::FileSystem* filesystem, + std::unique_ptr* out) const override; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h new file mode 100644 index 00000000000..4adb9599033 --- /dev/null +++ b/cpp/src/arrow/dataset/file_feather.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/dataset/file_base.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/util/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace dataset { + +class ARROW_EXPORT FeatherScanOptions : public FileScanOptions { + public: + std::string file_type() const override; +}; + +class ARROW_EXPORT FeatherWriterOptions : public FileWriteOptions { + public: + std::string file_type() const override; +}; + +/// \brief A FileFormat implementation that reads from Feather (Arrow +/// IPC protocol) files +class ARROW_EXPORT FeatherFileFormat : public FileFormat { + public: + std::string name() const override; + + /// \brief Return true if the given file extension + bool IsKnownExtension(const std::string& ext) const override; + + /// \brief Open a file for scanning + Status ScanFile(const std::string& path, const FileScanOptions& options, + fs::FileSystem* filesystem, + std::unique_ptr* out) const override; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h new file mode 100644 index 00000000000..b44d4035245 --- /dev/null +++ b/cpp/src/arrow/dataset/file_json.h @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/dataset/file_base.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/json/options.h" +#include "arrow/util/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace dataset { + +class ARROW_EXPORT JsonScanOptions : public FileScanOptions { + public: + /// + std::string file_type() const override + + private: + json::ParseOptions parse_options_; + json::ReadOptions read_options_; +}; + +class ARROW_EXPORT JsonWriteOptions : public FileWriteOptions { + public: + virtual ~FileWriteOptions() = default; + + /// + virtual file_type() const = 0; +}; + +/// \brief A FileFormat implementation that reads from JSON files +class ARROW_EXPORT JsonFileFormat : public FileFormat { + public: + std::string name() const override; + + /// \brief Return true if the given file extension + bool IsKnownExtension(const std::string& ext) const override; + + /// \brief Open a file for scanning + Status ScanFile(const std::string& path, const FileScanOptions& options, + fs::FileSystem* filesystem, + std::unique_ptr* out) const override; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h new file mode 100644 index 00000000000..2a1561cc0b9 --- /dev/null +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/dataset/file_base.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/util/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace dataset { + +class ARROW_EXPORT ParquetScanOptions : public FileScanOptions { + public: + std::string file_type() const override; +}; + +class ARROW_EXPORT ParquetWriteOptions : public FileWriteOptions { + public: + std::string file_type() const override; +}; + +class ARROW_EXPORT ParquetFragment : public DataFragment { + public: + bool splittable() const override; + + const FileLocation& location() const { return *location_; } + + private: + std::shared_ptr location_; +}; + +/// \brief A FileFormat implementation that reads from Parquet files +class ARROW_EXPORT ParquetFileFormat : public FileFormat { + public: + std::string name() const override; + + /// \brief Return true if the given file extension + bool IsKnownExtension(const std::string& ext) const override; + + /// \brief Open a file for scanning + Status ScanFile(const std::string& path, const FileScanOptions& options, + fs::FileSystem* filesystem, + std::unique_ptr* out) const override; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h new file mode 100644 index 00000000000..b3353e83d55 --- /dev/null +++ b/cpp/src/arrow/dataset/filter.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/util/visibility.h" + +namespace arrow { +namespace dataset { + +class ARROW_EXPORT Filter { + public: + enum type { + /// Simple boolean predicate consisting of comparisons and boolean + /// logic (AND, OR, NOT) + SIMPLE, + + /// + GENERIC + }; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/scan.h b/cpp/src/arrow/dataset/scan.h new file mode 100644 index 00000000000..09c06d7b523 --- /dev/null +++ b/cpp/src/arrow/dataset/scan.h @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace dataset { + +class Dataset; + +class ARROW_EXPORT ScanTask { + public: + RecordBatchIterator +}; + +class ARROW_EXPORT Scanner { + public: + protected: + friend class ScannerBuilder; +}; + +class ARROW_EXPORT ScannerBuilder { + public: + /// \brief Set + ScannerBuilder* Project(const std::vector& columns) const; + + ScannerBuilder* AddFilter(const std::shared_ptr& filter) const; + + /// \brief If true (default), add + ScannerBuilder* IncludePartitionKeys(bool include = true) const; + + /// \brief Return the constructed now-immutable Scanner object + std::unique_ptr Finish() const; + + private: + std::shared_ptr dataset_; + std::vector project_columns_; + std::vector> filters_; + bool include_partition_keys_; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/transaction.h b/cpp/src/arrow/dataset/transaction.h new file mode 100644 index 00000000000..012c1c11e73 --- /dev/null +++ b/cpp/src/arrow/dataset/transaction.h @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h new file mode 100644 index 00000000000..a834f679fe9 --- /dev/null +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/util/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +namespace fs { + +class FileSystem; + +} // namespace fs + +namespace dataset { + +class Dataset; +class DatasetFragment; +class DatasetSource; +class FileFormat; +class FileScanOptions; +class FileWriteOptions; +class Filter; +class Partition; +class PartitionKey; +class Scanner; +class ScannerBuilder; +class ScanTask; +using ScanTaskIterator = Iterator>; +using RecordBatchIterator = Iterator>; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/util/interfaces.h b/cpp/src/arrow/util/interfaces.h new file mode 100644 index 00000000000..2e0383fa18e --- /dev/null +++ b/cpp/src/arrow/util/interfaces.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/visibility.h" + +namespace arrow { + +class Status; + +template +class Iterator { + public: + virtual Status Next(T* out) = 0; +} + +} // namespace arrow From 01c4279a7a12635ce40057a3b50b5eef26d12682 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 5 Jun 2019 15:02:40 -0500 Subject: [PATCH 2/7] Checkpoint --- cpp/src/arrow/dataset/CMakeLists.txt | 16 +++ cpp/src/arrow/dataset/README.md | 6 +- cpp/src/arrow/dataset/api.h | 2 + cpp/src/arrow/dataset/dataset.h | 40 +++--- cpp/src/arrow/dataset/{scan.h => discovery.h} | 48 +++---- cpp/src/arrow/dataset/disk_store.h | 4 +- cpp/src/arrow/dataset/file_base.h | 54 +++++--- cpp/src/arrow/dataset/file_csv.h | 11 +- cpp/src/arrow/dataset/file_feather.h | 9 +- cpp/src/arrow/dataset/file_json.h | 9 +- cpp/src/arrow/dataset/file_parquet.h | 18 +-- cpp/src/arrow/dataset/file_test.cc | 16 +++ cpp/src/arrow/dataset/filter.h | 4 +- cpp/src/arrow/dataset/partition.h | 122 ++++++++++++++++++ cpp/src/arrow/dataset/scanner.h | 83 ++++++++++++ cpp/src/arrow/dataset/transaction.h | 9 ++ cpp/src/arrow/dataset/type_fwd.h | 22 +++- cpp/src/arrow/dataset/visibility.h | 48 +++++++ 18 files changed, 413 insertions(+), 108 deletions(-) rename cpp/src/arrow/dataset/{scan.h => discovery.h} (52%) create mode 100644 cpp/src/arrow/dataset/file_test.cc create mode 100644 cpp/src/arrow/dataset/partition.h create mode 100644 cpp/src/arrow/dataset/scanner.h create mode 100644 cpp/src/arrow/dataset/visibility.h diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index 3db824839aa..f521233db5a 100644 --- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -15,5 +15,21 @@ # specific language governing permissions and limitations # under the License. +add_custom_target(arrow_dataset) + # Headers: top level arrow_install_all_headers("arrow/dataset") + +set(ARROW_DATASET_SRCS + + ) + +add_arrow_lib(arrow_dataset + OUTPUTS ARROW_DATASET_LIBRARIES + SOURCES ${ARROW_DATASET_SRCS} + SHARED_LINK_LIBS arrow_shared + STATIC_LINK_LIBS arrow_static) + +foreach(LIB_TARGET ${ARROW_DATASET_LIBRARIES}) + target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_DS_EXPORTING) +endforeach() diff --git a/cpp/src/arrow/dataset/README.md b/cpp/src/arrow/dataset/README.md index cbbe1251e14..a7379db448d 100644 --- a/cpp/src/arrow/dataset/README.md +++ b/cpp/src/arrow/dataset/README.md @@ -24,4 +24,8 @@ semantic datasets stored in different locations and formats. It facilitates parallel processing of datasets spread across different physical files and serialization formats. Other concerns such as partitioning, filtering (partition- and column-level), and schema -normalization are also addressed. \ No newline at end of file +normalization are also addressed. + +## Development Status + +Pre-alpha as of June 2019. API subject to change without notice. \ No newline at end of file diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h index 408d604eb80..c156ec81bd5 100644 --- a/cpp/src/arrow/dataset/api.h +++ b/cpp/src/arrow/dataset/api.h @@ -17,7 +17,9 @@ #pragma once +#include "arrow/dataset/discovery.h" #include "arrow/dataset/format_base.h" #include "arrow/dataset/format_csv.h" #include "arrow/dataset/format_parquet.h" #include "arrow/dataset/reader.h" +#include "arrow/dataset/scanner.h" diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index ecdacf7720c..beebd835d6e 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -19,29 +19,15 @@ #include -#include "arrow/dataset/file_base.h" -#include "arrow/util/interfaces.h" -#include "arrow/util/visibility.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" namespace arrow { - -namespace fs { - -class FileSystem; - -} // namespace fs - namespace dataset { -class DataSource; -class Filter; -class ScanBuilder; -class ScanTask; -using ScanTaskIterator = Iterator>; - /// \brief A granular piece of a Dataset, such as an individual file, -/// which can be scanned separately from other fragments -class ARROW_EXPORT DataFragment { +/// which can be read/scanned separately from other fragments +class ARROW_DS_EXPORT DataFragment { public: /// \brief Return true if the fragment can benefit from parallel /// scanning @@ -50,13 +36,25 @@ class ARROW_EXPORT DataFragment { /// \brief A basic component of a Dataset which yields zero or more /// DataFragments -class ARROW_EXPORT DataSource { +class ARROW_DS_EXPORT DataSource { public: virtual ~DataSource() = default; + + virtual std::unique_ptr GetFragments() = 0; +}; + +/// \brief A DataSource consisting of a flat sequence of DataFragments +class ARROW_DS_EXPORT SimpleDataSource : public DataSource { + public: + std::unique_ptr GetFragments() override; + + private: + DataFragmentVector fragments_; }; -/// \brief Top-level interface for a muti-source -class ARROW_EXPORT Dataset : public std::enable_shared_from_this { +/// \brief Top-level interface for a Dataset with fragments coming +/// from possibly multiple sources +class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { public: explicit Dataset(const std::shared_ptr& source); explicit Dataset(const std::vector>& sources); diff --git a/cpp/src/arrow/dataset/scan.h b/cpp/src/arrow/dataset/discovery.h similarity index 52% rename from cpp/src/arrow/dataset/scan.h rename to cpp/src/arrow/dataset/discovery.h index 09c06d7b523..cbabcda4a4f 100644 --- a/cpp/src/arrow/dataset/scan.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -15,46 +15,30 @@ // specific language governing permissions and limitations // under the License. +/// Logic for automatically determining the structure of multi-file +/// dataset with possible partitioning according to available +/// partition schemes + #pragma once -#include "arrow/util/interfaces.h" -#include "arrow/util/visibility.h" +#include + +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" namespace arrow { namespace dataset { -class Dataset; - -class ARROW_EXPORT ScanTask { - public: - RecordBatchIterator -}; - -class ARROW_EXPORT Scanner { - public: - protected: - friend class ScannerBuilder; +struct ARROW_DS_EXPORT DiscoveryOptions { + std::shared_ptr format = nullptr; + std::shared_ptr partition_scheme = nullptr; }; -class ARROW_EXPORT ScannerBuilder { - public: - /// \brief Set - ScannerBuilder* Project(const std::vector& columns) const; - - ScannerBuilder* AddFilter(const std::shared_ptr& filter) const; - - /// \brief If true (default), add - ScannerBuilder* IncludePartitionKeys(bool include = true) const; - - /// \brief Return the constructed now-immutable Scanner object - std::unique_ptr Finish() const; - - private: - std::shared_ptr dataset_; - std::vector project_columns_; - std::vector> filters_; - bool include_partition_keys_; -}; +/// \brief Using a root directory +ARROW_DS_EXPORT +Status DiscoverSource(const std::string& path, fs::FileSystem* filesystem, + const DiscoveryOptions& options, + std::shared_ptr* out); } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/disk_store.h b/cpp/src/arrow/dataset/disk_store.h index 58718283b1b..08c85e98b7f 100644 --- a/cpp/src/arrow/dataset/disk_store.h +++ b/cpp/src/arrow/dataset/disk_store.h @@ -30,7 +30,7 @@ namespace dataset { /// \brief Loads a previously-written collection of Arrow protocol /// files and exposes them in a way that can be consumed as a Dataset /// source -class ARROW_EXPORT DiskStoreReader : public DatasetSource { +class ARROW_DS_EXPORT DiskStoreReader : public DatasetSource { public: DiskStoreReader(const std::string& path, fs::FileSystem* filesystem); @@ -45,7 +45,7 @@ class ARROW_EXPORT DiskStoreReader : public DatasetSource { }; /// \brief -class ARROW_EXPORT DiskStoreWriter { +class ARROW_DS_EXPORT DiskStoreWriter { public: Status Write(const RecordBatch& batch); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 0173e803df0..1ea509edefa 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -20,30 +20,34 @@ #include #include "arrow/dataset/type_fwd.h" -#include "arrow/util/interfaces.h" -#include "arrow/util/visibility.h" +#include "arrow/util/compression.h" +#include "arrow/dataset/visibility.h" namespace arrow { namespace dataset { -class ARROW_EXPORT FileLocation { +/// \brief +class ARROW_DS_EXPORT FileSource { public: - enum LocationType { REMOTE, IN_MEMORY }; + enum SourceType { PATH, BUFFER }; - FileLocation(std::string path, fs::FileSystem* filesystem) - : FileLocation(FileLocation::REMOTE), + FileSource(std::string path, fs::FileSystem* filesystem, + Compression::type compression = Compression::UNCOMPRESSED) + : FileSource(FileSource::PATH), path_(std::move(path)), filesystem_(filesystem) {} - FileLocation(std::shared_ptr buffer) - : FileLocation(FileLocation::IN_MEMORY), buffer_(std::move(buffer)) {} + FileSource(std::shared_ptr buffer, + Compression::type compression = Compression::UNCOMPRESSED) + : FileSource(FileSource::BUFFER), buffer_(std::move(buffer)) {} - private: - FileLocation(LocationType type) : type_(type) {} + SourceType type() const { return type_; } - FileLocation::type type_; + private: + FileSource(SourceType type) : type_(type) {} + SourceType type_; - // + // PATH-based source std::string path_; fs::FileSystem* filesystem_; @@ -51,7 +55,7 @@ class ARROW_EXPORT FileLocation { }; /// \brief Base class for file scanning options -class ARROW_EXPORT FileScanOptions { +class ARROW_DS_EXPORT FileScanOptions { public: virtual ~FileScanOptions() = default; @@ -60,7 +64,7 @@ class ARROW_EXPORT FileScanOptions { }; /// \brief Base class for file writing options -class ARROW_EXPORT FileWriteOptions { +class ARROW_DS_EXPORT FileWriteOptions { public: virtual ~FileWriteOptions() = default; @@ -68,8 +72,8 @@ class ARROW_EXPORT FileWriteOptions { virtual std::string file_type() const = 0; }; -/// \brief Base class for file writing options -class ARROW_EXPORT FileFormat { +/// \brief Base class for file format implementation +class ARROW_DS_EXPORT FileFormat { public: virtual ~FileFormat() = default; @@ -79,10 +83,24 @@ class ARROW_EXPORT FileFormat { virtual bool IsValidExtension(const std::string& ext) const = 0; /// \brief Open a file for scanning - virtual Status ScanFile(const std::string& path, const FileScanOptions& options, - fs::FileSystem* filesystem, + virtual Status ScanFile(const FileSource& location, const FileScanOptions& options, + std::shared_ptr scan_context, std::unique_ptr* out) const = 0; }; +/// \brief A DataFragment that is stored in a file with a known format +class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { + public: + FileBasedDataFragment(const FileSource& location, + std::shared_ptr format); + + const FileSource& location() const { return location_; } + std::shared_ptr format() const { return format_; } + + protected: + FileSource location_; + std::shared_ptr format_; +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 1a15ad46591..8cfeecc5395 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -21,8 +21,9 @@ #include "arrow/csv/options.h" #include "arrow/dataset/file_base.h" -#include "arrow/util/interfaces.h" -#include "arrow/util/visibility.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/util/iterator.h" +#include "arrow/dataset/visibility.h" namespace arrow { @@ -34,7 +35,7 @@ class FileSystem; namespace dataset { -class ARROW_EXPORT CsvScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT CsvScanOptions : public FileScanOptions { public: /// std::string file_type() const override; @@ -45,7 +46,7 @@ class ARROW_EXPORT CsvScanOptions : public FileScanOptions { csv::ReadOptions read_options_; }; -class ARROW_EXPORT CsvWriteOptions : public FileWriteOptions { +class ARROW_DS_EXPORT CsvWriteOptions : public FileWriteOptions { public: virtual ~FileWriteOptions() = default; @@ -54,7 +55,7 @@ class ARROW_EXPORT CsvWriteOptions : public FileWriteOptions { }; /// \brief A FileFormat implementation that reads from CSV files -class ARROW_EXPORT CsvFileFormat : public FileFormat { +class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { public: std::string name() const override; diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index 4adb9599033..b6737c326f3 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -21,25 +21,24 @@ #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" -#include "arrow/util/interfaces.h" -#include "arrow/util/visibility.h" +#include "arrow/dataset/visibility.h" namespace arrow { namespace dataset { -class ARROW_EXPORT FeatherScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT FeatherScanOptions : public FileScanOptions { public: std::string file_type() const override; }; -class ARROW_EXPORT FeatherWriterOptions : public FileWriteOptions { +class ARROW_DS_EXPORT FeatherWriterOptions : public FileWriteOptions { public: std::string file_type() const override; }; /// \brief A FileFormat implementation that reads from Feather (Arrow /// IPC protocol) files -class ARROW_EXPORT FeatherFileFormat : public FileFormat { +class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { public: std::string name() const override; diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index b44d4035245..81600070f41 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -22,13 +22,12 @@ #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" #include "arrow/json/options.h" -#include "arrow/util/interfaces.h" -#include "arrow/util/visibility.h" +#include "arrow/dataset/visibility.h" namespace arrow { namespace dataset { -class ARROW_EXPORT JsonScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT JsonScanOptions : public FileScanOptions { public: /// std::string file_type() const override @@ -38,7 +37,7 @@ class ARROW_EXPORT JsonScanOptions : public FileScanOptions { json::ReadOptions read_options_; }; -class ARROW_EXPORT JsonWriteOptions : public FileWriteOptions { +class ARROW_DS_EXPORT JsonWriteOptions : public FileWriteOptions { public: virtual ~FileWriteOptions() = default; @@ -47,7 +46,7 @@ class ARROW_EXPORT JsonWriteOptions : public FileWriteOptions { }; /// \brief A FileFormat implementation that reads from JSON files -class ARROW_EXPORT JsonFileFormat : public FileFormat { +class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { public: std::string name() const override; diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 2a1561cc0b9..cc77449ef0d 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -21,34 +21,28 @@ #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" -#include "arrow/util/interfaces.h" -#include "arrow/util/visibility.h" +#include "arrow/dataset/visibility.h" namespace arrow { namespace dataset { -class ARROW_EXPORT ParquetScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT ParquetScanOptions : public FileScanOptions { public: std::string file_type() const override; }; -class ARROW_EXPORT ParquetWriteOptions : public FileWriteOptions { +class ARROW_DS_EXPORT ParquetWriteOptions : public FileWriteOptions { public: std::string file_type() const override; }; -class ARROW_EXPORT ParquetFragment : public DataFragment { +class ARROW_DS_EXPORT ParquetFragment : public FileBasedDataFragment { public: - bool splittable() const override; - - const FileLocation& location() const { return *location_; } - - private: - std::shared_ptr location_; + bool splittable() const override { return true; } }; /// \brief A FileFormat implementation that reads from Parquet files -class ARROW_EXPORT ParquetFileFormat : public FileFormat { +class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { public: std::string name() const override; diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc new file mode 100644 index 00000000000..b248758bc12 --- /dev/null +++ b/cpp/src/arrow/dataset/file_test.cc @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index b3353e83d55..91505f2efd0 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -19,12 +19,12 @@ #include -#include "arrow/util/visibility.h" +#include "arrow/dataset/visibility.h" namespace arrow { namespace dataset { -class ARROW_EXPORT Filter { +class ARROW_DS_EXPORT Filter { public: enum type { /// Simple boolean predicate consisting of comparisons and boolean diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h new file mode 100644 index 00000000000..76acf3cc7bf --- /dev/null +++ b/cpp/src/arrow/dataset/partition.h @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" + +namespace arrow { +namespace dataset { + +// ---------------------------------------------------------------------- +// Computing partition values + +// TODO(wesm): API for computing partition keys derived from raw +// values. For example, year(value) instead of simply value, so a +// dataset with a timestamp column might group all data with year 2009 +// in the same partition + +// /// \brief +// class ScalarTransform { +// public: +// virtual Status Transform(const std::shared_ptr& input, +// std::shared_ptr* output) const = 0; +// }; + +// class PartitionField { +// public: + +// private: +// std::string field_name_; +// }; + +// ---------------------------------------------------------------------- +// Partition identifiers + +/// \brief +class PartitionKey { + public: + +}; + +/// \brief Intermediate data structure for data parsed from a string +/// partition identifier. +/// +/// For example, the identifier "foo=5" might be parsed with a single +/// "foo" field and the value 5. A more complex identifier might be +/// written as "foo=5,bar=2", which would yield two fields and two +/// values +struct PartitionKeyComponents { + std::vector fields; + std::vector> values; +}; + +// ---------------------------------------------------------------------- +// Partition schemes + +/// \brief +class PartitionScheme { + public: + virtual ~PartitionScheme() = default; + + /// \brief The name identifying the kind of partition scheme + virtual std::string name() const = 0; + + virtual bool DirectoryMatches(const std::string& path) const = 0; + + virtual Status ParseDirectory(const std::string& path, + PartitionKeyComponents* out) const = 0; +}; + +/// \brief +class HivePartitionScheme : public PartitionScheme { + public: + + +}; + +// ---------------------------------------------------------------------- +// + +/// \brief Container for a dataset partition, which consists of a +/// partition identifier, subpartitions, and some data fragments +class ARROW_DS_EXPORT SimplePartition : public Partition{ + public: + const PartitionKey& key() const { return *key_; } + + private: + std::unique_ptr key_; + + /// \brief Child partitions of this partition. In some partition + /// schemes, this member is mutually-exclusive with + std::vector> subpartitions_; + + std::vector> data_fragments_; +}; + +/// \brief DataSource implementation for partition-based data sources +class PartitionSource : public DataSource { + public: + std::unique_ptr GetFragments() override; + + protected: + std::vector> partitions_; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h new file mode 100644 index 00000000000..e9f8abd9039 --- /dev/null +++ b/cpp/src/arrow/dataset/scanner.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" + +namespace arrow { +namespace dataset { + +/// \brief Shared state for a Scan operation +struct ARROW_DS_EXPORT ScanContext {}; + +// TODO(wesm): API for handling of post-materialization filters. For +// example, if the user requests [$col1 > 0, $col2 > 0] and $col1 is a +// partition key, but $col2 is not, then the filter "$col2 > 0" must +// be evaluated in-memory against the RecordBatch objects resulting +// from the Scan + +/// \brief Read record batches from a range of a single data fragment +class ARROW_DS_EXPORT ScanTask { + public: + virtual ~ScanTask() = default; + + /// \brief Iterate through sequence of materialized record batches + /// resulting from the Scan. Execution semantics encapsulated in the + /// particular ScanTask implementation + virtual std::unique_ptr Scan() = 0; +}; + +/// \brief Main interface for +class ARROW_DS_EXPORT Scanner { + public: + virtual ~Scanner() = 0; + + /// \brief Return iterator yielding ScanTask instances to enable + /// serial or parallel execution of units of scanning work + virtual std::unique_ptr GetTasks() = 0; +}; + +class ARROW_DS_EXPORT ScannerBuilder { + public: + ScannerBuilder(std::shared_ptr dataset, + std::shared_ptr scan_context) + + /// \brief Set + ScannerBuilder* Project(const std::vector& columns) const; + + ScannerBuilder* AddFilter(const std::shared_ptr& filter) const; + + /// \brief If true (default), add partition keys to the + /// RecordBatches that the scan produces if they are not in the data + /// otherwise + ScannerBuilder* IncludePartitionKeys(bool include = true) const; + + /// \brief Return the constructed now-immutable Scanner object + std::unique_ptr Finish() const; + + private: + std::shared_ptr dataset_; + std::shared_ptr scan_context_; + std::vector project_columns_; + FilterVector filters_; + bool include_partition_keys_; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/transaction.h b/cpp/src/arrow/dataset/transaction.h index 012c1c11e73..8c83b094254 100644 --- a/cpp/src/arrow/dataset/transaction.h +++ b/cpp/src/arrow/dataset/transaction.h @@ -16,3 +16,12 @@ // under the License. #pragma once + +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" + +namespace arrow { +namespace dataset { + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index a834f679fe9..94aeb87976c 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -19,8 +19,9 @@ #include -#include "arrow/util/interfaces.h" -#include "arrow/util/visibility.h" +#include "arrow/type_fwd.h" +#include "arrow/util/iterator.h" +#include "arrow/dataset/visibility.h" namespace arrow { @@ -33,19 +34,30 @@ class FileSystem; namespace dataset { class Dataset; -class DatasetFragment; -class DatasetSource; +class DataFragment; +class DataSource; +using DataFragmentIterator = Iterator>; +using DataFragmentVector = std::vector>; + +struct DiscoveryOptions; + +class FileBasedDataFragment; class FileFormat; class FileScanOptions; class FileWriteOptions; + class Filter; +using FilterVector = std::vector>; + class Partition; class PartitionKey; +class PartitionScheme; + +struct ScanContext; class Scanner; class ScannerBuilder; class ScanTask; using ScanTaskIterator = Iterator>; -using RecordBatchIterator = Iterator>; } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/visibility.h b/cpp/src/arrow/dataset/visibility.h new file mode 100644 index 00000000000..324b1b269da --- /dev/null +++ b/cpp/src/arrow/dataset/visibility.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#if defined(_WIN32) || defined(__CYGWIN__) +#if defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable : 4251) +#else +#pragma GCC diagnostic ignored "-Wattributes" +#endif + +#ifdef ARROW_DS_STATIC +#define ARROW_DS_EXPORT +#elif defined(ARROW_DS_EXPORTING) +#define ARROW_DS_EXPORT __declspec(dllexport) +#else +#define ARROW_DS_EXPORT __declspec(dllimport) +#endif + +#define ARROW_DS_NO_EXPORT +#else // Not Windows +#ifndef ARROW_DS_EXPORT +#define ARROW_DS_EXPORT __attribute__((visibility("default"))) +#endif +#ifndef ARROW_DS_NO_EXPORT +#define ARROW_DS_NO_EXPORT __attribute__((visibility("hidden"))) +#endif +#endif // Non-Windows + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif From 895a03ee6de74b3d365377078495c1448c740edb Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 5 Jun 2019 15:02:51 -0500 Subject: [PATCH 3/7] Checkpoint --- cpp/src/arrow/util/iterator.h | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 cpp/src/arrow/util/iterator.h diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h new file mode 100644 index 00000000000..3ebe2207f7b --- /dev/null +++ b/cpp/src/arrow/util/iterator.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/dataset/visibility.h" + +namespace arrow { + +class Status; + +template +class Iterator { + public: + virtual Status Next(T* out) = 0; +} + +} // namespace arrow From 20b8f4b28c7477c418bb6c7600a2ba58a34fe1ab Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 5 Jun 2019 15:46:37 -0500 Subject: [PATCH 4/7] Compile a simple unit test --- cpp/cmake_modules/DefineOptions.cmake | 2 + cpp/src/arrow/CMakeLists.txt | 4 ++ cpp/src/arrow/dataset/CMakeLists.txt | 30 ++++++++--- cpp/src/arrow/dataset/api.h | 9 ++-- cpp/src/arrow/dataset/dataset.h | 1 + cpp/src/arrow/dataset/discovery.h | 9 ++-- cpp/src/arrow/dataset/disk_store.h | 11 ++-- cpp/src/arrow/dataset/file_base.h | 55 +++++++++++++++---- cpp/src/arrow/dataset/file_csv.h | 13 ++--- cpp/src/arrow/dataset/file_feather.h | 5 +- cpp/src/arrow/dataset/file_json.h | 14 +++-- cpp/src/arrow/dataset/file_parquet.h | 5 +- cpp/src/arrow/dataset/file_test.cc | 77 +++++++++++++++++++++++++++ cpp/src/arrow/dataset/partition.h | 14 ++--- cpp/src/arrow/dataset/scanner.cc | 22 ++++++++ cpp/src/arrow/dataset/scanner.h | 14 +++-- cpp/src/arrow/dataset/transaction.h | 4 +- cpp/src/arrow/dataset/type_fwd.h | 4 +- cpp/src/arrow/record_batch.h | 6 +-- cpp/src/arrow/type_fwd.h | 3 ++ cpp/src/arrow/util/interfaces.h | 2 +- cpp/src/arrow/util/iterator.h | 5 +- 22 files changed, 236 insertions(+), 73 deletions(-) create mode 100644 cpp/src/arrow/dataset/scanner.cc diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index a883b87222d..5d39f616827 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -139,6 +139,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") define_option(ARROW_COMPUTE "Build the Arrow Compute Modules" ON) + define_option(ARROW_DATASET "Build the Arrow Dataset Modules" ON) + define_option(ARROW_FLIGHT "Build the Arrow Flight RPC System (requires GRPC, Protocol Buffers)" OFF) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 08f676af87a..c989f855a5b 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -380,6 +380,10 @@ add_subdirectory(io) add_subdirectory(util) add_subdirectory(vendored) +if(ARROW_DATASET) + add_subdirectory(dataset) +endif() + if(ARROW_FLIGHT) add_subdirectory(flight) endif() diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index f521233db5a..90eabb562ea 100644 --- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -20,16 +20,32 @@ add_custom_target(arrow_dataset) # Headers: top level arrow_install_all_headers("arrow/dataset") -set(ARROW_DATASET_SRCS - - ) +set(ARROW_DATASET_SRCS scanner.cc) add_arrow_lib(arrow_dataset - OUTPUTS ARROW_DATASET_LIBRARIES - SOURCES ${ARROW_DATASET_SRCS} - SHARED_LINK_LIBS arrow_shared - STATIC_LINK_LIBS arrow_static) + OUTPUTS + ARROW_DATASET_LIBRARIES + SOURCES + ${ARROW_DATASET_SRCS} + SHARED_LINK_LIBS + arrow_shared + STATIC_LINK_LIBS + arrow_static) + +if(ARROW_DATASET_TEST_LINKAGE STREQUAL "static") + set(ARROW_DATASET_TEST_LINK_LIBS arrow_dataset_static ${ARROW_TEST_STATIC_LINK_LIBS}) +else() + set(ARROW_DATASET_TEST_LINK_LIBS arrow_dataset_shared ${ARROW_TEST_SHARED_LINK_LIBS}) +endif() foreach(LIB_TARGET ${ARROW_DATASET_LIBRARIES}) target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_DS_EXPORTING) endforeach() + +add_arrow_test(file_test + EXTRA_LINK_LIBS + ${ARROW_DATASET_TEST_LINK_LIBS} + PREFIX + "arrow-dataset" + LABELS + "arrow_dataset") diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h index c156ec81bd5..9ded93a7fda 100644 --- a/cpp/src/arrow/dataset/api.h +++ b/cpp/src/arrow/dataset/api.h @@ -17,9 +17,10 @@ #pragma once +#include "arrow/dataset/dataset.h" #include "arrow/dataset/discovery.h" -#include "arrow/dataset/format_base.h" -#include "arrow/dataset/format_csv.h" -#include "arrow/dataset/format_parquet.h" -#include "arrow/dataset/reader.h" +#include "arrow/dataset/file_base.h" +#include "arrow/dataset/file_csv.h" +#include "arrow/dataset/file_feather.h" +#include "arrow/dataset/file_parquet.h" #include "arrow/dataset/scanner.h" diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index beebd835d6e..5b0f0937da3 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index cbabcda4a4f..18242250f32 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -22,23 +22,24 @@ #pragma once #include +#include #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" +#include "arrow/util/macros.h" namespace arrow { namespace dataset { struct ARROW_DS_EXPORT DiscoveryOptions { - std::shared_ptr format = nullptr; - std::shared_ptr partition_scheme = nullptr; + std::shared_ptr format = NULLPTR; + std::shared_ptr partition_scheme = NULLPTR; }; /// \brief Using a root directory ARROW_DS_EXPORT Status DiscoverSource(const std::string& path, fs::FileSystem* filesystem, - const DiscoveryOptions& options, - std::shared_ptr* out); + const DiscoveryOptions& options, std::shared_ptr* out); } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/disk_store.h b/cpp/src/arrow/dataset/disk_store.h index 08c85e98b7f..a405aa2b41a 100644 --- a/cpp/src/arrow/dataset/disk_store.h +++ b/cpp/src/arrow/dataset/disk_store.h @@ -17,14 +17,13 @@ #pragma once -namespace arrow { - -namespace fs { +#include +#include -class FileSystem; - -} // namespace fs +#include "arrow/dataset/type_fwd.h" +#include "arrow/type_fwd.h" +namespace arrow { namespace dataset { /// \brief Loads a previously-written collection of Arrow protocol diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 1ea509edefa..926f54413d4 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -18,10 +18,12 @@ #pragma once #include +#include +#include #include "arrow/dataset/type_fwd.h" -#include "arrow/util/compression.h" #include "arrow/dataset/visibility.h" +#include "arrow/util/compression.h" namespace arrow { namespace dataset { @@ -33,24 +35,58 @@ class ARROW_DS_EXPORT FileSource { FileSource(std::string path, fs::FileSystem* filesystem, Compression::type compression = Compression::UNCOMPRESSED) - : FileSource(FileSource::PATH), - path_(std::move(path)), - filesystem_(filesystem) {} + : FileSource(FileSource::PATH, compression) { + path_ = std::move(path); + filesystem_ = filesystem; + } FileSource(std::shared_ptr buffer, Compression::type compression = Compression::UNCOMPRESSED) - : FileSource(FileSource::BUFFER), buffer_(std::move(buffer)) {} - + : FileSource(FileSource::BUFFER, compression) { + buffer_ = std::move(buffer); + } + + bool operator==(const FileSource& other) const { + if (type_ != other.type_) { + return false; + } else if (type_ == FileSource::PATH) { + return path_ == other.path_ && filesystem_ == other.filesystem_; + } else { + return buffer_->Equals(*other.buffer_); + } + } + + /// \brief The kind of file, whether stored in a filesystem, memory + /// resident, or other SourceType type() const { return type_; } + /// \brief Return the type of raw compression on the file, if any + Compression::type compression() const { return compression_; } + + /// \brief Return the file path, if any. Only valid when file source + /// type is PATH + std::string path() const { return path_; } + + /// \brief Return the filesystem, if any. Only valid when file + /// source type is PATH + fs::FileSystem* filesystem() const { return filesystem_; } + + /// \brief Return the buffer containing the file, if any. Only value + /// when file source type is BUFFER + std::shared_ptr buffer() const { return buffer_; } + private: - FileSource(SourceType type) : type_(type) {} + explicit FileSource(SourceType type, + Compression::type compression = Compression::UNCOMPRESSED) + : type_(type), compression_(compression) {} SourceType type_; + Compression::type compression_; // PATH-based source std::string path_; fs::FileSystem* filesystem_; + // BUFFER-based source std::shared_ptr buffer_; }; @@ -80,7 +116,7 @@ class ARROW_DS_EXPORT FileFormat { virtual std::string name() const = 0; /// \brief Return true if the given file extension - virtual bool IsValidExtension(const std::string& ext) const = 0; + virtual bool IsKnownExtension(const std::string& ext) const = 0; /// \brief Open a file for scanning virtual Status ScanFile(const FileSource& location, const FileScanOptions& options, @@ -91,8 +127,7 @@ class ARROW_DS_EXPORT FileFormat { /// \brief A DataFragment that is stored in a file with a known format class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { public: - FileBasedDataFragment(const FileSource& location, - std::shared_ptr format); + FileBasedDataFragment(const FileSource& location, std::shared_ptr format); const FileSource& location() const { return location_; } std::shared_ptr format() const { return format_; } diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 8cfeecc5395..d1c56b57af0 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -18,12 +18,13 @@ #pragma once #include +#include #include "arrow/csv/options.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" -#include "arrow/util/iterator.h" #include "arrow/dataset/visibility.h" +#include "arrow/util/iterator.h" namespace arrow { @@ -37,7 +38,6 @@ namespace dataset { class ARROW_DS_EXPORT CsvScanOptions : public FileScanOptions { public: - /// std::string file_type() const override; private: @@ -48,10 +48,7 @@ class ARROW_DS_EXPORT CsvScanOptions : public FileScanOptions { class ARROW_DS_EXPORT CsvWriteOptions : public FileWriteOptions { public: - virtual ~FileWriteOptions() = default; - - /// - virtual file_type() const = 0; + std::string file_type() const override; }; /// \brief A FileFormat implementation that reads from CSV files @@ -63,8 +60,8 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const std::string& path, const FileScanOptions& options, - fs::FileSystem* filesystem, + Status ScanFile(const FileSource& location, const FileScanOptions& options, + std::shared_ptr scan_context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index b6737c326f3..2e9fb5fcc3e 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" @@ -46,8 +47,8 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const std::string& path, const FileScanOptions& options, - fs::FileSystem* filesystem, + Status ScanFile(const FileSource& location, const FileScanOptions& options, + std::shared_ptr scan_context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index 81600070f41..4fa23f112f3 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -18,11 +18,12 @@ #pragma once #include +#include #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" -#include "arrow/json/options.h" #include "arrow/dataset/visibility.h" +#include "arrow/json/options.h" namespace arrow { namespace dataset { @@ -30,7 +31,7 @@ namespace dataset { class ARROW_DS_EXPORT JsonScanOptions : public FileScanOptions { public: /// - std::string file_type() const override + std::string file_type() const override; private: json::ParseOptions parse_options_; @@ -39,10 +40,7 @@ class ARROW_DS_EXPORT JsonScanOptions : public FileScanOptions { class ARROW_DS_EXPORT JsonWriteOptions : public FileWriteOptions { public: - virtual ~FileWriteOptions() = default; - - /// - virtual file_type() const = 0; + std::string file_type() const override; }; /// \brief A FileFormat implementation that reads from JSON files @@ -54,8 +52,8 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const std::string& path, const FileScanOptions& options, - fs::FileSystem* filesystem, + Status ScanFile(const FileSource& location, const FileScanOptions& options, + std::shared_ptr scan_context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index cc77449ef0d..f9e1bc9f363 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" @@ -50,8 +51,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const std::string& path, const FileScanOptions& options, - fs::FileSystem* filesystem, + Status ScanFile(const FileSource& location, const FileScanOptions& options, + std::shared_ptr scan_context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index b248758bc12..2db2213568b 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -14,3 +14,80 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include +#include + +#include "arrow/status.h" +#include "arrow/testing/gtest_util.h" + +#include "arrow/dataset/api.h" +#include "arrow/filesystem/localfs.h" + +namespace arrow { +namespace dataset { + +TEST(FileSource, PathBased) { + fs::LocalFileSystem localfs; + + std::string p1 = "/path/to/file.ext"; + std::string p2 = "/path/to/file.ext.gz"; + + FileSource source1(p1, &localfs); + FileSource source2(p2, &localfs, Compression::GZIP); + + ASSERT_EQ(p1, source1.path()); + ASSERT_EQ(&localfs, source1.filesystem()); + ASSERT_EQ(FileSource::PATH, source1.type()); + ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression()); + + ASSERT_EQ(p2, source2.path()); + ASSERT_EQ(&localfs, source2.filesystem()); + ASSERT_EQ(FileSource::PATH, source2.type()); + ASSERT_EQ(Compression::GZIP, source2.compression()); + + // Test copy constructor and comparison + FileSource source3 = source1; + ASSERT_EQ(source1, source3); +} + +TEST(FileSource, BufferBased) { + std::string the_data = "this is the file contents"; + auto buf = std::make_shared(the_data); + + FileSource source1(buf); + FileSource source2(buf, Compression::LZ4); + + ASSERT_EQ(FileSource::BUFFER, source1.type()); + ASSERT_TRUE(source1.buffer()->Equals(*buf)); + ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression()); + + ASSERT_EQ(FileSource::BUFFER, source2.type()); + ASSERT_TRUE(source2.buffer()->Equals(*buf)); + ASSERT_EQ(Compression::LZ4, source2.compression()); +} + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 76acf3cc7bf..c3267b7b8d3 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -14,8 +14,13 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + #pragma once +#include +#include +#include + #include "arrow/dataset/dataset.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" @@ -51,7 +56,6 @@ namespace dataset { /// \brief class PartitionKey { public: - }; /// \brief Intermediate data structure for data parsed from a string @@ -70,7 +74,7 @@ struct PartitionKeyComponents { // Partition schemes /// \brief -class PartitionScheme { +class ARROW_DS_EXPORT PartitionScheme { public: virtual ~PartitionScheme() = default; @@ -84,10 +88,8 @@ class PartitionScheme { }; /// \brief -class HivePartitionScheme : public PartitionScheme { +class ARROW_DS_EXPORT HivePartitionScheme : public PartitionScheme { public: - - }; // ---------------------------------------------------------------------- @@ -95,7 +97,7 @@ class HivePartitionScheme : public PartitionScheme { /// \brief Container for a dataset partition, which consists of a /// partition identifier, subpartitions, and some data fragments -class ARROW_DS_EXPORT SimplePartition : public Partition{ +class ARROW_DS_EXPORT SimplePartition : public Partition { public: const PartitionKey& key() const { return *key_; } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc new file mode 100644 index 00000000000..ad802643017 --- /dev/null +++ b/cpp/src/arrow/dataset/scanner.cc @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/dataset/scanner.h" + +namespace arrow { +namespace dataset {} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index e9f8abd9039..f998c049f8a 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -17,6 +17,10 @@ #pragma once +#include +#include +#include + #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" @@ -46,17 +50,17 @@ class ARROW_DS_EXPORT ScanTask { /// \brief Main interface for class ARROW_DS_EXPORT Scanner { public: - virtual ~Scanner() = 0; + virtual ~Scanner() = 0; - /// \brief Return iterator yielding ScanTask instances to enable - /// serial or parallel execution of units of scanning work - virtual std::unique_ptr GetTasks() = 0; + /// \brief Return iterator yielding ScanTask instances to enable + /// serial or parallel execution of units of scanning work + virtual std::unique_ptr GetTasks() = 0; }; class ARROW_DS_EXPORT ScannerBuilder { public: ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_context) + std::shared_ptr scan_context); /// \brief Set ScannerBuilder* Project(const std::vector& columns) const; diff --git a/cpp/src/arrow/dataset/transaction.h b/cpp/src/arrow/dataset/transaction.h index 8c83b094254..d5c94b27cf0 100644 --- a/cpp/src/arrow/dataset/transaction.h +++ b/cpp/src/arrow/dataset/transaction.h @@ -21,7 +21,5 @@ #include "arrow/dataset/visibility.h" namespace arrow { -namespace dataset { - -} // namespace dataset +namespace dataset {} // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 94aeb87976c..e03ea147398 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -18,10 +18,10 @@ #pragma once #include +#include -#include "arrow/type_fwd.h" -#include "arrow/util/iterator.h" #include "arrow/dataset/visibility.h" +#include "arrow/type_fwd.h" // IWYU pragma: export namespace arrow { diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index f80d4ed7683..2d0538db4af 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef ARROW_RECORD_BATCH_H -#define ARROW_RECORD_BATCH_H +#pragma once #include #include @@ -24,6 +23,7 @@ #include #include "arrow/type_fwd.h" +#include "arrow/util/iterator.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -186,5 +186,3 @@ class ARROW_EXPORT RecordBatchReader { }; } // namespace arrow - -#endif // ARROW_RECORD_BATCH_H diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index 918c25e6294..ea32b49d168 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -20,6 +20,7 @@ #include +#include "arrow/util/iterator.h" #include "arrow/util/visibility.h" namespace arrow { @@ -39,6 +40,8 @@ class Column; class RecordBatch; class Table; +using RecordBatchIterator = Iterator>; + class Buffer; class MemoryPool; class RecordBatch; diff --git a/cpp/src/arrow/util/interfaces.h b/cpp/src/arrow/util/interfaces.h index 2e0383fa18e..3ebe2207f7b 100644 --- a/cpp/src/arrow/util/interfaces.h +++ b/cpp/src/arrow/util/interfaces.h @@ -17,7 +17,7 @@ #pragma once -#include "arrow/util/visibility.h" +#include "arrow/dataset/visibility.h" namespace arrow { diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 3ebe2207f7b..52fb10b09a4 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -23,10 +23,13 @@ namespace arrow { class Status; +/// \brief A generic Iterator that can return errors template class Iterator { public: + /// \brief Return the next element of the sequence, nullptr when the + /// iteration is completed virtual Status Next(T* out) = 0; -} +}; } // namespace arrow From ceec07bf9ccea288ff03eff9571b4597b8123caa Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 5 Jun 2019 17:16:13 -0500 Subject: [PATCH 5/7] Finish some initial skeleton prototyping --- cpp/src/arrow/dataset/dataset.h | 50 +++++++++++-- cpp/src/arrow/dataset/file_base.h | 15 ++-- cpp/src/arrow/dataset/file_csv.h | 2 +- cpp/src/arrow/dataset/file_feather.h | 2 +- cpp/src/arrow/dataset/file_json.h | 2 +- cpp/src/arrow/dataset/file_parquet.h | 2 +- cpp/src/arrow/dataset/filter.h | 4 +- cpp/src/arrow/dataset/partition.h | 103 ++++++++++++++++++++++----- cpp/src/arrow/dataset/scanner.h | 15 ++-- cpp/src/arrow/dataset/type_fwd.h | 8 +++ cpp/src/arrow/dataset/writer.h | 36 ++++++++++ 11 files changed, 201 insertions(+), 38 deletions(-) create mode 100644 cpp/src/arrow/dataset/writer.h diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 5b0f0937da3..cc5df49fe16 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -33,21 +33,44 @@ class ARROW_DS_EXPORT DataFragment { /// \brief Return true if the fragment can benefit from parallel /// scanning virtual bool splittable() const = 0; + + /// \brief Partition options to use when scanning this fragment. May be + /// nullptr + virtual std::shared_ptr scan_options() const = 0; +}; + +/// \brief Conditions to apply to a dataset when reading to include or +/// exclude fragments, filter out rows, etc. +struct DataSelector { + std::vector> filters; + + // TODO(wesm): Select specific partition keys, file path globs, or + // other common desirable selections }; /// \brief A basic component of a Dataset which yields zero or more /// DataFragments class ARROW_DS_EXPORT DataSource { public: + enum Type { + SIMPLE, // Flat collection + PARTITIONED, // Partitioned collection + GENERIC // All others + }; + virtual ~DataSource() = default; - virtual std::unique_ptr GetFragments() = 0; + virtual Type type() const = 0; + + virtual std::unique_ptr GetFragments( + const DataSelector& selector) = 0; }; /// \brief A DataSource consisting of a flat sequence of DataFragments class ARROW_DS_EXPORT SimpleDataSource : public DataSource { public: - std::unique_ptr GetFragments() override; + std::unique_ptr GetFragments( + const DataSelector& selector) override; private: DataFragmentVector fragments_; @@ -57,8 +80,15 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { /// from possibly multiple sources class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { public: - explicit Dataset(const std::shared_ptr& source); - explicit Dataset(const std::vector>& sources); + /// \param[in] source a single input data source + /// \param[in] schema a known schema to conform to, may be nullptr + explicit Dataset(std::shared_ptr source, + std::shared_ptr schema = NULLPTR); + + /// \param[in] source one or more input data sources + /// \param[in] schema a known schema to conform to, may be nullptr + explicit Dataset(const std::vector>& sources, + std::shared_ptr schema = NULLPTR); virtual ~Dataset() = default; @@ -67,7 +97,19 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { const std::vector>& sources() const { return sources_; } + std::shared_ptr schema() const { return schema_; } + + /// \brief Compute consensus schema from input data sources + Status InferSchema(std::shared_ptr* out); + + /// \brief Return a copy of Dataset with a new target schema + Status ReplaceSchema(std::shared_ptr schema, std::unique_ptr* out); + protected: + // The data sources must conform their output to this schema (with + // projections and filters taken into account) + std::shared_ptr schema_; + std::vector> sources_; }; diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 926f54413d4..626a811f190 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -21,8 +21,10 @@ #include #include +#include "arrow/dataset/scanner.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" +#include "arrow/dataset/writer.h" #include "arrow/util/compression.h" namespace arrow { @@ -91,16 +93,14 @@ class ARROW_DS_EXPORT FileSource { }; /// \brief Base class for file scanning options -class ARROW_DS_EXPORT FileScanOptions { +class ARROW_DS_EXPORT FileScanOptions : public ScanOptions { public: - virtual ~FileScanOptions() = default; - /// \brief The name of the file format this options corresponds to virtual std::string file_type() const = 0; }; /// \brief Base class for file writing options -class ARROW_DS_EXPORT FileWriteOptions { +class ARROW_DS_EXPORT FileWriteOptions : public WriteOptions { public: virtual ~FileWriteOptions() = default; @@ -119,7 +119,8 @@ class ARROW_DS_EXPORT FileFormat { virtual bool IsKnownExtension(const std::string& ext) const = 0; /// \brief Open a file for scanning - virtual Status ScanFile(const FileSource& location, const FileScanOptions& options, + virtual Status ScanFile(const FileSource& location, + std::shared_ptr scan_options, std::shared_ptr scan_context, std::unique_ptr* out) const = 0; }; @@ -127,7 +128,8 @@ class ARROW_DS_EXPORT FileFormat { /// \brief A DataFragment that is stored in a file with a known format class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { public: - FileBasedDataFragment(const FileSource& location, std::shared_ptr format); + FileBasedDataFragment(const FileSource& location, std::shared_ptr format, + std::shared_ptr); const FileSource& location() const { return location_; } std::shared_ptr format() const { return format_; } @@ -135,6 +137,7 @@ class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { protected: FileSource location_; std::shared_ptr format_; + std::shared_ptr scan_options_; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index d1c56b57af0..1b461455756 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -60,7 +60,7 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& location, const FileScanOptions& options, + Status ScanFile(const FileSource& location, std::shared_ptr scan_options, std::shared_ptr scan_context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index 2e9fb5fcc3e..d92cf82b9f9 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -47,7 +47,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& location, const FileScanOptions& options, + Status ScanFile(const FileSource& location, std::shared_ptr scan_options, std::shared_ptr scan_context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index 4fa23f112f3..11e6bbe6ade 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -52,7 +52,7 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& location, const FileScanOptions& options, + Status ScanFile(const FileSource& location, std::shared_ptr scan_options, std::shared_ptr scan_context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index f9e1bc9f363..d88c6f889be 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -51,7 +51,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& location, const FileScanOptions& options, + Status ScanFile(const FileSource& location, std::shared_ptr scan_options, std::shared_ptr scan_context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index 91505f2efd0..a727b1ce4b8 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -28,8 +28,8 @@ class ARROW_DS_EXPORT Filter { public: enum type { /// Simple boolean predicate consisting of comparisons and boolean - /// logic (AND, OR, NOT) - SIMPLE, + /// logic (AND, OR, NOT) involving Schema fields + EXPRESSION, /// GENERIC diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index c3267b7b8d3..628480f4457 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -19,6 +19,7 @@ #include #include +#include #include #include "arrow/dataset/dataset.h" @@ -32,9 +33,9 @@ namespace dataset { // Computing partition values // TODO(wesm): API for computing partition keys derived from raw -// values. For example, year(value) instead of simply value, so a -// dataset with a timestamp column might group all data with year 2009 -// in the same partition +// values. For example, year(value) or hash_function(value) instead of +// simply value, so a dataset with a timestamp column might group all +// data with year 2009 in the same partition // /// \brief // class ScalarTransform { @@ -53,9 +54,18 @@ namespace dataset { // ---------------------------------------------------------------------- // Partition identifiers -/// \brief +/// \brief A partition level identifier which can be used +/// +/// TODO(wesm): Is this general enough? What other kinds of partition +/// keys exist and do we need to support them? class PartitionKey { public: + const std::vector& fields() const { return fields_; } + const std::vector>& values() const { return values_; } + + private: + std::vector fields_; + std::vector> values_; }; /// \brief Intermediate data structure for data parsed from a string @@ -64,8 +74,13 @@ class PartitionKey { /// For example, the identifier "foo=5" might be parsed with a single /// "foo" field and the value 5. A more complex identifier might be /// written as "foo=5,bar=2", which would yield two fields and two -/// values -struct PartitionKeyComponents { +/// values. +/// +/// Some partition schemes may store the field names in a metadata +/// store instead of in file paths, for example +/// dataset_root/2009/11/... could be used when the partition fields +/// are "year" and "month" +struct PartitionKeyData { std::vector fields; std::vector> values; }; @@ -81,43 +96,95 @@ class ARROW_DS_EXPORT PartitionScheme { /// \brief The name identifying the kind of partition scheme virtual std::string name() const = 0; - virtual bool DirectoryMatches(const std::string& path) const = 0; + virtual bool PathMatchesScheme(const std::string& path) const = 0; - virtual Status ParseDirectory(const std::string& path, - PartitionKeyComponents* out) const = 0; + virtual Status ParseKey(const std::string& path, PartitionKeyData* out) const = 0; }; -/// \brief +/// \brief Multi-level, directory based partitioning scheme +/// originating from Apache Hive with all data files stored in the +/// leaf directories. Data is partitioned by static values of a +/// particular column in the schema. Partition keys are represented in +/// the form $key=$value in directory names class ARROW_DS_EXPORT HivePartitionScheme : public PartitionScheme { public: + /// \brief Return true if path + bool PathMatchesScheme(const std::string& path) const override; + + virtual Status ParseKey(const std::string& path, PartitionKeyData* out) const = 0; }; // ---------------------------------------------------------------------- // +class ARROW_DS_EXPORT Partition : public DataSource { + public: + DataSource::Type type() const override; + + /// \brief The key for this partition source, may be nullptr, + /// e.g. for the top-level partitioned source container + virtual const PartitionKey* key() const = 0; + + virtual std::unique_ptr GetFragments( + const Selector& selector) = 0; +}; + /// \brief Container for a dataset partition, which consists of a /// partition identifier, subpartitions, and some data fragments class ARROW_DS_EXPORT SimplePartition : public Partition { public: - const PartitionKey& key() const { return *key_; } + SimplePartition(std::unique_ptr partition_key, + DataFragmentVector&& data_fragments, PartitionVector&& subpartitions, + std::shared_ptr scan_options = NULLPTR) + : key_(std::move(partition_key)), + data_fragments_(std::move(data_fragments)), + subpartitions_(std::move(subpartitions)), + scan_options_(scan_options) {} + + const PartitionKey* key() const override { return key_.get(); } + + int num_subpartitions() const { return static_cast(subpartitions_.size()); } + + int num_data_fragments() const { return static_cast(data_fragments__.size()); } + + const PartitionVector& subpartitions() const { return subpartitions_; } + const DataFragmentVector& data_fragments() const { return data_fragments_; } + + std::unique_ptr GetFragments( + const FilterVector& filters) override; private: std::unique_ptr key_; - /// \brief Child partitions of this partition. In some partition - /// schemes, this member is mutually-exclusive with + /// \brief Data fragments belonging to this partition level. In some + /// partition schemes such as Hive-style, this member is + /// mutually-exclusive with subpartitions, where data fragments + /// occur only in the partition leaves + std::vector> data_fragments_; + + /// \brief Child partitions of this partition std::vector> subpartitions_; - std::vector> data_fragments_; + /// \brief Default scan options to use for data fragments + std::shared_ptr scan_options_; }; -/// \brief DataSource implementation for partition-based data sources -class PartitionSource : public DataSource { +/// \brief A PartitionSource that returns fragments as the result of input iterators +class ARROW_DS_EXPORT LazyPartition : public Partition { public: - std::unique_ptr GetFragments() override; + const PartitionKey* key() const override; + + std::unique_ptr GetFragments( + const& DataSelector selector) override; + + // TODO(wesm): Iterate over subpartitions protected: - std::vector> partitions_; + std::unique_ptr partition_iter_; + + // By default, once this source is consumed using GetFragments, it + // cannot be consumed again. By setting this to true, we cache + bool cache_manifest_ = false; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index f998c049f8a..36d3b84cf87 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -36,6 +36,11 @@ struct ARROW_DS_EXPORT ScanContext {}; // be evaluated in-memory against the RecordBatch objects resulting // from the Scan +class ARROW_DS_EXPORT ScanOptions { + public: + virtual ~ScanOptions() = default; +}; + /// \brief Read record batches from a range of a single data fragment class ARROW_DS_EXPORT ScanTask { public: @@ -50,7 +55,7 @@ class ARROW_DS_EXPORT ScanTask { /// \brief Main interface for class ARROW_DS_EXPORT Scanner { public: - virtual ~Scanner() = 0; + virtual ~Scanner() = default; /// \brief Return iterator yielding ScanTask instances to enable /// serial or parallel execution of units of scanning work @@ -63,14 +68,16 @@ class ARROW_DS_EXPORT ScannerBuilder { std::shared_ptr scan_context); /// \brief Set - ScannerBuilder* Project(const std::vector& columns) const; + ScannerBuilder* Project(const std::vector& columns); + + ScannerBuilder* AddFilter(const std::shared_ptr& filter); - ScannerBuilder* AddFilter(const std::shared_ptr& filter) const; + ScannerBuilder* SetGlobalFileOptions(std::shared_ptr options); /// \brief If true (default), add partition keys to the /// RecordBatches that the scan produces if they are not in the data /// otherwise - ScannerBuilder* IncludePartitionKeys(bool include = true) const; + ScannerBuilder* IncludePartitionKeys(bool include = true); /// \brief Return the constructed now-immutable Scanner object std::unique_ptr Finish() const; diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index e03ea147398..8e3824625ed 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -36,6 +36,7 @@ namespace dataset { class Dataset; class DataFragment; class DataSource; +struct DataSelector; using DataFragmentIterator = Iterator>; using DataFragmentVector = std::vector>; @@ -52,12 +53,19 @@ using FilterVector = std::vector>; class Partition; class PartitionKey; class PartitionScheme; +using PartitionVector = std::vector>; +using PartitionIterator = Iterator>; struct ScanContext; +class ScanOptions; class Scanner; class ScannerBuilder; class ScanTask; using ScanTaskIterator = Iterator>; +class DatasetWriter; +class WriteContext; +class WriteOptions; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/writer.h b/cpp/src/arrow/dataset/writer.h new file mode 100644 index 00000000000..048a0e54d75 --- /dev/null +++ b/cpp/src/arrow/dataset/writer.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" + +namespace arrow { +namespace dataset { + +class ARROW_DS_EXPORT WriteOptions { + public: + virtual ~WriteOptions() = default; +}; + +} // namespace dataset +} // namespace arrow From 68712f8709c8be32464d0c488aa819126f2db3f9 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 6 Jun 2019 16:50:24 -0500 Subject: [PATCH 6/7] Fix clang warnings, test does not compile on Windows yet --- cpp/src/arrow/dataset/CMakeLists.txt | 16 +++++++++------- cpp/src/arrow/dataset/dataset.h | 4 +++- cpp/src/arrow/dataset/file_base.h | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index 90eabb562ea..d8b3ecc0eaf 100644 --- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -42,10 +42,12 @@ foreach(LIB_TARGET ${ARROW_DATASET_LIBRARIES}) target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_DS_EXPORTING) endforeach() -add_arrow_test(file_test - EXTRA_LINK_LIBS - ${ARROW_DATASET_TEST_LINK_LIBS} - PREFIX - "arrow-dataset" - LABELS - "arrow_dataset") +if(NOT WIN32) + add_arrow_test(file_test + EXTRA_LINK_LIBS + ${ARROW_DATASET_TEST_LINK_LIBS} + PREFIX + "arrow-dataset" + LABELS + "arrow_dataset") +endif() diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index cc5df49fe16..ba49b28466d 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -30,6 +30,8 @@ namespace dataset { /// which can be read/scanned separately from other fragments class ARROW_DS_EXPORT DataFragment { public: + virtual ~DataFragment() = default; + /// \brief Return true if the fragment can benefit from parallel /// scanning virtual bool splittable() const = 0; @@ -85,7 +87,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { explicit Dataset(std::shared_ptr source, std::shared_ptr schema = NULLPTR); - /// \param[in] source one or more input data sources + /// \param[in] sources one or more input data sources /// \param[in] schema a known schema to conform to, may be nullptr explicit Dataset(const std::vector>& sources, std::shared_ptr schema = NULLPTR); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 626a811f190..295a918103c 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -30,7 +30,7 @@ namespace arrow { namespace dataset { -/// \brief +/// \brief Contains the location of a file to be read class ARROW_DS_EXPORT FileSource { public: enum SourceType { PATH, BUFFER }; From 2f6440a2e93f45c020dae9dffc65653244147a65 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 12 Jun 2019 15:21:01 -0500 Subject: [PATCH 7/7] Remove not-currently-needed enum, add comment about an example partition structure --- cpp/src/arrow/dataset/dataset.h | 9 ++------- cpp/src/arrow/dataset/partition.h | 30 ++++++++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index ba49b28466d..4aba8945b27 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include "arrow/dataset/type_fwd.h" @@ -54,15 +55,9 @@ struct DataSelector { /// DataFragments class ARROW_DS_EXPORT DataSource { public: - enum Type { - SIMPLE, // Flat collection - PARTITIONED, // Partitioned collection - GENERIC // All others - }; - virtual ~DataSource() = default; - virtual Type type() const = 0; + virtual std::string type() const = 0; virtual std::unique_ptr GetFragments( const DataSelector& selector) = 0; diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 628480f4457..28c55adcc10 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -117,9 +117,35 @@ class ARROW_DS_EXPORT HivePartitionScheme : public PartitionScheme { // ---------------------------------------------------------------------- // +// Partitioned datasets come in different forms. Here is an example of +// a Hive-style partitioned dataset: +// +// dataset_root/ +// key1=$k1_v1/ +// key2=$k2_v1/ +// 0.parquet +// 1.parquet +// 2.parquet +// 3.parquet +// key2=$k2_v2/ +// 0.parquet +// 1.parquet +// key1=$k1_v2/ +// key2=$k2_v1/ +// 0.parquet +// 1.parquet +// key2=$k2_v2/ +// 0.parquet +// 1.parquet +// 2.parquet +// +// In this case, the dataset has 11 fragments (11 files) to be +// scanned, or potentially more if it is configured to split Parquet +// files at the row group level + class ARROW_DS_EXPORT Partition : public DataSource { public: - DataSource::Type type() const override; + std::string type() const override; /// \brief The key for this partition source, may be nullptr, /// e.g. for the top-level partitioned source container @@ -129,7 +155,7 @@ class ARROW_DS_EXPORT Partition : public DataSource { const Selector& selector) = 0; }; -/// \brief Container for a dataset partition, which consists of a +/// \brief Simple implementation of Partition, which consists of a /// partition identifier, subpartitions, and some data fragments class ARROW_DS_EXPORT SimplePartition : public Partition { public: