diff --git a/cpp/src/arrow/compute/exec/expression.cc b/cpp/src/arrow/compute/exec/expression.cc
index c890b3c5935..06f36c7f5ad 100644
--- a/cpp/src/arrow/compute/exec/expression.cc
+++ b/cpp/src/arrow/compute/exec/expression.cc
@@ -456,17 +456,31 @@ Result<Expression> Expression::Bind(const Schema& in_schema,
                                     ExecContext* exec_context) const {
   return BindImpl(*this, in_schema, exec_context);
 }
 
-Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial) {
+Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial,
+                                Expression guarantee) {
   ExecBatch out;
 
   if (partial.kind() == Datum::RECORD_BATCH) {
     const auto& partial_batch = *partial.record_batch();
+    out.guarantee = std::move(guarantee);
     out.length = partial_batch.num_rows();
 
+    ARROW_ASSIGN_OR_RAISE(auto known_field_values,
+                          ExtractKnownFieldValues(out.guarantee));
+
     for (const auto& field : full_schema.fields()) {
-      ARROW_ASSIGN_OR_RAISE(auto column,
-                            FieldRef(field->name()).GetOneOrNone(partial_batch));
+      auto field_ref = FieldRef(field->name());
+
+      // If the guarantee tells us what this field's value must be, prefer that value
+      // to the data from the record batch (if the column is present there at all --
+      // probably it isn't), because this way it will be a scalar.
+      auto known_field_value = known_field_values.map.find(field_ref);
+      if (known_field_value != known_field_values.map.end()) {
+        out.values.emplace_back(known_field_value->second);
+        continue;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto column, field_ref.GetOneOrNone(partial_batch));
       if (column) {
         if (!column->type()->Equals(field->type())) {
           // Referenced field was present but didn't have the expected type.
@@ -490,13 +504,14 @@ Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial)
     ARROW_ASSIGN_OR_RAISE(auto partial_batch,
                           RecordBatch::FromStructArray(partial.make_array()));
 
-    return MakeExecBatch(full_schema, partial_batch);
+    return MakeExecBatch(full_schema, partial_batch, std::move(guarantee));
   }
 
   if (partial.is_scalar()) {
     ARROW_ASSIGN_OR_RAISE(auto partial_array,
                           MakeArrayFromScalar(*partial.scalar(), 1));
-    ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, partial_array));
+    ARROW_ASSIGN_OR_RAISE(
+        auto out, MakeExecBatch(full_schema, partial_array, std::move(guarantee)));
 
     for (Datum& value : out.values) {
       if (value.is_scalar()) continue;
diff --git a/cpp/src/arrow/compute/exec/expression.h b/cpp/src/arrow/compute/exec/expression.h
index e9026961aa9..a872e799597 100644
--- a/cpp/src/arrow/compute/exec/expression.h
+++ b/cpp/src/arrow/compute/exec/expression.h
@@ -226,7 +226,8 @@ Result<Expression> SimplifyWithGuarantee(Expression,
 /// RecordBatch which may have missing or incorrectly ordered columns.
 /// Missing fields will be replaced with null scalars.
 ARROW_EXPORT Result<ExecBatch> MakeExecBatch(const Schema& full_schema,
-                                             const Datum& partial);
+                                             const Datum& partial,
+                                             Expression guarantee = literal(true));
 
 /// Execute a scalar expression against the provided state and input ExecBatch. This
 /// expression must be bound.
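
A minimal sketch of the new overload's behavior (illustrative only; the schema, function name, and values below are assumed rather than taken from the patch): given a full schema {a: int32, b: bool, c: int32} and a partial batch carrying only "a" and "b", a guarantee of c == 23 fills the "c" slot of the resulting ExecBatch with the scalar 23 instead of a null scalar.

// Hypothetical usage sketch of the new MakeExecBatch overload. Assumes
// full_schema is {a: int32, b: bool, c: int32} and `partial` contains only
// columns "a" and "b".
#include "arrow/compute/exec/expression.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"

namespace cp = arrow::compute;

arrow::Result<cp::ExecBatch> MakeBatchWithGuarantee(
    const arrow::Schema& full_schema,
    const std::shared_ptr<arrow::RecordBatch>& partial) {
  // The guarantee "c == 23" lets MakeExecBatch emit Datum(int32 scalar 23) for
  // the missing column "c"; the guarantee is also stored on out.guarantee.
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecBatch out,
      cp::MakeExecBatch(full_schema, partial,
                        cp::equal(cp::field_ref("c"), cp::literal(23))));
  return out;
}
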
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index d2d0923d03d..c3e5b2a4779 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -900,9 +900,6 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
       std::move(batch_gen),
       [scan_options](const EnumeratedRecordBatch& partial)
           -> Result<util::optional<compute::ExecBatch>> {
-        ARROW_ASSIGN_OR_RAISE(util::optional<compute::ExecBatch> batch,
-                              compute::MakeExecBatch(*scan_options->dataset_schema,
-                                                     partial.record_batch.value));
         // TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
         // than this, for example parquet's row group stats. Failing to do this leaves
         // perf on the table because row group stats could be used to skip kernel execs in
@@ -911,7 +908,12 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
         // Additionally, if a fragment failed to perform projection pushdown there may be
         // unnecessarily materialized columns in batch. We could drop them now instead of
         // letting them coast through the rest of the plan.
-        batch->guarantee = partial.fragment.value->partition_expression();
+        auto guarantee = partial.fragment.value->partition_expression();
+
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*scan_options->dataset_schema,
+                                   partial.record_batch.value, guarantee));
 
         // tag rows with fragment- and batch-of-origin
         batch->values.emplace_back(partial.fragment.index);
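
The point of threading the partition expression in as the guarantee (rather than assigning it after the fact) is that downstream consumers can rely on it both for scalar partition columns and for expression simplification. A hedged sketch of the latter (illustrative; `bound_filter` and the helper name are assumed, while SimplifyWithGuarantee is the public helper declared alongside MakeExecBatch in expression.h):

// Sketch: how a downstream node can exploit batch.guarantee. `bound_filter`
// is an assumed, already-bound predicate such as (c == 23 and a > 0).
#include "arrow/compute/exec/expression.h"

namespace cp = arrow::compute;

arrow::Result<cp::Expression> SimplifyFilterForBatch(cp::Expression bound_filter,
                                                     const cp::ExecBatch& batch) {
  // Under the guarantee "c == 23" the filter above reduces to (a > 0); under
  // "c == 47" it reduces to the literal false, so the batch can be skipped
  // without executing any comparison kernels.
  return cp::SimplifyWithGuarantee(std::move(bound_filter), batch.guarantee);
}
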
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index 5316f63d080..864c4d5ebad 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -27,6 +27,7 @@
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/cast.h"
 #include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression_internal.h"
 #include "arrow/dataset/plan.h"
 #include "arrow/dataset/test_util.h"
 #include "arrow/record_batch.h"
@@ -1358,16 +1359,19 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
     const std::shared_ptr<Schema>& dataset_schema,
     const std::shared_ptr<Schema>& physical_schema,
     const std::vector<std::vector<std::string>>& fragment_batch_strs,
-    const std::vector<compute::Expression>& guarantees,
-    std::function<void(compute::ExecBatch*, const RecordBatch&)> make_exec_batch = {}) {
+    const std::vector<compute::Expression>& guarantees) {
+  // If guarantees are provided, there must be one for each fragment
   if (!guarantees.empty()) {
     EXPECT_EQ(fragment_batch_strs.size(), guarantees.size());
   }
+
   RecordBatchVector record_batches;
   FragmentVector fragments;
   fragments.reserve(fragment_batch_strs.size());
-  for (size_t i = 0; i < fragment_batch_strs.size(); i++) {
-    const auto& batch_strs = fragment_batch_strs[i];
+
+  // construct fragments first
+  for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); frag_ndx++) {
+    const auto& batch_strs = fragment_batch_strs[frag_ndx];
     RecordBatchVector fragment_batches;
     fragment_batches.reserve(batch_strs.size());
     for (const auto& batch_str : batch_strs) {
@@ -1377,37 +1381,40 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
                           fragment_batches.end());
     fragments.push_back(std::make_shared<InMemoryFragment>(
         physical_schema, std::move(fragment_batches),
-        guarantees.empty() ? literal(true) : guarantees[i]));
+        guarantees.empty() ? literal(true) : guarantees[frag_ndx]));
   }
 
+  // then construct ExecBatches that can reference fields of the constructed Fragments
   std::vector<compute::ExecBatch> batches;
   auto batch_it = record_batches.begin();
-  for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size();
-       ++fragment_index) {
-    for (size_t batch_index = 0; batch_index < fragment_batch_strs[fragment_index].size();
-         ++batch_index) {
+  for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); ++frag_ndx) {
+    size_t frag_batch_count = fragment_batch_strs[frag_ndx].size();
+
+    for (size_t batch_index = 0; batch_index < frag_batch_count; ++batch_index) {
       const auto& batch = *batch_it++;
 
       // the scanned ExecBatches will begin with physical columns
       batches.emplace_back(*batch);
 
-      // allow customizing the ExecBatch (e.g. to fill in placeholders for partition
-      // fields)
-      if (make_exec_batch) {
-        make_exec_batch(&batches.back(), *batch);
+      // augment the scanned ExecBatch with columns for this fragment's guarantee
+      if (!guarantees.empty()) {
+        EXPECT_OK_AND_ASSIGN(auto known_fields,
+                             ExtractKnownFieldValues(guarantees[frag_ndx]));
+        for (const auto& known_field : known_fields.map) {
+          batches.back().values.emplace_back(known_field.second);
+        }
       }
 
       // scanned batches will be augmented with fragment and batch indices
-      batches.back().values.emplace_back(static_cast<int>(fragment_index));
+      batches.back().values.emplace_back(static_cast<int>(frag_ndx));
       batches.back().values.emplace_back(static_cast<int>(batch_index));
 
       // ... and with the last-in-fragment flag
-      batches.back().values.emplace_back(batch_index ==
-                                         fragment_batch_strs[fragment_index].size() - 1);
-      batches.back().values.emplace_back(fragments[fragment_index]->ToString());
+      batches.back().values.emplace_back(batch_index == frag_batch_count - 1);
+      batches.back().values.emplace_back(fragments[frag_ndx]->ToString());
 
       // each batch carries a guarantee inherited from its Fragment's partition expression
-      batches.back().guarantee = fragments[fragment_index]->partition_expression();
+      batches.back().guarantee = fragments[frag_ndx]->partition_expression();
     }
   }
 
@@ -1424,31 +1431,26 @@ DatasetAndBatches MakeBasicDataset() {
 
   const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"});
 
-  return DatasetAndBatchesFromJSON(
-      dataset_schema, physical_schema,
-      {
-          {
-              R"([{"a": 1, "b": null},
-                  {"a": 2, "b": true}])",
-              R"([{"a": null, "b": true},
-                  {"a": 3, "b": false}])",
-          },
-          {
-              R"([{"a": null, "b": true},
-                  {"a": 4, "b": false}])",
-              R"([{"a": 5, "b": null},
-                  {"a": 6, "b": false},
-                  {"a": 7, "b": false}])",
-          },
-      },
-      {
-          equal(field_ref("c"), literal(23)),
-          equal(field_ref("c"), literal(47)),
-      },
-      [](compute::ExecBatch* batch, const RecordBatch&) {
-        // a placeholder will be inserted for partition field "c"
-        batch->values.emplace_back(std::make_shared<Int32Scalar>());
-      });
+  return DatasetAndBatchesFromJSON(dataset_schema, physical_schema,
+                                   {
+                                       {
+                                           R"([{"a": 1, "b": null},
+                                               {"a": 2, "b": true}])",
+                                           R"([{"a": null, "b": true},
+                                               {"a": 3, "b": false}])",
+                                       },
+                                       {
+                                           R"([{"a": null, "b": true},
+                                               {"a": 4, "b": false}])",
+                                           R"([{"a": 5, "b": null},
+                                               {"a": 6, "b": false},
+                                               {"a": 7, "b": false}])",
+                                       },
+                                   },
+                                   {
+                                       equal(field_ref("c"), literal(23)),
+                                       equal(field_ref("c"), literal(47)),
+                                   });
 }
 
 DatasetAndBatches MakeNestedDataset() {
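
For reference, a sketch of the ExtractKnownFieldValues contract the revised helper leans on (illustrative; it mirrors the guarantees above, and the function name used here is real while the header placement of KnownFieldValues is an assumption): an equality guarantee yields a FieldRef-to-Datum map, and the helper appends each mapped Datum to the expected batch in place of the old null placeholder.

// Sketch of ExtractKnownFieldValues on one of the guarantees above. The
// expression_internal.h include mirrors the test's new include; whether
// KnownFieldValues lives there or in expression.h is an assumption.
#include "arrow/compute/exec/expression_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace cp = arrow::compute;

arrow::Status ShowKnownFieldValues() {
  ARROW_ASSIGN_OR_RAISE(
      auto known_fields,
      cp::ExtractKnownFieldValues(cp::equal(cp::field_ref("c"), cp::literal(23))));
  // known_fields.map now holds {FieldRef("c") -> Datum(int32 scalar 23)}; the
  // helper emplaces each mapped Datum as an extra scalar column of the batch.
  return arrow::Status::OK();
}
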