From d2263be8062bd82402a58881da0b8167fba39be4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?=
Date: Fri, 24 Apr 2020 10:56:14 -0400
Subject: [PATCH 1/2] ARROW-8318: [C++][Dataset] Construct FileSystemDataset
 from fragments

* Simplified FileSystemDataset to hold a FragmentVector. Each Fragment must be
  a FileFragment, which is checked in `FileSystemDataset::Make`. Fragments are
  not required to share the same backing filesystem or the same format.
* Removed `FileSystemDataset::partitions`. The `format` accessor is retained.
* Since FileInfo is required by neither FileSystemDataset nor
  FileSystemDatasetFactory, it is now possible to create a dataset without any
  IO involved.
* Re-introduced the natural behavior of creating FileFragments with their full
  partition expressions instead of stripping out the partition expressions
  shared with their ancestors.
* Added the `Expression::IsSatisfiableWith` method.
* Added the missing compression CMake options to archery.
* Ensured FileSource holds a `std::shared_ptr<fs::FileSystem>`. This is
  required to refactor FileSystemDataset to support Buffer-backed FileSources
  and heterogeneous FileSystems.
* Renamed `FileSource::type` to `FileSource::id`, following other classes.
---
 cpp/src/arrow/dataset/discovery.cc           | 146 ++++++++----------
 cpp/src/arrow/dataset/discovery.h            |  10 +-
 cpp/src/arrow/dataset/file_base.cc           | 151 +++++-------------
 cpp/src/arrow/dataset/file_base.h            |  91 ++++-------
 cpp/src/arrow/dataset/file_csv_test.cc       |   2 +-
 cpp/src/arrow/dataset/file_ipc_test.cc       |  55 ++-----
 cpp/src/arrow/dataset/file_parquet_test.cc   |   2 +-
 cpp/src/arrow/dataset/file_test.cc           |  40 ++---
 cpp/src/arrow/dataset/filter.h               |   4 +
 cpp/src/arrow/dataset/test_util.h            |  21 ++-
 dev/archery/archery/cli.py                   |  13 ++
 dev/archery/archery/lang/cpp.py              |  55 +++++--
 python/pyarrow/_dataset.pyx                  |  32 ++--
 python/pyarrow/_fs.pyx                       |  12 +-
 python/pyarrow/includes/libarrow_dataset.pxd |  10 +-
 python/pyarrow/tests/test_dataset.py         |   6 +-
 16 files changed, 285 insertions(+), 365 deletions(-)

diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc
index 8820e0ff499..1449716778f 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -103,54 +103,51 @@ Result<std::shared_ptr<Dataset>> UnionDatasetFactory::Finish(FinishOptions optio
 }
 
 FileSystemDatasetFactory::FileSystemDatasetFactory(
-    std::shared_ptr<fs::FileSystem> filesystem, fs::PathForest forest,
+    std::vector<std::string> paths, std::shared_ptr<fs::FileSystem> filesystem,
     std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options)
-    : fs_(std::move(filesystem)),
-      forest_(std::move(forest)),
+    : paths_(std::move(paths)),
+      fs_(std::move(filesystem)),
       format_(std::move(format)),
       options_(std::move(options)) {}
 
-bool StartsWithAnyOf(const std::vector<std::string>& prefixes, const std::string& path) {
-  if (prefixes.empty()) {
-    return false;
-  }
-
-  auto basename = fs::internal::GetAbstractPathParent(path).second;
-
-  return std::any_of(prefixes.cbegin(), prefixes.cend(), [&](util::string_view prefix) {
-    return util::string_view(basename).starts_with(prefix);
-  });
+util::optional<util::string_view> FileSystemDatasetFactory::BaselessPath(
+    util::string_view path) {
+  const util::string_view partition_base_dir{options_.partition_base_dir};
+  return fs::internal::RemoveAncestor(partition_base_dir, path);
 }
 
 Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
    std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
    std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
-  ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(paths));
-  ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));
-
-  std::unordered_set missing;
-
-  DCHECK_OK(forest.Visit([&](fs::PathForest::Ref ref) {
-    util::string_view parent_path = options.partition_base_dir;
-    if (auto parent = ref.parent()) {
-      parent_path = parent.info().path();
+  std::vector<std::string> filtered_paths;
+  for (const auto& path : paths) {
+    if (options.exclude_invalid_files) {
+      ARROW_ASSIGN_OR_RAISE(auto supported,
+                            format->IsSupported(FileSource(path, filesystem)));
+      if (!supported) {
+        continue;
+      }
     }
-    for (auto&& path :
-         fs::internal::AncestorsFromBasePath(parent_path, ref.info().path())) {
-      ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(std::move(path)));
-      missing.insert(std::move(file));
-    }
-    return Status::OK();
-  }));
+    filtered_paths.push_back(path);
+  }
 
-  files = std::move(forest).infos();
-  files.resize(files.size() + missing.size());
-  std::move(missing.begin(), missing.end(), files.end() - missing.size());
+  return std::shared_ptr<DatasetFactory>(
+      new FileSystemDatasetFactory(std::move(filtered_paths), std::move(filesystem),
+                                   std::move(format), std::move(options)));
+}
 
-  ARROW_ASSIGN_OR_RAISE(forest, fs::PathForest::Make(std::move(files)));
+bool StartsWithAnyOf(const std::string& path, const std::vector<std::string>& prefixes) {
+  if (prefixes.empty()) {
+    return false;
+  }
 
-  return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
-      std::move(filesystem), std::move(forest), std::move(format), std::move(options)));
+  auto parts = fs::internal::SplitAbstractPath(path);
+  return std::any_of(parts.cbegin(), parts.cend(), [&](util::string_view part) {
+    return std::any_of(prefixes.cbegin(), prefixes.cend(), [&](util::string_view prefix) {
+      return util::string_view(part).starts_with(prefix);
+    });
+  });
 }
 
 Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
@@ -164,34 +161,29 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
   }
 
   ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));
-  ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));
-
-  std::vector<fs::FileInfo> filtered_files;
-  RETURN_NOT_OK(forest.Visit([&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune {
-    const auto& path = ref.info().path();
+  std::vector<std::string> paths;
+  for (const auto& info : files) {
+    const auto& path = info.path();
 
-    if (StartsWithAnyOf(options.selector_ignore_prefixes, path)) {
-      return fs::PathForest::Prune;
+    if (!info.IsFile()) {
+      // TODO(fsaintjacques): push this filtering into Selector logic so we
+      // don't copy a big vector around.
+      continue;
    }
 
-    if (ref.info().IsFile() && options.exclude_invalid_files) {
-      ARROW_ASSIGN_OR_RAISE(auto supported,
-                            format->IsSupported(FileSource(path, filesystem.get())));
-      if (!supported) {
-        return fs::PathForest::Continue;
-      }
+    if (StartsWithAnyOf(path, options.selector_ignore_prefixes)) {
+      continue;
    }
 
-    filtered_files.push_back(std::move(forest.infos()[ref.i]));
-    return fs::PathForest::Continue;
-  }));
+    paths.push_back(path);
+  }
 
-  ARROW_ASSIGN_OR_RAISE(forest,
-                        fs::PathForest::MakeFromPreSorted(std::move(filtered_files)));
+  // Sorting by path guarantees a stable ordering, which some unit tests rely on.
+ std::sort(paths.begin(), paths.end()); - return std::shared_ptr(new FileSystemDatasetFactory( - filesystem, std::move(forest), std::move(format), std::move(options))); + return Make(std::move(filesystem), std::move(paths), std::move(format), + std::move(options)); } Result> FileSystemDatasetFactory::PartitionSchema() { @@ -199,15 +191,14 @@ Result> FileSystemDatasetFactory::PartitionSchema() { return partitioning->schema(); } - std::vector paths; - for (const auto& info : forest_.infos()) { - if (auto relative = - fs::internal::RemoveAncestor(options_.partition_base_dir, info.path())) { - paths.push_back(*relative); + std::vector relative_paths; + for (const auto& path : paths_) { + if (auto relative = BaselessPath(path)) { + relative_paths.push_back(*relative); } } - return options_.partitioning.factory()->Inspect(paths); + return options_.partitioning.factory()->Inspect(relative_paths); } Result>> FileSystemDatasetFactory::InspectSchemas( @@ -216,11 +207,9 @@ Result>> FileSystemDatasetFactory::InspectSc const bool has_fragments_limit = options.fragments >= 0; int fragments = options.fragments; - for (const auto& f : forest_.infos()) { - if (!f.IsFile()) continue; + for (const auto& path : paths_) { if (has_fragments_limit && fragments-- == 0) break; - FileSource src(f.path(), fs_.get()); - ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src)); + ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({path, fs_})); schemas.push_back(schema); } @@ -246,32 +235,25 @@ Result> FileSystemDatasetFactory::Finish(FinishOptions } } - ExpressionVector partitions(forest_.size(), scalar(true)); std::shared_ptr partitioning = options_.partitioning.partitioning(); if (partitioning == nullptr) { auto factory = options_.partitioning.factory(); ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema)); } - // apply partitioning to forest to derive partitions - auto apply_partitioning = [&](fs::PathForest::Ref ref) { - if (auto relative = fs::internal::RemoveAncestor(options_.partition_base_dir, - ref.info().path())) { - auto segments = fs::internal::SplitAbstractPath(relative->to_string()); - - if (segments.size() > 0) { - auto segment_index = static_cast(segments.size()) - 1; - auto maybe_partition = partitioning->Parse(segments.back(), segment_index); - - partitions[ref.i] = std::move(maybe_partition).ValueOr(scalar(true)); - } + FragmentVector fragments; + for (const auto& path : paths_) { + std::shared_ptr partition = scalar(true); + if (auto relative = BaselessPath(path)) { + std::string path_string{*relative}; + partition = partitioning->Parse(path_string).ValueOr(scalar(true)); } - return Status::OK(); - }; - RETURN_NOT_OK(forest_.Visit(apply_partitioning)); - return FileSystemDataset::Make(schema, root_partition_, format_, fs_, forest_, - std::move(partitions)); + ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition)); + fragments.push_back(fragment); + } + + return FileSystemDataset::Make(schema, root_partition_, format_, fragments); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index ac009231eec..5030b7ee540 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -220,16 +220,20 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory { Result> Finish(FinishOptions options) override; protected: - FileSystemDatasetFactory(std::shared_ptr filesystem, - fs::PathForest forest, std::shared_ptr format, + FileSystemDatasetFactory(std::vector paths, + 
std::shared_ptr filesystem, + std::shared_ptr format, FileSystemFactoryOptions options); Result> PartitionSchema(); + std::vector paths_; std::shared_ptr fs_; - fs::PathForest forest_; std::shared_ptr format_; FileSystemFactoryOptions options_; + + private: + util::optional BaselessPath(util::string_view path); }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index a744bb4eb88..02ad596cecd 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -35,7 +35,7 @@ namespace arrow { namespace dataset { Result> FileSource::Open() const { - if (type() == PATH) { + if (id() == PATH) { return filesystem()->OpenInputFile(path()); } @@ -47,7 +47,7 @@ Result> FileSource::OpenWritable() cons return Status::Invalid("file source '", path(), "' is not writable"); } - if (type() == PATH) { + if (id() == PATH) { return filesystem()->OpenOutputStream(path()); } @@ -84,61 +84,38 @@ Result FileFragment::Scan(std::shared_ptr options FileSystemDataset::FileSystemDataset(std::shared_ptr schema, std::shared_ptr root_partition, std::shared_ptr format, - std::shared_ptr filesystem, - fs::PathForest forest, - ExpressionVector file_partitions) + std::vector> fragments) : Dataset(std::move(schema), std::move(root_partition)), format_(std::move(format)), - filesystem_(std::move(filesystem)), - forest_(std::move(forest)), - partitions_(std::move(file_partitions)) { - DCHECK_EQ(static_cast(forest_.size()), partitions_.size()); -} - -Result> FileSystemDataset::Make( - std::shared_ptr schema, std::shared_ptr root_partition, - std::shared_ptr format, std::shared_ptr filesystem, - std::vector infos) { - ExpressionVector partitions(infos.size(), scalar(true)); - return Make(std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(infos), std::move(partitions)); -} + fragments_(std::move(fragments)) {} Result> FileSystemDataset::Make( std::shared_ptr schema, std::shared_ptr root_partition, - std::shared_ptr format, std::shared_ptr filesystem, - std::vector infos, ExpressionVector partitions) { - ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(infos), &partitions)); - return Make(std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(forest), std::move(partitions)); -} + std::shared_ptr format, FragmentVector fragments) { + std::vector> file_fragments; + for (const auto& fragment : fragments) { + auto file_fragment = internal::checked_pointer_cast(fragment); + file_fragments.push_back(std::move(file_fragment)); + } -Result> FileSystemDataset::Make( - std::shared_ptr schema, std::shared_ptr root_partition, - std::shared_ptr format, std::shared_ptr filesystem, - fs::PathForest forest, ExpressionVector partitions) { - return std::shared_ptr(new FileSystemDataset( - std::move(schema), std::move(root_partition), std::move(format), - std::move(filesystem), std::move(forest), std::move(partitions))); + return std::shared_ptr( + new FileSystemDataset(std::move(schema), std::move(root_partition), + std::move(format), std::move(file_fragments))); } Result> FileSystemDataset::ReplaceSchema( std::shared_ptr schema) const { RETURN_NOT_OK(CheckProjectable(*schema_, *schema)); - return std::shared_ptr( - new FileSystemDataset(std::move(schema), partition_expression_, format_, - filesystem_, forest_, partitions_)); + return std::shared_ptr(new FileSystemDataset( + std::move(schema), partition_expression_, format_, fragments_)); } std::vector 
FileSystemDataset::files() const { std::vector files; - DCHECK_OK(forest_.Visit([&](fs::PathForest::Ref ref) { - if (ref.info().IsFile()) { - files.push_back(ref.info().path()); - } - return Status::OK(); - })); + for (const auto& fragment : fragments_) { + files.push_back(fragment->source().path()); + } return files; } @@ -146,68 +123,30 @@ std::vector FileSystemDataset::files() const { std::string FileSystemDataset::ToString() const { std::string repr = "FileSystemDataset:"; - if (forest_.size() == 0) { + if (fragments_.empty()) { return repr + " []"; } - DCHECK_OK(forest_.Visit([&](fs::PathForest::Ref ref) { - repr += "\n" + ref.info().path(); + for (const auto& fragment : fragments_) { + repr += "\n" + fragment->source().path(); - if (!partitions_[ref.i]->Equals(true)) { - repr += ": " + partitions_[ref.i]->ToString(); + const auto& partition = fragment->partition_expression(); + if (!partition->Equals(true)) { + repr += ": " + partition->ToString(); } - - return Status::OK(); - })); + } return repr; } -std::shared_ptr FoldingAnd(const std::shared_ptr& l, - const std::shared_ptr& r) { - if (l->Equals(true)) return r; - if (r->Equals(true)) return l; - return and_(l, r); -} - FragmentIterator FileSystemDataset::GetFragmentsImpl( std::shared_ptr predicate) { FragmentVector fragments; - ExpressionVector fragment_partitions(forest_.size()); - - auto collect_fragments = [&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune { - auto partition = partitions_[ref.i]; - - // if available, copy parent's filter and projector - // (which are appropriately simplified and loaded with default values) - if (auto parent = ref.parent()) { - fragment_partitions[ref.i] = FoldingAnd(fragment_partitions[parent.i], partition); - } else { - fragment_partitions[ref.i] = FoldingAnd(partition_expression_, partition); + for (const auto& fragment : fragments_) { + if (predicate->IsSatisfiableWith(fragment->partition_expression())) { + fragments.push_back(fragment); } - - auto simplified_predicate = predicate->Assume(partition); - if (!simplified_predicate->IsSatisfiable()) { - // directories (and descendants) which can't satisfy the filter are pruned - return fs::PathForest::Prune; - } - - if (ref.info().IsFile()) { - // generate a fragment for this file - FileSource src(ref.info().path(), filesystem_.get()); - ARROW_ASSIGN_OR_RAISE( - auto fragment, - format_->MakeFragment(std::move(src), std::move(fragment_partitions[ref.i]))); - fragments.push_back(std::move(fragment)); - } - - return fs::PathForest::Continue; - }; - - auto status = forest_.Visit(collect_fragments); - if (!status.ok()) { - return MakeErrorIterator>(status); } return MakeVectorIterator(std::move(fragments)); @@ -221,42 +160,34 @@ Result> FileSystemDataset::Write( filesystem = std::make_shared(); } - std::vector files(plan.paths.size()); - ExpressionVector partition_expressions(plan.paths.size(), scalar(true)); auto task_group = scan_context->TaskGroup(); - auto partition_base_dir = fs::internal::EnsureTrailingSlash(plan.partition_base_dir); auto extension = "." 
+ plan.format->type_name(); + FragmentVector fragments; for (size_t i = 0; i < plan.paths.size(); ++i) { const auto& op = plan.fragment_or_partition_expressions[i]; - if (util::holds_alternative>(op)) { - files[i].set_type(fs::FileType::Directory); - files[i].set_path(partition_base_dir + plan.paths[i]); + if (util::holds_alternative>(op)) { + auto path = partition_base_dir + plan.paths[i] + extension; - partition_expressions[i] = util::get>(op); - } else { - files[i].set_type(fs::FileType::File); - files[i].set_path(partition_base_dir + plan.paths[i] + extension); + const auto& input_fragment = util::get>(op); + FileSource dest(path, filesystem); - const auto& fragment = util::get>(op); - - FileSource dest(files[i].path(), filesystem.get()); - ARROW_ASSIGN_OR_RAISE(auto write_task, - plan.format->WriteFragment(std::move(dest), fragment, - scan_options, scan_context)); + ARROW_ASSIGN_OR_RAISE( + auto fragment, + plan.format->MakeFragment(dest, input_fragment->partition_expression())); + fragments.push_back(std::move(fragment)); + ARROW_ASSIGN_OR_RAISE( + auto write_task, + plan.format->WriteFragment(dest, input_fragment, scan_options, scan_context)); task_group->Append([write_task] { return write_task->Execute(); }); } } RETURN_NOT_OK(task_group->Finish()); - ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::MakeFromPreSorted(std::move(files))); - - auto partition_expression = scalar(true); - return Make(plan.schema, partition_expression, plan.format, std::move(filesystem), - std::move(forest), std::move(partition_expressions)); + return Make(plan.schema, scalar(true), plan.format, fragments); } Status WriteTask::CreateDestinationParentDir() const { diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 5a4cd450959..f3243e09aa2 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -44,12 +44,12 @@ namespace dataset { class ARROW_DS_EXPORT FileSource { public: // NOTE(kszucs): it'd be better to separate the BufferSource from FileSource - enum DatasetType { PATH, BUFFER }; + enum SourceKind { PATH, BUFFER }; - FileSource(std::string path, fs::FileSystem* filesystem, + FileSource(std::string path, std::shared_ptr filesystem, Compression::type compression = Compression::UNCOMPRESSED, bool writable = true) - : impl_(PathAndFileSystem{std::move(path), filesystem}), + : impl_(PathAndFileSystem{std::move(path), std::move(filesystem)}), compression_(compression), writable_(writable) {} @@ -62,11 +62,11 @@ class ARROW_DS_EXPORT FileSource { : impl_(std::move(buffer)), compression_(compression), writable_(true) {} bool operator==(const FileSource& other) const { - if (type() != other.type()) { + if (id() != other.id()) { return false; } - if (type() == PATH) { + if (id() == PATH) { return path() == other.path() && filesystem() == other.filesystem(); } @@ -75,7 +75,7 @@ class ARROW_DS_EXPORT FileSource { /// \brief The kind of file, whether stored in a filesystem, memory /// resident, or other - DatasetType type() const { return static_cast(impl_.index()); } + SourceKind id() const { return static_cast(impl_.index()); } /// \brief Return the type of raw compression on the file, if any Compression::type compression() const { return compression_; } @@ -87,20 +87,21 @@ class ARROW_DS_EXPORT FileSource { /// type is PATH const std::string& path() const { static std::string buffer_path = ""; - return type() == PATH ? util::get(impl_).path : buffer_path; + return id() == PATH ? 
util::get<PathAndFileSystem>(impl_).path : buffer_path;
  }

  /// \brief Return the filesystem, if any. Only non null when file
  /// source type is PATH
-  fs::FileSystem* filesystem() const {
-    return type() == PATH ? util::get<PathAndFileSystem>(impl_).filesystem : NULLPTR;
+  const std::shared_ptr<fs::FileSystem>& filesystem() const {
+    static std::shared_ptr<fs::FileSystem> no_fs = NULLPTR;
+    return id() == PATH ? util::get<PathAndFileSystem>(impl_).filesystem : no_fs;
  }

  /// \brief Return the buffer containing the file, if any. Only value
  /// when file source type is BUFFER
  const std::shared_ptr<Buffer>& buffer() const {
    static std::shared_ptr<Buffer> path_buffer = NULLPTR;
-    return type() == BUFFER ? util::get<std::shared_ptr<Buffer>>(impl_) : path_buffer;
+    return id() == BUFFER ? util::get<std::shared_ptr<Buffer>>(impl_) : path_buffer;
  }

  /// \brief Get a RandomAccessFile which views this file source
@@ -112,7 +113,7 @@
 private:
  struct PathAndFileSystem {
    std::string path;
-    fs::FileSystem* filesystem;
+    std::shared_ptr<fs::FileSystem> filesystem;
  };

  util::variant<PathAndFileSystem, std::shared_ptr<Buffer>> impl_;
@@ -184,57 +185,24 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
 };

 /// \brief A Dataset of FileFragments.
+///
+/// A FileSystemDataset is composed of one or more FileFragments. The fragments
+/// are independent and don't need to share the same format and/or filesystem.
 class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
  public:
  /// \brief Create a FileSystemDataset.
  ///
-  /// \param[in] schema the top-level schema of the DataDataset
-  /// \param[in] root_partition the top-level partition of the DataDataset
-  /// \param[in] format file format to create fragments from.
-  /// \param[in] filesystem the filesystem which files are from.
-  /// \param[in] infos a list of files/directories to consume.
-  /// attach additional partition expressions to FileInfo found in `infos`.
+  /// \param[in] schema the schema of the dataset
+  /// \param[in] root_partition the partition expression of the dataset
+  /// \param[in] fragments list of fragments to create the dataset from
  ///
-  /// The caller is not required to provide a complete coverage of nodes and
-  /// partitions.
-  static Result<std::shared_ptr<FileSystemDataset>> Make(
-      std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-      std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem,
-      std::vector<fs::FileInfo> infos);
-
-  /// \brief Create a FileSystemDataset with file-level partitions.
-  ///
-  /// \param[in] schema the top-level schema of the DataDataset
-  /// \param[in] root_partition the top-level partition of the DataDataset
-  /// \param[in] format file format to create fragments from.
-  /// \param[in] filesystem the filesystem which files are from.
-  /// \param[in] infos a list of files/directories to consume.
-  /// \param[in] partitions partition information associated with `infos`.
-  /// attach additional partition expressions to FileInfo found in `infos`.
-  ///
-  /// The caller is not required to provide a complete coverage of nodes and
-  /// partitions.
-  static Result<std::shared_ptr<FileSystemDataset>> Make(
-      std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-      std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem,
-      std::vector<fs::FileInfo> infos, ExpressionVector partitions);
-
-  /// \brief Create a FileSystemDataset with file-level partitions.
-  ///
-  /// \param[in] schema the top-level schema of the DataDataset
-  /// \param[in] root_partition the top-level partition of the DataDataset
-  /// \param[in] format file format to create fragments from.
-  /// \param[in] filesystem the filesystem which files are from.
-  /// \param[in] forest a PathForest of files/directories to consume.
-  /// \param[in] partitions partition information associated with `forest`.
-  /// attach additional partition expressions to FileInfo found in `forest`.
+  /// Note that all fragments must be of `FileFragment` type. The type is
+  /// erased to simplify callers.
  ///
-  /// The caller is not required to provide a complete coverage of nodes and
-  /// partitions.
+  /// \return A constructed dataset.
  static Result<std::shared_ptr<FileSystemDataset>> Make(
      std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-      std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem,
-      fs::PathForest forest, ExpressionVector partitions);
+      std::shared_ptr<FileFormat> format, FragmentVector fragments);

  /// \brief Write to a new format and filesystem location, preserving partitioning.
  ///
@@ -245,16 +213,18 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
      const WritePlan& plan, std::shared_ptr<ScanOptions> scan_options,
      std::shared_ptr<ScanContext> scan_context);

+  /// \brief Return the type name of the dataset.
  std::string type_name() const override { return "filesystem"; }

+  /// \brief Replace the schema of the dataset.
  Result<std::shared_ptr<Dataset>> ReplaceSchema(
      std::shared_ptr<Schema> schema) const override;

-  const std::shared_ptr<FileFormat>& format() const { return format_; }
-
+  /// \brief Return the paths of the files.
  std::vector<std::string> files() const;

-  const ExpressionVector& partitions() const { return partitions_; }
+  /// \brief Return the format.
+  const std::shared_ptr<FileFormat>& format() const { return format_; }

  std::string ToString() const;

@@ -264,13 +234,10 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
  FileSystemDataset(std::shared_ptr<Schema> schema,
                    std::shared_ptr<Expression> root_partition,
                    std::shared_ptr<FileFormat> format,
-                    std::shared_ptr<fs::FileSystem> filesystem, fs::PathForest forest,
-                    ExpressionVector file_partitions);
+                    std::vector<std::shared_ptr<FileFragment>> fragments);

  std::shared_ptr<FileFormat> format_;
-  std::shared_ptr<fs::FileSystem> filesystem_;
-  fs::PathForest forest_;
-  ExpressionVector partitions_;
+  std::vector<std::shared_ptr<FileFragment>> fragments_;
 };

 /// \brief Write a fragment to a single OutputStream.
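
To make the new construction path concrete, the following is a minimal sketch
(illustrative only, not part of the patch) of assembling a FileSystemDataset
directly from fragments with the Make() overload above. The helper name
`MakeExampleDataset`, the example paths, and the choice of IpcFileFormat are
assumptions for the sake of the example; any FileFormat works the same way:

    // Sketch only: build a FileSystemDataset from explicitly created
    // fragments, via the FileSystemDataset::Make overload added above.
    #include "arrow/dataset/api.h"
    #include "arrow/dataset/file_ipc.h"

    namespace arrow {
    namespace dataset {

    Result<std::shared_ptr<FileSystemDataset>> MakeExampleDataset(
        std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<Schema> schm) {
      auto format = std::make_shared<IpcFileFormat>();

      FragmentVector fragments;
      for (std::string path : {"data/part=0/f.ipc", "data/part=1/f.ipc"}) {
        // Each fragment carries its own FileSource and full partition
        // expression; here the partition expression is simply `true`.
        ARROW_ASSIGN_OR_RAISE(
            auto fragment, format->MakeFragment({std::move(path), fs}, scalar(true)));
        fragments.push_back(std::move(fragment));
      }

      // Make() checks that every Fragment is in fact a FileFragment.
      return FileSystemDataset::Make(std::move(schm), scalar(true), std::move(format),
                                     std::move(fragments));
    }

    }  // namespace dataset
    }  // namespace arrow

Because each fragment owns its FileSource, nothing in this flow ties the files
to a single filesystem, which is what leaves room for heterogeneous filesystems
and Buffer-backed sources. (The second patch below narrows the parameter type
from FragmentVector to a vector of FileFragments.)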
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 0cd2d907fd6..9c54eb40309 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -92,7 +92,7 @@ TEST_F(TestCsvFileFormat, OpenFailureWithRelevantError) { ASSERT_OK_AND_ASSIGN( auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)})); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr(file_name), - format_->Inspect({file_name, fs.get()}).status()); + format_->Inspect({file_name, fs}).status()); } TEST_F(TestCsvFileFormat, Inspect) { diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 1247eb109e7..652d6717742 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -148,40 +148,22 @@ class TestIpcFileSystemDataset : public TestIpcFileFormat, public MakeFileSystemDatasetMixin {}; TEST_F(TestIpcFileSystemDataset, Write) { - MakeDatasetFromPathlist(R"( - old_root/i32=0/ - old_root/i32=0/str=aaa/ + std::string paths = R"( old_root/i32=0/str=aaa/dat - old_root/i32=0/str=bbb/ old_root/i32=0/str=bbb/dat - old_root/i32=0/str=ccc/ old_root/i32=0/str=ccc/dat - - old_root/i32=1/ - old_root/i32=1/str=aaa/ old_root/i32=1/str=aaa/dat - old_root/i32=1/str=bbb/ old_root/i32=1/str=bbb/dat - old_root/i32=1/str=ccc/ old_root/i32=1/str=ccc/dat - )", - scalar(true), - { - ("i32"_ == 0).Copy(), - ("str"_ == "aaa").Copy(), - scalar(true), - ("str"_ == "bbb").Copy(), - scalar(true), - ("str"_ == "ccc").Copy(), - scalar(true), - ("i32"_ == 1).Copy(), - ("str"_ == "aaa").Copy(), - scalar(true), - ("str"_ == "bbb").Copy(), - scalar(true), - ("str"_ == "ccc").Copy(), - scalar(true), - }); + )"; + + ExpressionVector partitions{ + ("i32"_ == 0 and "str"_ == "aaa").Copy(), ("i32"_ == 0 and "str"_ == "bbb").Copy(), + ("i32"_ == 0 and "str"_ == "ccc").Copy(), ("i32"_ == 1 and "str"_ == "aaa").Copy(), + ("i32"_ == 1 and "str"_ == "bbb").Copy(), ("i32"_ == 1 and "str"_ == "ccc").Copy(), + }; + + MakeDatasetFromPathlist(paths, scalar(true), partitions); auto schema = arrow::schema({field("i32", int32()), field("str", utf8())}); opts_ = ScanOptions::Make(schema); @@ -196,21 +178,6 @@ TEST_F(TestIpcFileSystemDataset, Write) { ASSERT_OK_AND_ASSIGN(auto written, FileSystemDataset::Write(plan, opts_, ctx_)); - using E = TestExpression; - std::vector actual_partitions; - for (const auto& partition : written->partitions()) { - actual_partitions.emplace_back(partition); - } - EXPECT_THAT(actual_partitions, - testing::ElementsAre(E{"str"_ == "aaa"}, E{"i32"_ == 0}, E{scalar(true)}, - E{"i32"_ == 1}, E{scalar(true)}, - - E{"str"_ == "bbb"}, E{"i32"_ == 0}, E{scalar(true)}, - E{"i32"_ == 1}, E{scalar(true)}, - - E{"str"_ == "ccc"}, E{"i32"_ == 0}, E{scalar(true)}, - E{"i32"_ == 1}, E{scalar(true)})); - auto parent_directories = written->files(); for (auto& path : parent_directories) { EXPECT_EQ(fs::internal::GetAbstractPathExtension(path), "ipc"); @@ -231,7 +198,7 @@ TEST_F(TestIpcFileFormat, OpenFailureWithRelevantError) { constexpr auto file_name = "herp/derp"; ASSERT_OK_AND_ASSIGN( auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)})); - result = format_->Inspect({file_name, fs.get()}); + result = format_->Inspect({file_name, fs}); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr(file_name), result.status()); } diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 
ee65e50382c..70b0f02fcf0 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -260,7 +260,7 @@ TEST_F(TestParquetFileFormat, OpenFailureWithRelevantError) { constexpr auto file_name = "herp/derp"; ASSERT_OK_AND_ASSIGN( auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)})); - result = format_->Inspect({file_name, fs.get()}); + result = format_->Inspect({file_name, fs}); EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr(file_name), result.status()); } diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index ee2bdc5cee2..8651c2ddd00 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -39,22 +39,22 @@ using fs::internal::GetAbstractPathExtension; using internal::TemporaryDir; TEST(FileSource, PathBased) { - fs::LocalFileSystem localfs; + auto localfs = std::make_shared(); std::string p1 = "/path/to/file.ext"; std::string p2 = "/path/to/file.ext.gz"; - FileSource source1(p1, &localfs); - FileSource source2(p2, &localfs, Compression::GZIP); + FileSource source1(p1, localfs); + FileSource source2(p2, localfs, Compression::GZIP); ASSERT_EQ(p1, source1.path()); - ASSERT_EQ(&localfs, source1.filesystem()); - ASSERT_EQ(FileSource::PATH, source1.type()); + ASSERT_TRUE(localfs->Equals(*source1.filesystem())); + ASSERT_EQ(FileSource::PATH, source1.id()); ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression()); ASSERT_EQ(p2, source2.path()); - ASSERT_EQ(&localfs, source2.filesystem()); - ASSERT_EQ(FileSource::PATH, source2.type()); + ASSERT_TRUE(localfs->Equals(*source2.filesystem())); + ASSERT_EQ(FileSource::PATH, source2.id()); ASSERT_EQ(Compression::GZIP, source2.compression()); // Test copy constructor and comparison @@ -69,11 +69,11 @@ TEST(FileSource, BufferBased) { FileSource source1(buf); FileSource source2(buf, Compression::LZ4); - ASSERT_EQ(FileSource::BUFFER, source1.type()); + ASSERT_EQ(FileSource::BUFFER, source1.id()); ASSERT_TRUE(source1.buffer()->Equals(*buf)); ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression()); - ASSERT_EQ(FileSource::BUFFER, source2.type()); + ASSERT_EQ(FileSource::BUFFER, source2.id()); ASSERT_TRUE(source2.buffer()->Equals(*buf)); ASSERT_EQ(Compression::LZ4, source2.compression()); } @@ -96,7 +96,7 @@ TEST_F(TestFileSystemDataset, ReplaceSchema) { auto schm = schema({field("i32", int32()), field("f64", float64())}); auto format = std::make_shared(schm); ASSERT_OK_AND_ASSIGN(auto dataset, - FileSystemDataset::Make(schm, scalar(true), format, fs_, {})); + FileSystemDataset::Make(schm, scalar(true), format, {})); // drop field ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status()); @@ -147,9 +147,12 @@ TEST_F(TestFileSystemDataset, TreePartitionPruning) { }; ExpressionVector partitions = { - ("state"_ == "NY").Copy(), ("city"_ == "New York").Copy(), - ("city"_ == "Franklin").Copy(), ("state"_ == "CA").Copy(), - ("city"_ == "San Francisco").Copy(), ("city"_ == "Franklin").Copy(), + ("state"_ == "NY").Copy(), + ("state"_ == "NY" and "city"_ == "New York").Copy(), + ("state"_ == "NY" and "city"_ == "Franklin").Copy(), + ("state"_ == "CA").Copy(), + ("state"_ == "CA" and "city"_ == "San Francisco").Copy(), + ("state"_ == "CA" and "city"_ == "Franklin").Copy(), }; MakeDataset(regions, root_partition, partitions); @@ -184,15 +187,18 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) { }; ExpressionVector partitions = { - ("state"_ == "NY").Copy(), ("city"_ == "New York").Copy(), 
- ("city"_ == "Franklin").Copy(), ("state"_ == "CA").Copy(), - ("city"_ == "San Francisco").Copy(), ("city"_ == "Franklin").Copy(), + ("state"_ == "NY").Copy(), + ("state"_ == "NY" and "city"_ == "New York").Copy(), + ("state"_ == "NY" and "city"_ == "Franklin").Copy(), + ("state"_ == "CA").Copy(), + ("state"_ == "CA" and "city"_ == "San Francisco").Copy(), + ("state"_ == "CA" and "city"_ == "Franklin").Copy(), }; MakeDataset(regions, root_partition, partitions); auto with_root = [&](const Expression& state, const Expression& city) { - return and_(and_(root_partition, state.Copy()), city.Copy()); + return and_(state.Copy(), city.Copy()); }; AssertFragmentsHavePartitionExpressions( diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index 26f37b008fe..d8ef4f7f890 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -188,6 +188,10 @@ class ARROW_DS_EXPORT Expression { /// This is a shortcut to check if the expression is neither null nor false. bool IsSatisfiable() const { return !IsNull() && !Equals(false); } + bool IsSatisfiableWith(const std::shared_ptr other) const { + return Assume(other)->IsSatisfiable(); + } + /// returns a debug string representing this expression virtual std::string ToString() const = 0; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 8ca055bf3b4..26b43ef7816 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -292,15 +292,28 @@ struct MakeFileSystemDatasetMixin { void MakeDataset(const std::vector& infos, std::shared_ptr root_partition = scalar(true), ExpressionVector partitions = {}) { + auto n_fragments = infos.size(); if (partitions.empty()) { - partitions.resize(infos.size(), scalar(true)); + partitions.resize(n_fragments, scalar(true)); } MakeFileSystem(infos); auto format = std::make_shared(); - ASSERT_OK_AND_ASSIGN( - dataset_, FileSystemDataset::Make(schema({}), root_partition, format, fs_, infos, - partitions)); + + FragmentVector fragments; + for (size_t i = 0; i < n_fragments; i++) { + const auto& info = infos[i]; + if (!info.IsFile()) { + continue; + } + + ASSERT_OK_AND_ASSIGN(auto fragment, + format->MakeFragment({info.path(), fs_}, partitions[i])); + fragments.push_back(std::move(fragment)); + } + + ASSERT_OK_AND_ASSIGN(dataset_, FileSystemDataset::Make(schema({}), root_partition, + format, std::move(fragments))); } void MakeDatasetFromPathlist(const std::string& pathlist, diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py index 68c2742f868..1cfd11a53a4 100644 --- a/dev/archery/archery/cli.py +++ b/dev/archery/archery/cli.py @@ -192,6 +192,19 @@ def _apply_options(cmd, options): "it will toggle required options") @click.option("--with-s3", default=None, type=BOOL, help="Build Arrow with S3 support.") +# Compressions +@click.option("--with-brotli", default=None, type=BOOL, + help="Build Arrow with brotli compression.") +@click.option("--with-bz2", default=None, type=BOOL, + help="Build Arrow with bz2 compression.") +@click.option("--with-lz4", default=None, type=BOOL, + help="Build Arrow with lz4 compression.") +@click.option("--with-snappy", default=None, type=BOOL, + help="Build Arrow with snappy compression.") +@click.option("--with-zlib", default=None, type=BOOL, + help="Build Arrow with zlib compression.") +@click.option("--with-zstd", default=None, type=BOOL, + help="Build Arrow with zstd compression.") # CMake extra feature @click.option("--cmake-extras", type=str, multiple=True, help="Extra 
flags/options to pass to cmake invocation. " diff --git a/dev/archery/archery/lang/cpp.py b/dev/archery/archery/lang/cpp.py index 4bc3f107c49..a9997ad70a4 100644 --- a/dev/archery/archery/lang/cpp.py +++ b/dev/archery/archery/lang/cpp.py @@ -28,12 +28,16 @@ def or_else(value, default): return value if value else default +def coalesce(value, fallback): + return fallback if value is None else value + + LLVM_VERSION = 7 class CppConfiguration: - def __init__(self, + # toolchain cc=None, cxx=None, cxx_flags=None, build_type=None, warn_level=None, @@ -52,6 +56,9 @@ def __init__(self, with_mimalloc=None, with_parquet=None, with_plasma=None, with_python=True, with_r=None, with_s3=None, + # Compressions + with_brotli=None, with_bz2=None, with_lz4=None, + with_snappy=None, with_zlib=None, with_zstd=None, # extras with_lint_only=False, use_gold_linker=True, @@ -95,23 +102,38 @@ def __init__(self, self.with_r = with_r self.with_s3 = with_s3 + self.with_brotli = with_brotli + self.with_bz2 = with_bz2 + self.with_lz4 = with_lz4 + self.with_snappy = with_snappy + self.with_zlib = with_zlib + self.with_zstd = with_zstd + self.with_lint_only = with_lint_only self.use_gold_linker = use_gold_linker self.cmake_extras = cmake_extras - # Fixup required dependencies + # Fixup required dependencies by providing sane defaults if the caller + # didn't specify the option. if self.with_r: - self.with_csv = True - self.with_dataset = True - self.with_filesystem = True - self.with_ipc = True - self.with_json = True - self.with_parquet = True + self.with_csv = coalesce(with_csv, True) + self.with_dataset = coalesce(with_dataset, True) + self.with_filesystem = coalesce(with_filesystem, True) + self.with_ipc = coalesce(with_ipc, True) + self.with_json = coalesce(with_json, True) + self.with_parquet = coalesce(with_parquet, True) + + if self.with_python: + self.with_zlib = coalesce(with_zlib, True) + self.with_lz4 = coalesce(with_lz4, True) if self.with_dataset: - self.with_filesystem = True - self.with_parquet = True + self.with_filesystem = coalesce(with_filesystem, True) + self.with_parquet = coalesce(with_parquet, True) + + if self.with_parquet: + self.with_snappy = coalesce(with_snappy, True) @property def build_type(self): @@ -185,10 +207,7 @@ def _gen_defs(self): yield ("ARROW_FILESYSTEM", truthifier(self.with_filesystem)) yield ("ARROW_FLIGHT", truthifier(self.with_flight)) yield ("ARROW_GANDIVA", truthifier(self.with_gandiva)) - if self.with_parquet: - yield ("ARROW_PARQUET", truthifier(self.with_parquet)) - yield ("ARROW_WITH_BROTLI", "ON") - yield ("ARROW_WITH_SNAPPY", "ON") + yield ("ARROW_PARQUET", truthifier(self.with_parquet)) yield ("ARROW_HDFS", truthifier(self.with_hdfs)) yield ("ARROW_HIVESERVER2", truthifier(self.with_hiveserver2)) yield ("ARROW_IPC", truthifier(self.with_ipc)) @@ -199,6 +218,14 @@ def _gen_defs(self): yield ("ARROW_PYTHON", truthifier(self.with_python)) yield ("ARROW_S3", truthifier(self.with_s3)) + # Compressions + yield ("ARROW_WITH_BROTLI", truthifier(self.with_brotli)) + yield ("ARROW_WITH_BZ2", truthifier(self.with_bz2)) + yield ("ARROW_WITH_LZ4", truthifier(self.with_lz4)) + yield ("ARROW_WITH_SNAPPY", truthifier(self.with_snappy)) + yield ("ARROW_WITH_ZLIB", truthifier(self.with_zlib)) + yield ("ARROW_WITH_ZSTD", truthifier(self.with_zstd)) + yield ("ARROW_LINT_ONLY", truthifier(self.with_lint_only)) # Some configurations don't like gnu gold linker. 
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 23b81a00eaf..182a3a49d0c 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -275,8 +275,10 @@ cdef class FileSystemDataset(Dataset): cdef: FileInfo info Expression expr + Fragment fragment vector[CFileInfo] c_file_infos vector[shared_ptr[CExpression]] c_partitions + vector[shared_ptr[CFragment]] c_fragments CResult[shared_ptr[CDataset]] result # validate required arguments @@ -291,21 +293,23 @@ cdef class FileSystemDataset(Dataset): "got {2})".format(name, class_.__name__, type(arg)) ) - for info in filesystem.get_file_info(paths_or_selector): - c_file_infos.push_back(info.unwrap()) + infos = filesystem.get_file_info(paths_or_selector) if partitions is None: - partitions = [ - ScalarExpression(True) for _ in range(c_file_infos.size())] - for expr in partitions: - c_partitions.push_back(expr.unwrap()) + partitions = [ScalarExpression(True) for _ in range(len(infos))] - if c_file_infos.size() != c_partitions.size(): + if len(infos) != len(partitions): raise ValueError( 'The number of files resulting from paths_or_selector ' 'must be equal to the number of partitions.' ) + for i, info in enumerate(infos): + if info.is_file: + fragment = format.make_fragment(info.path, filesystem, + partitions[i]) + c_fragments.push_back(fragment.unwrap()) + if root_partition is None: root_partition = ScalarExpression(True) elif not isinstance(root_partition, Expression): @@ -318,9 +322,7 @@ cdef class FileSystemDataset(Dataset): pyarrow_unwrap_schema(schema), ( root_partition).unwrap(), ( format).unwrap(), - ( filesystem).unwrap(), - c_file_infos, - c_partitions + c_fragments ) self.init(GetResultValue(result)) @@ -339,6 +341,7 @@ cdef class FileSystemDataset(Dataset): """The FileFormat of this source.""" return FileFormat.wrap(self.filesystem_dataset.format()) + cdef shared_ptr[CExpression] _insert_implicit_casts(Expression filter, Schema schema) except *: assert schema is not None @@ -393,7 +396,7 @@ cdef class FileFormat: shared_ptr[CSchema] c_schema c_schema = GetResultValue(self.format.Inspect(CFileSource( - tobytes(path), filesystem.unwrap().get()))) + tobytes(path), filesystem.unwrap()))) return pyarrow_wrap_schema(move(c_schema)) def make_fragment(self, str path not None, FileSystem filesystem not None, @@ -408,7 +411,7 @@ cdef class FileFormat: c_fragment = GetResultValue( self.format.MakeFragment(CFileSource(tobytes(path), - filesystem.unwrap().get()), + filesystem.unwrap()), partition_expression.unwrap())) return Fragment.wrap( move(c_fragment)) @@ -569,7 +572,7 @@ cdef class FileFragment(Fragment): """ cdef: shared_ptr[CFileSystem] fs - fs = self.file_fragment.source().filesystem().shared_from_this() + fs = self.file_fragment.source().filesystem() return FileSystem.wrap(fs) @property @@ -748,8 +751,7 @@ cdef class ParquetFileFormat(FileFormat): c_fragment = GetResultValue( self.parquet_format.MakeFragment(CFileSource(tobytes(path), - filesystem.unwrap() - .get()), + filesystem.unwrap()), partition_expression.unwrap(), move(c_row_groups))) return Fragment.wrap( move(c_fragment)) diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx index 90cf3312d4b..b1e17155757 100644 --- a/python/pyarrow/_fs.pyx +++ b/python/pyarrow/_fs.pyx @@ -81,7 +81,7 @@ cdef class FileInfo: return '' s = ' self.info.type()) + @property + def is_file(self): + """ + """ + return self.type == FileType.File + @property def path(self): """ @@ -129,9 +135,7 @@ cdef class FileInfo: Only regular files are guaranteed to have 
a size. """ - if self.info.type() != CFileType_File: - return None - return self.info.size() + return self.info.size() if self.is_file else None @property def extension(self): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 86b24079337..75e835b3d88 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -233,9 +233,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CFileSource "arrow::dataset::FileSource": const c_string& path() const - CFileSystem* filesystem() const + const shared_ptr[CFileSystem]& filesystem() const const shared_ptr[CBuffer]& buffer() const - CFileSource(c_string path, CFileSystem* filesystem) + CFileSource(c_string path, shared_ptr[CFileSystem] filesystem) cdef cppclass CFileFormat "arrow::dataset::FileFormat": c_string type_name() const @@ -260,12 +260,10 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CSchema] schema, shared_ptr[CExpression] source_partition, shared_ptr[CFileFormat] format, - shared_ptr[CFileSystem] filesystem, - vector[CFileInfo] infos, - CExpressionVector partitions) + CFragmentVector fragments) c_string type() vector[c_string] files() - const shared_ptr[CFileFormat] format() + const shared_ptr[CFileFormat]& format() const cdef cppclass CParquetFileFormatReaderOptions \ "arrow::dataset::ParquetFileFormat::ReaderOptions": diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 30e815b1cc7..d9698a5ff81 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -209,12 +209,14 @@ def test_filesystem_dataset(mockfs): paths_or_selector=paths, partitions=partitions ) + assert isinstance(dataset.format, ds.ParquetFileFormat) # the root_partition and partitions keywords have defaults dataset = ds.FileSystemDataset( paths, schema, format=file_format, filesystem=mockfs, ) + assert isinstance(dataset.format, ds.ParquetFileFormat) # validation of required arguments @@ -259,9 +261,9 @@ def test_filesystem_dataset(mockfs): fragments = list(dataset.get_fragments()) for fragment, partition, path in zip(fragments, partitions, paths): - assert fragment.partition_expression.equals( - ds.AndExpression(root_partition, partition)) + assert fragment.partition_expression.equals(partition) assert fragment.path == path + assert isinstance(fragment.format, ds.ParquetFileFormat) assert isinstance(fragment, ds.ParquetFileFragment) assert fragment.row_groups is None From 80df00eb0748ed140d07eb1348d9897417dc866e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Fri, 1 May 2020 10:24:21 -0400 Subject: [PATCH 2/2] Address comments --- cpp/src/arrow/dataset/discovery.cc | 11 +++++------ cpp/src/arrow/dataset/discovery.h | 2 +- cpp/src/arrow/dataset/file_base.cc | 13 ++++--------- cpp/src/arrow/dataset/file_base.h | 4 +++- cpp/src/arrow/dataset/filter.h | 6 +++++- cpp/src/arrow/dataset/test_util.h | 2 +- python/pyarrow/_dataset.pyx | 11 +++++++---- python/pyarrow/includes/libarrow_dataset.pxd | 2 +- 8 files changed, 27 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 1449716778f..412208c6c10 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -110,7 +110,7 @@ FileSystemDatasetFactory::FileSystemDatasetFactory( format_(std::move(format)), options_(std::move(options)) {} 
-util::optional FileSystemDatasetFactory::BaselessPath( +util::optional FileSystemDatasetFactory::RemovePartitionBaseDir( util::string_view path) { const util::string_view partition_base_dir{options_.partition_base_dir}; return fs::internal::RemoveAncestor(partition_base_dir, path); @@ -193,7 +193,7 @@ Result> FileSystemDatasetFactory::PartitionSchema() { std::vector relative_paths; for (const auto& path : paths_) { - if (auto relative = BaselessPath(path)) { + if (auto relative = RemovePartitionBaseDir(path)) { relative_paths.push_back(*relative); } } @@ -241,12 +241,11 @@ Result> FileSystemDatasetFactory::Finish(FinishOptions ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema)); } - FragmentVector fragments; + std::vector> fragments; for (const auto& path : paths_) { std::shared_ptr partition = scalar(true); - if (auto relative = BaselessPath(path)) { - std::string path_string{*relative}; - partition = partitioning->Parse(path_string).ValueOr(scalar(true)); + if (auto relative = RemovePartitionBaseDir(path)) { + partition = partitioning->Parse(relative->to_string()).ValueOr(scalar(true)); } ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition)); diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index 5030b7ee540..8c8508c6cdd 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -233,7 +233,7 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory { FileSystemFactoryOptions options_; private: - util::optional BaselessPath(util::string_view path); + util::optional RemovePartitionBaseDir(util::string_view path); }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 02ad596cecd..9d3a427ccc7 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -91,16 +91,11 @@ FileSystemDataset::FileSystemDataset(std::shared_ptr schema, Result> FileSystemDataset::Make( std::shared_ptr schema, std::shared_ptr root_partition, - std::shared_ptr format, FragmentVector fragments) { - std::vector> file_fragments; - for (const auto& fragment : fragments) { - auto file_fragment = internal::checked_pointer_cast(fragment); - file_fragments.push_back(std::move(file_fragment)); - } - + std::shared_ptr format, + std::vector> fragments) { return std::shared_ptr( new FileSystemDataset(std::move(schema), std::move(root_partition), - std::move(format), std::move(file_fragments))); + std::move(format), std::move(fragments))); } Result> FileSystemDataset::ReplaceSchema( @@ -164,7 +159,7 @@ Result> FileSystemDataset::Write( auto partition_base_dir = fs::internal::EnsureTrailingSlash(plan.partition_base_dir); auto extension = "." + plan.format->type_name(); - FragmentVector fragments; + std::vector> fragments; for (size_t i = 0; i < plan.paths.size(); ++i) { const auto& op = plan.fragment_or_partition_expressions[i]; if (util::holds_alternative>(op)) { diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index f3243e09aa2..2732f81a09e 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -194,6 +194,7 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { /// /// \param[in] schema the schema of the dataset /// \param[in] root_partition the partition expression of the dataset + /// \param[in] format the format of each FileFragment. 
  /// \param[in] fragments list of fragments to create the dataset from
  ///
  /// Note that all fragments must be of `FileFragment` type. The type is
  /// erased to simplify callers.
@@ -202,7 +203,8 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
  /// \return A constructed dataset.
  static Result<std::shared_ptr<FileSystemDataset>> Make(
      std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
-      std::shared_ptr<FileFormat> format, FragmentVector fragments);
+      std::shared_ptr<FileFormat> format,
+      std::vector<std::shared_ptr<FileFragment>> fragments);

  /// \brief Write to a new format and filesystem location, preserving partitioning.
  ///
diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h
index d8ef4f7f890..4a00bf09478 100644
--- a/cpp/src/arrow/dataset/filter.h
+++ b/cpp/src/arrow/dataset/filter.h
@@ -188,7 +188,11 @@ class ARROW_DS_EXPORT Expression {
  /// This is a shortcut to check if the expression is neither null nor false.
  bool IsSatisfiable() const { return !IsNull() && !Equals(false); }

-  bool IsSatisfiableWith(const std::shared_ptr<Expression> other) const {
+  /// Indicates whether the expression is satisfiable given another expression.
+  ///
+  /// This behaves like IsSatisfiable, but first simplifies this expression
+  /// using the information in `other`.
+  bool IsSatisfiableWith(const std::shared_ptr<Expression>& other) const {
    return Assume(other)->IsSatisfiable();
  }

diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 26b43ef7816..75a45618244 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -300,7 +300,7 @@ struct MakeFileSystemDatasetMixin {
    MakeFileSystem(infos);
    auto format = std::make_shared<DummyFileFormat>();

-    FragmentVector fragments;
+    std::vector<std::shared_ptr<FileFragment>> fragments;
    for (size_t i = 0; i < n_fragments; i++) {
      const auto& info = infos[i];
      if (!info.IsFile()) {
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 182a3a49d0c..7c8f006f41f 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -275,10 +275,11 @@ cdef class FileSystemDataset(Dataset):
        cdef:
            FileInfo info
            Expression expr
-            Fragment fragment
+            FileFragment fragment
            vector[CFileInfo] c_file_infos
            vector[shared_ptr[CExpression]] c_partitions
-            vector[shared_ptr[CFragment]] c_fragments
+            shared_ptr[CFileFragment] c_fragment
+            vector[shared_ptr[CFileFragment]] c_fragments
            CResult[shared_ptr[CDataset]] result

        # validate required arguments
@@ -296,7 +297,7 @@ cdef class FileSystemDataset(Dataset):
        infos = filesystem.get_file_info(paths_or_selector)

        if partitions is None:
-            partitions = [ScalarExpression(True) for _ in range(len(infos))]
+            partitions = [ScalarExpression(True)] * len(infos)

        if len(infos) != len(partitions):
            raise ValueError(
@@ -308,7 +309,9 @@ cdef class FileSystemDataset(Dataset):
            if info.is_file:
                fragment = format.make_fragment(info.path, filesystem,
                                                partitions[i])
-                c_fragments.push_back(fragment.unwrap())
+                c_fragments.push_back(
+                    static_pointer_cast[CFileFragment, CFragment](
+                        fragment.unwrap()))

        if root_partition is None:
            root_partition = ScalarExpression(True)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 75e835b3d88..43d8d63df15 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -260,7 +260,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
            shared_ptr[CSchema] schema,
            shared_ptr[CExpression] source_partition,
            shared_ptr[CFileFormat] format,
-            CFragmentVector fragments)
+            vector[shared_ptr[CFileFragment]] fragments)
        c_string type()
        vector[c_string]
files() const shared_ptr[CFileFormat]& format() const
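
As a closing illustration, the per-fragment pruning that
`Expression::IsSatisfiableWith` enables in
`FileSystemDataset::GetFragmentsImpl` can be sketched as follows (illustrative
only, not part of the patch; the `"x"_` expression literals are the same ones
the dataset tests above already use):

    // Sketch only: how IsSatisfiableWith prunes fragments before any IO.
    #include "arrow/dataset/api.h"
    #include "arrow/dataset/filter.h"

    namespace arrow {
    namespace dataset {

    void PruningExample() {
      auto predicate = ("state"_ == "NY").Copy();

      // Assume() simplifies the predicate under the partition's bindings:
      // state == "CA" contradicts state == "NY", so the simplified predicate
      // is false and GetFragmentsImpl() skips the fragment without opening
      // the underlying file.
      auto ca = ("state"_ == "CA" and "city"_ == "Franklin").Copy();
      bool keep_ca = predicate->IsSatisfiableWith(ca);  // false -> pruned

      // A compatible partition expression keeps the fragment.
      auto ny = ("state"_ == "NY" and "city"_ == "Franklin").Copy();
      bool keep_ny = predicate->IsSatisfiableWith(ny);  // true -> scanned

      (void)keep_ca;
      (void)keep_ny;
    }

    }  // namespace dataset
    }  // namespace arrow

Because each FileFragment now carries its full partition expression rather than
only the component below its parent directory, this check is purely local: no
PathForest or directory tree has to be consulted at scan time.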