From 13ba9bc9dd95fa2268496559c7046c49f6de8981 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Fri, 20 Mar 2020 14:04:36 -0400 Subject: [PATCH 1/4] Implement InspectOptions --- cpp/src/arrow/dataset/dataset_test.cc | 5 +- cpp/src/arrow/dataset/discovery.cc | 52 +++++------ cpp/src/arrow/dataset/discovery.h | 48 +++++++--- cpp/src/arrow/dataset/discovery_test.cc | 114 +++++++++++++++++------- 4 files changed, 149 insertions(+), 70 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index 4cc748920c9..91e364ee5be 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -343,7 +343,10 @@ TEST_F(TestEndToEnd, EndToEndSingleDataset) { // schema evolved by adding/renaming columns. In this case, the schema is // passed to the dataset constructor. // The inspected_schema may optionally be modified before being finalized. - ASSERT_OK_AND_ASSIGN(auto inspected_schema, factory->Inspect()); + InspectOptions inspect_options; + inspect_options.depth = InspectOptions::kInspectAllFragments; + inspect_options.with_partition_inspection = true; + ASSERT_OK_AND_ASSIGN(auto inspected_schema, factory->Inspect(inspect_options)); EXPECT_EQ(*schema_, *inspected_schema); // Build the Dataset where partitions are attached to fragments (files). diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 983bf1c6488..860a9e4c137 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -36,16 +36,21 @@ namespace dataset { DatasetFactory::DatasetFactory() : root_partition_(scalar(true)) {} -Result> DatasetFactory::Inspect() { - ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas()); +Result> DatasetFactory::Inspect(InspectOptions options) { + ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas(std::move(options))); if (schemas.empty()) { - schemas.push_back(arrow::schema({})); + return arrow::schema({}); } return UnifySchemas(schemas); } +Result> DatasetFactory::Finish() { + ARROW_ASSIGN_OR_RAISE(auto schema, Inspect()); + return Finish(schema); +} + UnionDatasetFactory::UnionDatasetFactory( std::vector> factories) : factories_(std::move(factories)) {} @@ -62,27 +67,22 @@ Result> UnionDatasetFactory::Make( new UnionDatasetFactory(std::move(factories))}; } -Result>> UnionDatasetFactory::InspectSchemas() { +Result>> UnionDatasetFactory::InspectSchemas( + InspectOptions options) { std::vector> schemas; for (const auto& child_factory : factories_) { - ARROW_ASSIGN_OR_RAISE(auto schema, child_factory->Inspect()); - schemas.emplace_back(schema); + // Instead of applying options globally, apply it at each child factory. + // This will not respect `options.depth` exactly, but will respect the + // spirit of peeking the first fragments or all of them. + ARROW_ASSIGN_OR_RAISE(auto child_schemas, child_factory->InspectSchemas(options)); + ARROW_ASSIGN_OR_RAISE(auto child_schema, UnifySchemas(child_schemas)); + schemas.emplace_back(child_schema); } return schemas; } -Result> UnionDatasetFactory::Inspect() { - ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas()); - - if (schemas.empty()) { - return arrow::schema({}); - } - - return UnifySchemas(schemas); -} - Result> UnionDatasetFactory::Finish( const std::shared_ptr& schema) { std::vector> children; @@ -220,31 +220,33 @@ Result> FileSystemDatasetFactory::PartitionSchema() { return options_.partitioning.factory()->Inspect(paths); } -Result>> FileSystemDatasetFactory::InspectSchemas() { +Result>> FileSystemDatasetFactory::InspectSchemas( + InspectOptions options) { std::vector> schemas; + int depth = options.depth; for (const auto& f : forest_.infos()) { if (!f.IsFile()) continue; + if (options.depth >= 0 && depth-- == 0) break; FileSource src(f.path(), fs_.get()); ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src)); schemas.push_back(schema); } - ARROW_ASSIGN_OR_RAISE(auto partition_schema, PartitionSchema()); - schemas.push_back(partition_schema); + // Inspecting the schema of a PartitioningFactory can be costly. + bool partition_requires_inspection = options_.partitioning.partitioning() == nullptr; + if (!partition_requires_inspection || options.with_partition_inspection) { + ARROW_ASSIGN_OR_RAISE(auto partition_schema, PartitionSchema()); + schemas.push_back(partition_schema); + } return schemas; } -Result> DatasetFactory::Finish() { - ARROW_ASSIGN_OR_RAISE(auto schema, Inspect()); - return Finish(schema); -} - Result> FileSystemDatasetFactory::Finish( const std::shared_ptr& schema) { // This validation can be costly, but better safe than sorry. - ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas()); + ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas({})); for (const auto& s : schemas) { RETURN_NOT_OK(SchemaBuilder::AreCompatible({schema, s})); } diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index b657b4451e8..97be701252a 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -37,15 +37,44 @@ namespace arrow { namespace dataset { -/// \brief SourceFactory provides a way to inspect/discover a Source's expected -/// schema before materializing said Source. +struct InspectOptions { + /// See `depth` property. + static constexpr int kInspectAllFragments = -1; + + /// Indicate how many fragments' schema should be inspected. This limits the + /// number of fragment accessed to inquire their schemas, thus improving the + /// latency of the discovery process when dealing with a high number of fragment + /// and/or high latency file systems. + /// + /// The default value of `1` inspect the schema of the first fragment only, + /// in no particular order. If the dataset has a uniform schema for all fragments, + /// this default is the optimal value. In order to inspect all fragments, set + /// this option to `kInspectAllFragments`. + int depth = 1; + + /// Indicate if the inspection process should also invoke the + /// PartitioningFactory::Inspect if the Dataset supports Partitioning. + bool with_partition_inspection = false; +}; + +struct FinishOptions { + std::shared_ptr schema; + + // Indicate if the factory should validate the schema with every schemas + // returned by InspectSchemas(). + bool validate_schema = false; +}; + +/// \brief DatasetFactory provides a way to inspect/discover a Dataset's expected +/// schema before materializing said Dataset. class ARROW_DS_EXPORT DatasetFactory { public: /// \brief Get the schemas of the Fragments and Partitioning. - virtual Result>> InspectSchemas() = 0; + virtual Result>> InspectSchemas( + InspectOptions options) = 0; /// \brief Get unified schema for the resulting Dataset. - virtual Result> Inspect(); + Result> Inspect(InspectOptions options = {}); /// \brief Create a Dataset with the given schema. virtual Result> Finish( @@ -82,10 +111,8 @@ class ARROW_DS_EXPORT UnionDatasetFactory : public DatasetFactory { } /// \brief Get the schemas of the Datasets. - Result>> InspectSchemas() override; - - /// \brief Get unified schema for the resulting Dataset. - Result> Inspect() override; + Result>> InspectSchemas( + InspectOptions options) override; /// \brief Create a Dataset with the given schema. Result> Finish(const std::shared_ptr& schema) override; @@ -131,7 +158,7 @@ struct FileSystemFactoryOptions { // in a serial and single threaded fashion. Disabling this feature will skip the // IO, but unsupported files may be present in the Dataset // (resulting in an error at scan time). - bool exclude_invalid_files = true; + bool exclude_invalid_files = false; // Files matching one of the following prefix will be ignored by the // discovery process. This is matched to the basename of a path. @@ -180,7 +207,8 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory { std::shared_ptr filesystem, fs::FileSelector selector, std::shared_ptr format, FileSystemFactoryOptions options); - Result>> InspectSchemas() override; + Result>> InspectSchemas( + InspectOptions options) override; Result> Finish(const std::shared_ptr& schema) override; diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index bbb851fe35f..624827e21f1 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -29,6 +29,8 @@ #include "arrow/testing/gtest_util.h" #include "arrow/type_fwd.h" +using testing::SizeIs; + namespace arrow { namespace dataset { @@ -42,13 +44,14 @@ void AssertSchemasAre(std::vector> actual, class DatasetFactoryTest : public TestFileSystemDataset { public: - void AssertInspect(const std::vector>& expected_fields) { - ASSERT_OK_AND_ASSIGN(auto actual, factory_->Inspect()); - EXPECT_EQ(*actual, Schema(expected_fields)); + void AssertInspect(std::shared_ptr expected, InspectOptions options = {}) { + ASSERT_OK_AND_ASSIGN(auto actual, factory_->Inspect(options)); + EXPECT_EQ(*actual, *expected); } - void AssertInspectSchemas(std::vector> expected) { - ASSERT_OK_AND_ASSIGN(auto actual, factory_->InspectSchemas()); + void AssertInspectSchemas(std::vector> expected, + InspectOptions options = {}) { + ASSERT_OK_AND_ASSIGN(auto actual, factory_->InspectSchemas(options)); AssertSchemasAre(actual, expected); } @@ -61,7 +64,8 @@ class MockDatasetFactory : public DatasetFactory { explicit MockDatasetFactory(std::vector> schemas) : schemas_(std::move(schemas)) {} - Result>> InspectSchemas() override { + Result>> InspectSchemas( + InspectOptions options) override { return schemas_; } @@ -107,22 +111,22 @@ class MockDatasetFactoryTest : public DatasetFactoryTest { TEST_F(MockDatasetFactoryTest, UnifySchemas) { MakeFactory({}); - AssertInspect({}); + AssertInspect(schema({})); MakeFactory({schema({i32}), schema({i32})}); - AssertInspect({i32}); + AssertInspect(schema({i32})); MakeFactory({schema({i32}), schema({i64})}); - AssertInspect({i32, i64}); + AssertInspect(schema({i32, i64})); MakeFactory({schema({i32}), schema({i64})}); - AssertInspect({i32, i64}); + AssertInspect(schema({i32, i64})); MakeFactory({schema({i32}), schema({i32_req})}); - AssertInspect({i32}); + AssertInspect(schema({i32})); MakeFactory({schema({i32, f64}), schema({i32_req, i64})}); - AssertInspect({i32, f64, i64}); + AssertInspect(schema({i32, f64, i64})); MakeFactory({schema({i32, f64}), schema({f64, i32_fake})}); // Unification fails when fields with the same name have clashing types. @@ -140,9 +144,10 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest { } void AssertFinishWithPaths(std::vector paths, - std::shared_ptr schema = nullptr) { + std::shared_ptr schema = nullptr, + InspectOptions options = {}) { if (schema == nullptr) { - ASSERT_OK_AND_ASSIGN(schema, factory_->Inspect()); + ASSERT_OK_AND_ASSIGN(schema, factory_->Inspect(options)); } options_ = ScanOptions::Make(schema); ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema)); @@ -172,46 +177,72 @@ TEST_F(FileSystemDatasetFactoryTest, Selector) { factory_options_.partition_base_dir = "A/A"; MakeFactory({fs::File("0"), fs::File("A/a"), fs::File("A/A/a")}); // partition_base_dir should not affect filtered files, only the applied partition - AssertInspect({}); + AssertInspect(schema({})); AssertFinishWithPaths({"A/a", "A/A/a"}); } TEST_F(FileSystemDatasetFactoryTest, ExplicitPartition) { selector_.base_dir = "a=ignored/base"; + auto part_field = field("a", int32()); factory_options_.partitioning = - std::make_shared(schema({field("a", float64())})); + std::make_shared(schema({part_field})); + + auto a_1 = "a=ignored/base/a=1"; + MakeFactory({fs::File(a_1)}); + + InspectOptions options; - MakeFactory( - {fs::File(selector_.base_dir + "/a=1"), fs::File(selector_.base_dir + "/a=2")}); + // Should add the partition schema when requested + options.with_partition_inspection = true; + AssertInspect(schema({part_field}), options); + AssertFinishWithPaths({a_1}, nullptr, options); - AssertInspect({field("a", float64())}); - AssertFinishWithPaths({selector_.base_dir + "/a=1", selector_.base_dir + "/a=2"}); + // When given an explicit partition, add the schema irregardless of + // `with_partition_inspection` since the cost is free. + options.with_partition_inspection = false; + AssertInspect(schema({part_field}), options); + AssertFinishWithPaths({a_1}, nullptr, options); } TEST_F(FileSystemDatasetFactoryTest, DiscoveredPartition) { selector_.base_dir = "a=ignored/base"; factory_options_.partitioning = HivePartitioning::MakeFactory(); - MakeFactory( - {fs::File(selector_.base_dir + "/a=1"), fs::File(selector_.base_dir + "/a=2")}); - AssertInspect({field("a", int32())}); - AssertFinishWithPaths({selector_.base_dir + "/a=1", selector_.base_dir + "/a=2"}); + auto a_1 = "a=ignored/base/a=1"; + MakeFactory({fs::File(a_1)}); + + InspectOptions options; + + // Ensure no partitions is inspected if the option disable it. + options.with_partition_inspection = false; + auto schema_without = schema({}); + AssertInspect(schema_without, options); + AssertFinishWithPaths({a_1}, schema_without); + + options.with_partition_inspection = true; + auto schema_with = schema({field("a", int32())}); + AssertInspect(schema_with, options); + AssertFinishWithPaths({a_1}, schema_with); } TEST_F(FileSystemDatasetFactoryTest, MissingDirectories) { - MakeFileSystem({fs::File("base_dir/a=3/b=3/dat"), fs::File("unpartitioned/ignored=3")}); + auto partition_path = "base_dir/a=3/b=3/dat"; + auto unpartition_path = "unpartitioned/ignored=3"; + MakeFileSystem({fs::File(partition_path), fs::File(unpartition_path)}); factory_options_.partition_base_dir = "base_dir"; factory_options_.partitioning = std::make_shared( schema({field("a", int32()), field("b", int32())})); ASSERT_OK_AND_ASSIGN( - factory_, FileSystemDatasetFactory::Make( - fs_, {"base_dir/a=3/b=3/dat", "unpartitioned/ignored=3"}, format_, - factory_options_)); + factory_, FileSystemDatasetFactory::Make(fs_, {partition_path, unpartition_path}, + format_, factory_options_)); - AssertInspect({field("a", int32()), field("b", int32())}); - AssertFinishWithPaths({"base_dir/a=3/b=3/dat", "unpartitioned/ignored=3"}); + InspectOptions options; + options.with_partition_inspection = true; + + AssertInspect(schema({field("a", int32()), field("b", int32())}), options); + AssertFinishWithPaths({partition_path, unpartition_path}); } TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredDefaultPrefixes) { @@ -245,10 +276,10 @@ TEST_F(FileSystemDatasetFactoryTest, Inspect) { // No files MakeFactory({}); - AssertInspect({}); + AssertInspect(schema({})); MakeFactory({fs::File("test")}); - AssertInspect(s->fields()); + AssertInspect(s); } TEST_F(FileSystemDatasetFactoryTest, FinishWithIncompatibleSchemaShouldFail) { @@ -264,6 +295,21 @@ TEST_F(FileSystemDatasetFactoryTest, FinishWithIncompatibleSchemaShouldFail) { ASSERT_RAISES(Invalid, factory_->Finish(broken_s)); } +TEST_F(FileSystemDatasetFactoryTest, InspectFragmentsLimit) { + MakeFactory({fs::File("a"), fs::File("b"), fs::File("c")}); + + InspectOptions options; + // By default, inspect one fragment and the partitioning. + ASSERT_OK_AND_ASSIGN(auto schemas, factory_->InspectSchemas(options)); + EXPECT_THAT(schemas, SizeIs(2)); + + for (int depth = 0; depth < 3; depth++) { + options.depth = depth; + ASSERT_OK_AND_ASSIGN(auto schemas, factory_->InspectSchemas(options)); + EXPECT_THAT(schemas, SizeIs(depth + 1)); + } +} + std::shared_ptr DatasetFactoryFromSchemas( std::vector> schemas) { return std::make_shared(schemas); @@ -286,7 +332,7 @@ TEST(UnionDatasetFactoryTest, Basic) { ASSERT_OK_AND_ASSIGN(auto factory, UnionDatasetFactory::Make({dataset_1, dataset_2, dataset_3})); - ASSERT_OK_AND_ASSIGN(auto schemas, factory->InspectSchemas()); + ASSERT_OK_AND_ASSIGN(auto schemas, factory->InspectSchemas({})); AssertSchemasAre(schemas, {schema_2, schema_2, schema_3}); auto expected_schema = schema({f64, i32, str}); @@ -325,7 +371,7 @@ TEST(UnionDatasetFactoryTest, ConflictingSchemas) { ASSERT_RAISES(Invalid, factory->Finish()); // The user can inspect without error - ASSERT_OK_AND_ASSIGN(auto schemas, factory->InspectSchemas()); + ASSERT_OK_AND_ASSIGN(auto schemas, factory->InspectSchemas({})); AssertSchemasAre(schemas, {schema_2, schema_2, schema_3}); // The user decided to ignore the conflicting `f64` field. From af1fb7219b154086093e8d212732da3abfc2d494 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Mon, 23 Mar 2020 10:57:53 -0400 Subject: [PATCH 2/4] Add FinishOptions to DatasetFactory::Finish --- .../arrow/dataset-parquet-scan-example.cc | 7 ++- cpp/src/arrow/dataset/dataset_test.cc | 3 +- cpp/src/arrow/dataset/discovery.cc | 61 +++++++++++-------- cpp/src/arrow/dataset/discovery.h | 48 +++++++-------- cpp/src/arrow/dataset/discovery_test.cc | 46 ++++++-------- 5 files changed, 83 insertions(+), 82 deletions(-) diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc b/cpp/examples/arrow/dataset-parquet-scan-example.cc index 965521dc09d..16d674bb16e 100644 --- a/cpp/examples/arrow/dataset-parquet-scan-example.cc +++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc @@ -63,6 +63,9 @@ struct Configuration { // Indicates the filter by which rows will be filtered. This optimization can // make use of partition information and/or file metadata if possible. std::shared_ptr filter = ("total_amount"_ > 1000.0f).Copy(); + + ds::InspectOptions inspect_options{}; + ds::FinishOptions finish_options{}; } conf; std::shared_ptr GetFileSystemFromUri(const std::string& uri, @@ -83,10 +86,10 @@ std::shared_ptr GetDatasetFromPath(std::shared_ptr auto factory = ds::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie(); // Try to infer a common schema for all files. - auto schema = factory->Inspect().ValueOrDie(); + auto schema = factory->Inspect(conf.inspect_options).ValueOrDie(); // Caller can optionally decide another schema as long as it is compatible // with the previous one, e.g. `factory->Finish(compatible_schema)`. - auto child = factory->Finish().ValueOrDie(); + auto child = factory->Finish(conf.finish_options).ValueOrDie(); ds::DatasetVector children{conf.repeat, child}; auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children)); diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index 91e364ee5be..7f82b0dcff4 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -344,8 +344,7 @@ TEST_F(TestEndToEnd, EndToEndSingleDataset) { // passed to the dataset constructor. // The inspected_schema may optionally be modified before being finalized. InspectOptions inspect_options; - inspect_options.depth = InspectOptions::kInspectAllFragments; - inspect_options.with_partition_inspection = true; + inspect_options.fragments = InspectOptions::kInspectAllFragments; ASSERT_OK_AND_ASSIGN(auto inspected_schema, factory->Inspect(inspect_options)); EXPECT_EQ(*schema_, *inspected_schema); diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 860a9e4c137..ea0591ecc3c 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -47,8 +47,14 @@ Result> DatasetFactory::Inspect(InspectOptions options) } Result> DatasetFactory::Finish() { - ARROW_ASSIGN_OR_RAISE(auto schema, Inspect()); - return Finish(schema); + FinishOptions options; + return Finish(options); +} + +Result> DatasetFactory::Finish(std::shared_ptr schema) { + FinishOptions options; + options.schema = schema; + return Finish(options); } UnionDatasetFactory::UnionDatasetFactory( @@ -73,7 +79,7 @@ Result>> UnionDatasetFactory::InspectSchemas for (const auto& child_factory : factories_) { // Instead of applying options globally, apply it at each child factory. - // This will not respect `options.depth` exactly, but will respect the + // This will not respect `options.fragments` exactly, but will respect the // spirit of peeking the first fragments or all of them. ARROW_ASSIGN_OR_RAISE(auto child_schemas, child_factory->InspectSchemas(options)); ARROW_ASSIGN_OR_RAISE(auto child_schema, UnifySchemas(child_schemas)); @@ -83,21 +89,20 @@ Result>> UnionDatasetFactory::InspectSchemas return schemas; } -Result> UnionDatasetFactory::Finish( - const std::shared_ptr& schema) { +Result> UnionDatasetFactory::Finish(FinishOptions options) { std::vector> children; + if (options.schema == nullptr) { + // Set the schema in the option directly for the next `child_factory->Finish()` + ARROW_ASSIGN_OR_RAISE(options.schema, Inspect(options.inspect_options)); + } + for (const auto& child_factory : factories_) { - ARROW_ASSIGN_OR_RAISE(auto child, child_factory->Finish(schema)); + ARROW_ASSIGN_OR_RAISE(auto child, child_factory->Finish(options)); children.emplace_back(child); } - return std::shared_ptr(new UnionDataset(schema, std::move(children))); -} - -Result> UnionDatasetFactory::Finish() { - ARROW_ASSIGN_OR_RAISE(auto schema, Inspect()); - return Finish(schema); + return std::shared_ptr(new UnionDataset(options.schema, std::move(children))); } FileSystemDatasetFactory::FileSystemDatasetFactory( @@ -224,31 +229,34 @@ Result>> FileSystemDatasetFactory::InspectSc InspectOptions options) { std::vector> schemas; - int depth = options.depth; + int fragments = options.fragments; for (const auto& f : forest_.infos()) { if (!f.IsFile()) continue; - if (options.depth >= 0 && depth-- == 0) break; + if (options.fragments >= 0 && fragments-- == 0) break; FileSource src(f.path(), fs_.get()); ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src)); schemas.push_back(schema); } - // Inspecting the schema of a PartitioningFactory can be costly. - bool partition_requires_inspection = options_.partitioning.partitioning() == nullptr; - if (!partition_requires_inspection || options.with_partition_inspection) { - ARROW_ASSIGN_OR_RAISE(auto partition_schema, PartitionSchema()); - schemas.push_back(partition_schema); - } + ARROW_ASSIGN_OR_RAISE(auto partition_schema, PartitionSchema()); + schemas.push_back(partition_schema); return schemas; } -Result> FileSystemDatasetFactory::Finish( - const std::shared_ptr& schema) { - // This validation can be costly, but better safe than sorry. - ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas({})); - for (const auto& s : schemas) { - RETURN_NOT_OK(SchemaBuilder::AreCompatible({schema, s})); +Result> FileSystemDatasetFactory::Finish(FinishOptions options) { + std::shared_ptr schema = options.schema; + bool schema_missing = schema == nullptr; + if (schema_missing) { + ARROW_ASSIGN_OR_RAISE(schema, Inspect(options.inspect_options)); + } + + if (options.validate_fragments && !schema_missing) { + // If the schema is not provided, the unification will implicitly valid + ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas(options.inspect_options)); + for (const auto& s : schemas) { + RETURN_NOT_OK(SchemaBuilder::AreCompatible({schema, s})); + } } ExpressionVector partitions(forest_.size(), scalar(true)); @@ -275,7 +283,6 @@ Result> FileSystemDatasetFactory::Finish( }; RETURN_NOT_OK(forest_.Visit(apply_partitioning)); - return FileSystemDataset::Make(schema, root_partition_, format_, fs_, forest_, std::move(partitions)); } diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index 97be701252a..721933e5562 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -38,7 +38,7 @@ namespace arrow { namespace dataset { struct InspectOptions { - /// See `depth` property. + /// See `fragments` property. static constexpr int kInspectAllFragments = -1; /// Indicate how many fragments' schema should be inspected. This limits the @@ -49,20 +49,25 @@ struct InspectOptions { /// The default value of `1` inspect the schema of the first fragment only, /// in no particular order. If the dataset has a uniform schema for all fragments, /// this default is the optimal value. In order to inspect all fragments, set - /// this option to `kInspectAllFragments`. - int depth = 1; - - /// Indicate if the inspection process should also invoke the - /// PartitioningFactory::Inspect if the Dataset supports Partitioning. - bool with_partition_inspection = false; + /// this option to `kInspectAllFragments`. A value of `0` disable inspection + /// of fragments. + int fragments = 1; }; struct FinishOptions { - std::shared_ptr schema; - - // Indicate if the factory should validate the schema with every schemas - // returned by InspectSchemas(). - bool validate_schema = false; + /// Finalize the dataset with this given schema. If the schema is not + /// provided, infer the schema via the Inspect, see the `inspect_options` + /// property. + std::shared_ptr schema = NULLPTR; + + /// If the schema is not provided, it will be discovered by passing the + /// following options to `DatasetDiscovery::Inspect`. + InspectOptions inspect_options{}; + + /// Indicate if the given Schema (when inferred), should be validated against + /// the fragments' schemas. `inspect_options` will control how many fragments + /// are checked. + bool validate_fragments = false; }; /// \brief DatasetFactory provides a way to inspect/discover a Dataset's expected @@ -76,12 +81,10 @@ class ARROW_DS_EXPORT DatasetFactory { /// \brief Get unified schema for the resulting Dataset. Result> Inspect(InspectOptions options = {}); - /// \brief Create a Dataset with the given schema. - virtual Result> Finish( - const std::shared_ptr& schema) = 0; - - /// \brief Create a Dataset using the inspected schema. - virtual Result> Finish(); + /// \brief Create a Dataset + Result> Finish(); + Result> Finish(std::shared_ptr schema); + virtual Result> Finish(FinishOptions options) = 0; /// \brief Optional root partition for the resulting Dataset. const std::shared_ptr& root_partition() const { return root_partition_; } @@ -114,11 +117,8 @@ class ARROW_DS_EXPORT UnionDatasetFactory : public DatasetFactory { Result>> InspectSchemas( InspectOptions options) override; - /// \brief Create a Dataset with the given schema. - Result> Finish(const std::shared_ptr& schema) override; - - /// \brief Create a Dataset using the inspected schema. - Result> Finish() override; + /// \brief Create a Dataset. + Result> Finish(FinishOptions options) override; protected: explicit UnionDatasetFactory(std::vector> factories); @@ -210,7 +210,7 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory { Result>> InspectSchemas( InspectOptions options) override; - Result> Finish(const std::shared_ptr& schema) override; + Result> Finish(FinishOptions options) override; protected: FileSystemDatasetFactory(std::shared_ptr filesystem, diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index 624827e21f1..d8204ebaef1 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -69,9 +69,8 @@ class MockDatasetFactory : public DatasetFactory { return schemas_; } - Result> Finish( - const std::shared_ptr& schema) override { - return std::make_shared(schema, + Result> Finish(FinishOptions options) override { + return std::make_shared(options.schema, std::vector>{}); } @@ -191,15 +190,8 @@ TEST_F(FileSystemDatasetFactoryTest, ExplicitPartition) { MakeFactory({fs::File(a_1)}); InspectOptions options; - - // Should add the partition schema when requested - options.with_partition_inspection = true; - AssertInspect(schema({part_field}), options); - AssertFinishWithPaths({a_1}, nullptr, options); - - // When given an explicit partition, add the schema irregardless of - // `with_partition_inspection` since the cost is free. - options.with_partition_inspection = false; + // Should inspect the Schema even if no files are inspected. + options.fragments = 0; AssertInspect(schema({part_field}), options); AssertFinishWithPaths({a_1}, nullptr, options); } @@ -213,13 +205,6 @@ TEST_F(FileSystemDatasetFactoryTest, DiscoveredPartition) { InspectOptions options; - // Ensure no partitions is inspected if the option disable it. - options.with_partition_inspection = false; - auto schema_without = schema({}); - AssertInspect(schema_without, options); - AssertFinishWithPaths({a_1}, schema_without); - - options.with_partition_inspection = true; auto schema_with = schema({field("a", int32())}); AssertInspect(schema_with, options); AssertFinishWithPaths({a_1}, schema_with); @@ -239,8 +224,6 @@ TEST_F(FileSystemDatasetFactoryTest, MissingDirectories) { format_, factory_options_)); InspectOptions options; - options.with_partition_inspection = true; - AssertInspect(schema({field("a", int32()), field("b", int32())}), options); AssertFinishWithPaths({partition_path, unpartition_path}); } @@ -287,12 +270,21 @@ TEST_F(FileSystemDatasetFactoryTest, FinishWithIncompatibleSchemaShouldFail) { format_ = std::make_shared(s); auto broken_s = schema({field("f64", utf8())}); - // No files + + FinishOptions options; + options.schema = broken_s; + options.validate_fragments = true; + + // No files and validation MakeFactory({}); - ASSERT_OK_AND_ASSIGN(auto dataset, factory_->Finish(broken_s)); + ASSERT_OK_AND_ASSIGN(auto dataset, factory_->Finish(options)); MakeFactory({fs::File("test")}); - ASSERT_RAISES(Invalid, factory_->Finish(broken_s)); + ASSERT_RAISES(Invalid, factory_->Finish(options)); + + // Disable validation + options.validate_fragments = false; + ASSERT_OK_AND_ASSIGN(dataset, factory_->Finish(options)); } TEST_F(FileSystemDatasetFactoryTest, InspectFragmentsLimit) { @@ -303,10 +295,10 @@ TEST_F(FileSystemDatasetFactoryTest, InspectFragmentsLimit) { ASSERT_OK_AND_ASSIGN(auto schemas, factory_->InspectSchemas(options)); EXPECT_THAT(schemas, SizeIs(2)); - for (int depth = 0; depth < 3; depth++) { - options.depth = depth; + for (int fragments = 0; fragments < 3; fragments++) { + options.fragments = fragments; ASSERT_OK_AND_ASSIGN(auto schemas, factory_->InspectSchemas(options)); - EXPECT_THAT(schemas, SizeIs(depth + 1)); + EXPECT_THAT(schemas, SizeIs(fragments + 1)); } } From ab5bf3223a7e286fd9489126c9ac1ba8f1f614d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Tue, 24 Mar 2020 11:03:04 -0400 Subject: [PATCH 3/4] Review --- cpp/src/arrow/dataset/discovery.cc | 13 ++++++------- cpp/src/arrow/dataset/discovery.h | 25 +++++++++++++++---------- cpp/src/arrow/dataset/discovery_test.cc | 2 +- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index ea0591ecc3c..7b3a040edda 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -54,7 +54,7 @@ Result> DatasetFactory::Finish() { Result> DatasetFactory::Finish(std::shared_ptr schema) { FinishOptions options; options.schema = schema; - return Finish(options); + return Finish(std::move(options)); } UnionDatasetFactory::UnionDatasetFactory( @@ -78,9 +78,6 @@ Result>> UnionDatasetFactory::InspectSchemas std::vector> schemas; for (const auto& child_factory : factories_) { - // Instead of applying options globally, apply it at each child factory. - // This will not respect `options.fragments` exactly, but will respect the - // spirit of peeking the first fragments or all of them. ARROW_ASSIGN_OR_RAISE(auto child_schemas, child_factory->InspectSchemas(options)); ARROW_ASSIGN_OR_RAISE(auto child_schema, UnifySchemas(child_schemas)); schemas.emplace_back(child_schema); @@ -93,7 +90,7 @@ Result> UnionDatasetFactory::Finish(FinishOptions optio std::vector> children; if (options.schema == nullptr) { - // Set the schema in the option directly for the next `child_factory->Finish()` + // Set the schema in the option directly for use in `child_factory->Finish()` ARROW_ASSIGN_OR_RAISE(options.schema, Inspect(options.inspect_options)); } @@ -229,10 +226,11 @@ Result>> FileSystemDatasetFactory::InspectSc InspectOptions options) { std::vector> schemas; + const bool has_fragments_limit = options.fragments >= 0; int fragments = options.fragments; for (const auto& f : forest_.infos()) { if (!f.IsFile()) continue; - if (options.fragments >= 0 && fragments-- == 0) break; + if (has_fragments_limit && fragments-- == 0) break; FileSource src(f.path(), fs_.get()); ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src)); schemas.push_back(schema); @@ -252,7 +250,8 @@ Result> FileSystemDatasetFactory::Finish(FinishOptions } if (options.validate_fragments && !schema_missing) { - // If the schema is not provided, the unification will implicitly valid + // If the schema was not explicitly provided we don't need to validate + // since Inspect has already succeeded in producing a valid unified schema. ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas(options.inspect_options)); for (const auto& s : schemas) { RETURN_NOT_OK(SchemaBuilder::AreCompatible({schema, s})); diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index 721933e5562..17d6dd6ea92 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -41,16 +41,17 @@ struct InspectOptions { /// See `fragments` property. static constexpr int kInspectAllFragments = -1; - /// Indicate how many fragments' schema should be inspected. This limits the - /// number of fragment accessed to inquire their schemas, thus improving the - /// latency of the discovery process when dealing with a high number of fragment - /// and/or high latency file systems. + /// Indicate how many fragments should be inspected to infer the unified dataset + /// schema. Limiting the number of fragments accessed improves the latency of + /// the discovery process when dealing with a high number of fragments and/or + /// high latency file systems. /// - /// The default value of `1` inspect the schema of the first fragment only, - /// in no particular order. If the dataset has a uniform schema for all fragments, - /// this default is the optimal value. In order to inspect all fragments, set - /// this option to `kInspectAllFragments`. A value of `0` disable inspection - /// of fragments. + /// The default value of `1` inspects the schema of the first (in no particular + /// order) fragment only. If the dataset has a uniform schema for all fragments, + /// this default is the optimal value. In order to inspect all fragments and + /// robustly unify their potentially varying schemas, set this option to + /// `kInspectAllFragments`. A value of `0` disables inspection of fragments + /// altogether so only the partitioning schema will be inspected. int fragments = 1; }; @@ -64,7 +65,7 @@ struct FinishOptions { /// following options to `DatasetDiscovery::Inspect`. InspectOptions inspect_options{}; - /// Indicate if the given Schema (when inferred), should be validated against + /// Indicate if the given Schema (when specified), should be validated against /// the fragments' schemas. `inspect_options` will control how many fragments /// are checked. bool validate_fragments = false; @@ -114,6 +115,10 @@ class ARROW_DS_EXPORT UnionDatasetFactory : public DatasetFactory { } /// \brief Get the schemas of the Datasets. + /// + /// Instead of applying options globally, it applies at each child factory. + /// This will not respect `options.fragments` exactly, but will respect the + /// spirit of peeking the first fragments or all of them. Result>> InspectSchemas( InspectOptions options) override; diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index d8204ebaef1..25e880018b1 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -190,7 +190,7 @@ TEST_F(FileSystemDatasetFactoryTest, ExplicitPartition) { MakeFactory({fs::File(a_1)}); InspectOptions options; - // Should inspect the Schema even if no files are inspected. + // Should inspect the partition's Schema even if no files are inspected. options.fragments = 0; AssertInspect(schema({part_field}), options); AssertFinishWithPaths({a_1}, nullptr, options); From 73310ebd67d36b6b658d3934317f0438c1e964a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Wed, 25 Mar 2020 09:00:16 -0400 Subject: [PATCH 4/4] Add python interface --- python/pyarrow/_dataset.pyx | 6 ++++-- python/pyarrow/includes/libarrow_dataset.pxd | 12 ++++++++++-- python/pyarrow/tests/test_dataset.py | 2 +- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 690c50d1d31..fce908ede74 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -724,8 +724,9 @@ cdef class DatasetFactory: def inspect_schemas(self): cdef CResult[vector[shared_ptr[CSchema]]] result + cdef CInspectOptions options with nogil: - result = self.factory.InspectSchemas() + result = self.factory.InspectSchemas(options) schemas = [] for s in GetResultValue(result): @@ -741,8 +742,9 @@ cdef class DatasetFactory: Schema """ cdef CResult[shared_ptr[CSchema]] result + cdef CInspectOptions options with nogil: - result = self.factory.Inspect() + result = self.factory.Inspect(options) return pyarrow_wrap_schema(GetResultValue(result)) def finish(self, Schema schema=None): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 9cac4be4b09..b263debb642 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -181,9 +181,17 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CUnionDataset]] Make(shared_ptr[CSchema] schema, CDatasetVector children) + cdef cppclass CInspectOptions "arrow::dataset::InspectOptions": + int fragments + + cdef cppclass CFinishOptions "arrow::dataset::FinishOptions": + shared_ptr[CSchema] schema + CInspectOptions inspect_options + c_bool validate_fragments + cdef cppclass CDatasetFactory "arrow::dataset::DatasetFactory": - CResult[vector[shared_ptr[CSchema]]] InspectSchemas() - CResult[shared_ptr[CSchema]] Inspect() + CResult[vector[shared_ptr[CSchema]]] InspectSchemas(CInspectOptions) + CResult[shared_ptr[CSchema]] Inspect(CInspectOptions) CResult[shared_ptr[CDataset]] FinishWithSchema "Finish"( const shared_ptr[CSchema] & schema) CResult[shared_ptr[CDataset]] Finish() diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index ab25586ba29..99424d7ff62 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -482,7 +482,7 @@ def test_filesystem_factory(mockfs, paths_or_selector): ) assert options.partition_base_dir == 'subdir' assert options.ignore_prefixes == ['.', '_'] - assert options.exclude_invalid_files is True + assert options.exclude_invalid_files is False factory = ds.FileSystemDatasetFactory( mockfs, paths_or_selector, format, options