From 44c3ed59a9920f4093ce3b58ceb72d809020934c Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Tue, 18 Oct 2022 12:54:12 +0530 Subject: [PATCH 1/3] feat: raise error if partitioning field present in fragment --- cpp/src/arrow/dataset/discovery.cc | 20 +++++++++++++++----- python/pyarrow/tests/test_dataset.py | 8 ++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 2aca85e6adc..47210d0f920 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -236,24 +236,34 @@ Result>> FileSystemDatasetFactory::InspectSc InspectOptions options) { std::vector> schemas; + ARROW_ASSIGN_OR_RAISE(auto partition_schema, + options_.partitioning.GetOrInferSchema( + StripPrefixAndFilename(files_, options_.partition_base_dir))); + const bool has_fragments_limit = options.fragments >= 0; int fragments = options.fragments; for (const auto& info : files_) { if (has_fragments_limit && fragments-- == 0) break; auto result = format_->Inspect({info, fs_}); + if (ARROW_PREDICT_FALSE(!result.ok())) { return result.status().WithMessage( "Error creating dataset. Could not read schema from '", info.path(), "'. Is this a '", format_->type_name(), "' file?: ", result.status().message()); } + + if (partition_schema->num_fields()) { + auto field_check = + result->get()->CanReferenceFieldsByNames(partition_schema->field_names()); + if (ARROW_PREDICT_FALSE(field_check.ok())) { + return Status::Invalid( + "Error creating dataset. Partitioning field(s) present in fragment."); + } + } + schemas.push_back(result.MoveValueUnsafe()); } - - ARROW_ASSIGN_OR_RAISE(auto partition_schema, - options_.partitioning.GetOrInferSchema( - StripPrefixAndFilename(files_, options_.partition_base_dir))); schemas.push_back(partition_schema); - return schemas; } diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 66562b76c96..6ce2b7a3acb 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -684,6 +684,7 @@ def test_partitioning(): names=["f1", "f2", "part"] ) partitioning_schema = pa.schema([("part", pa.string())]) + modified_partitioning_schema = pa.schema([("f1", pa.int64())]) for klass in [ds.DirectoryPartitioning, ds.HivePartitioning, ds.FilenamePartitioning]: with tempfile.TemporaryDirectory() as tempdir: @@ -695,6 +696,13 @@ def test_partitioning(): load_back_table = load_back.to_table() assert load_back_table.equals(table) + # test roundtrip with different partitioning field + with pytest.raises(pa.ArrowInvalid): + load_back = ds.dataset(tempdir, format='ipc', + partitioning=klass( + modified_partitioning_schema) + ) + def test_expression_arithmetic_operators(): dataset = ds.dataset(pa.table({'a': [1, 2, 3], 'b': [2, 2, 2]})) From 63a764b166b015d488a22b4454696410f885ed3a Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 22 May 2023 11:18:49 +0530 Subject: [PATCH 2/3] feat: testing with SchemaBuilder::CONFLICT_APPEND --- cpp/src/arrow/dataset/discovery.cc | 16 ++++++++-------- cpp/src/arrow/type.cc | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 47210d0f920..5cf725aafbe 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -252,14 +252,14 @@ Result>> FileSystemDatasetFactory::InspectSc "'. Is this a '", format_->type_name(), "' file?: ", result.status().message()); } - if (partition_schema->num_fields()) { - auto field_check = - result->get()->CanReferenceFieldsByNames(partition_schema->field_names()); - if (ARROW_PREDICT_FALSE(field_check.ok())) { - return Status::Invalid( - "Error creating dataset. Partitioning field(s) present in fragment."); - } - } + // if (partition_schema->num_fields()) { + // auto field_check = + // result->get()->CanReferenceFieldsByNames(partition_schema->field_names()); + // if (ARROW_PREDICT_FALSE(field_check.ok())) { + // return Status::Invalid( + // "Error creating dataset. Partitioning field(s) present in fragment."); + // } + // } schemas.push_back(result.MoveValueUnsafe()); } diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 606b231f6f7..3d682d51d54 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -2114,7 +2114,7 @@ Result> UnifySchemas( return Status::Invalid("Can't unify schema with duplicate field names."); } - SchemaBuilder builder{schemas[0], SchemaBuilder::CONFLICT_MERGE, field_merge_options}; + SchemaBuilder builder{schemas[0], SchemaBuilder::CONFLICT_APPEND, field_merge_options}; for (size_t i = 1; i < schemas.size(); i++) { const auto& schema = schemas[i]; From 72ff24bd5b4e8d3bab2a3ed1fa364dc15e1dd692 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 5 Jun 2023 10:10:32 +0530 Subject: [PATCH 3/3] feat: checking for data type while discovery --- cpp/src/arrow/dataset/discovery.cc | 26 ++++++++++++------ cpp/src/arrow/dataset/scanner.cc | 44 +++++++++++++++++++++++------- cpp/src/arrow/type.cc | 2 +- 3 files changed, 53 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 5cf725aafbe..05e9fc996c0 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -252,14 +252,24 @@ Result>> FileSystemDatasetFactory::InspectSc "'. Is this a '", format_->type_name(), "' file?: ", result.status().message()); } - // if (partition_schema->num_fields()) { - // auto field_check = - // result->get()->CanReferenceFieldsByNames(partition_schema->field_names()); - // if (ARROW_PREDICT_FALSE(field_check.ok())) { - // return Status::Invalid( - // "Error creating dataset. Partitioning field(s) present in fragment."); - // } - // } + if (partition_schema->num_fields()) { + auto field_check = + result->get()->CanReferenceFieldsByNames(partition_schema->field_names()); + + // Partitioning field present in fragment. Doing deep check with data type + if (ARROW_PREDICT_FALSE(field_check.ok())) { + const FieldVector fields = partition_schema->fields(); + + for(auto &it:fields){ + if (it && result->get()->GetFieldIndex(it->name())!=-1) { + if (it->type() != result->get()->GetFieldByName(it->name())->type()) { + return Status::Invalid( + "Error creating dataset. Partitioning field(s) present in fragment with different datatype."); + } + } + } + } + } schemas.push_back(result.MoveValueUnsafe()); } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 18981d14519..29ad2ec61ee 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -170,23 +170,47 @@ Result> GetProjectedSchemaFromExpression( // Scan options has a number of options that we can infer from the dataset // schema if they are not specified. Status NormalizeScanOptions(const std::shared_ptr& scan_options, - const std::shared_ptr& dataset_schema) { + const std::shared_ptr& dataset) { if (scan_options->dataset_schema == nullptr) { - scan_options->dataset_schema = dataset_schema; + scan_options->dataset_schema = dataset->schema(); } if (!scan_options->filter.IsBound()) { ARROW_ASSIGN_OR_RAISE(scan_options->filter, - scan_options->filter.Bind(*dataset_schema)); + scan_options->filter.Bind(*dataset->schema())); } + // std::shared_ptr fs_dataset = std::dynamic_pointer_cast(dataset); + // if (fs_dataset) { + // std::shared_ptr partitioning = fs_dataset->partitioning(); + + // if (partitioning->schema()->num_fields()) { + // auto field_check = + // dataset->schema()->CanReferenceFieldsByNames(partitioning->schema()->field_names()); + + // // Partitioning field present in fragment. Doing deep check with data type + // if (ARROW_PREDICT_FALSE(field_check.ok())) { + // const FieldVector fields = partitioning->schema()->fields(); + + // for(auto &it:fields){ + // if (it && dataset->schema()->GetFieldIndex(it->name())!=-1) { + // if (it->type() != dataset->schema()->GetFieldByName(it->name())->type()) { + // return Status::Invalid( + // "Error creating dataset. Partitioning field(s) present in fragment with different datatype."); + // } + // } + // } + // } + // } + // } + if (!scan_options->projected_schema) { // If the user specifies a projection expression we can maybe infer from // that expression if (scan_options->projection.IsBound()) { ARROW_ASSIGN_OR_RAISE( auto project_schema, - GetProjectedSchemaFromExpression(scan_options->projection, dataset_schema)); + GetProjectedSchemaFromExpression(scan_options->projection, dataset->schema())); if (project_schema->num_fields() > 0) { scan_options->projected_schema = std::move(project_schema); } @@ -205,7 +229,7 @@ Status NormalizeScanOptions(const std::shared_ptr& scan_options, // process resultant dataset_schema after projection ARROW_ASSIGN_OR_RAISE( auto projected_schema, - GetProjectedSchemaFromExpression(scan_options->projection, dataset_schema)); + GetProjectedSchemaFromExpression(scan_options->projection, dataset->schema())); if (projected_schema->num_fields() > 0) { // create the projected schema only if the provided expressions @@ -220,7 +244,7 @@ Status NormalizeScanOptions(const std::shared_ptr& scan_options, // if projected_fields are not found, we default to creating the projected_schema // and projection from the dataset_schema. ARROW_ASSIGN_OR_RAISE(auto projection_descr, - ProjectionDescr::Default(*dataset_schema)); + ProjectionDescr::Default(*dataset->schema())); scan_options->projected_schema = std::move(projection_descr.schema); scan_options->projection = projection_descr.expression; } @@ -231,12 +255,12 @@ Status NormalizeScanOptions(const std::shared_ptr& scan_options, ARROW_ASSIGN_OR_RAISE( auto projection_descr, ProjectionDescr::FromNames(scan_options->projected_schema->field_names(), - *dataset_schema)); + *dataset->schema())); scan_options->projection = projection_descr.expression; } if (!scan_options->projection.IsBound()) { - auto fields = dataset_schema->fields(); + auto fields = dataset->schema()->fields(); for (const auto& aug_field : kAugmentedFields) { fields.push_back(aug_field); } @@ -426,7 +450,7 @@ Result ToEnumeratedRecordBatch( Result AsyncScanner::ScanBatchesUnorderedAsync( Executor* cpu_executor, bool sequence_fragments, bool use_legacy_batching) { - RETURN_NOT_OK(NormalizeScanOptions(scan_options_, dataset_->schema())); + RETURN_NOT_OK(NormalizeScanOptions(scan_options_, dataset_)); auto exec_context = std::make_shared(scan_options_->pool, cpu_executor); @@ -992,7 +1016,7 @@ Result MakeScanNode(acero::ExecPlan* plan, auto dataset = scan_node_options.dataset; bool require_sequenced_output = scan_node_options.require_sequenced_output; - RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema())); + RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset)); // using a generator for speculative forward compatibility with async fragment discovery ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset->GetFragments(scan_options->filter)); diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 3d682d51d54..606b231f6f7 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -2114,7 +2114,7 @@ Result> UnifySchemas( return Status::Invalid("Can't unify schema with duplicate field names."); } - SchemaBuilder builder{schemas[0], SchemaBuilder::CONFLICT_APPEND, field_merge_options}; + SchemaBuilder builder{schemas[0], SchemaBuilder::CONFLICT_MERGE, field_merge_options}; for (size_t i = 1; i < schemas.size(); i++) { const auto& schema = schemas[i];