From 6592ad2472bad6f268448cdb9a0e6c2c0dd4a6e4 Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Tue, 5 Jul 2022 19:40:21 +0200 Subject: [PATCH 1/9] Materialize constant columns by means of guarantee expr in Scanner --- cpp/src/arrow/compute/exec/expression.cc | 25 +++++++++++++++++++----- cpp/src/arrow/compute/exec/expression.h | 3 ++- cpp/src/arrow/dataset/scanner.cc | 10 ++++++---- cpp/src/arrow/dataset/scanner_test.cc | 13 ++++++++---- 4 files changed, 37 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/compute/exec/expression.cc b/cpp/src/arrow/compute/exec/expression.cc index c890b3c5935..06f36c7f5ad 100644 --- a/cpp/src/arrow/compute/exec/expression.cc +++ b/cpp/src/arrow/compute/exec/expression.cc @@ -456,17 +456,31 @@ Result Expression::Bind(const Schema& in_schema, return BindImpl(*this, in_schema, exec_context); } -Result MakeExecBatch(const Schema& full_schema, const Datum& partial) { +Result MakeExecBatch(const Schema& full_schema, const Datum& partial, + Expression guarantee) { ExecBatch out; if (partial.kind() == Datum::RECORD_BATCH) { const auto& partial_batch = *partial.record_batch(); + out.guarantee = std::move(guarantee); out.length = partial_batch.num_rows(); + ARROW_ASSIGN_OR_RAISE(auto known_field_values, + ExtractKnownFieldValues(out.guarantee)); + for (const auto& field : full_schema.fields()) { - ARROW_ASSIGN_OR_RAISE(auto column, - FieldRef(field->name()).GetOneOrNone(partial_batch)); + auto field_ref = FieldRef(field->name()); + + // If we know what the value must be from the guarantee, prefer to use that value + // than the data from the record batch (if it exists at all -- probably it doesn't), + // because this way it will be a scalar. 
+ auto known_field_value = known_field_values.map.find(field_ref); + if (known_field_value != known_field_values.map.end()) { + out.values.emplace_back(known_field_value->second); + continue; + } + ARROW_ASSIGN_OR_RAISE(auto column, field_ref.GetOneOrNone(partial_batch)); if (column) { if (!column->type()->Equals(field->type())) { // Referenced field was present but didn't have the expected type. @@ -490,13 +504,14 @@ Result MakeExecBatch(const Schema& full_schema, const Datum& partial) ARROW_ASSIGN_OR_RAISE(auto partial_batch, RecordBatch::FromStructArray(partial.make_array())); - return MakeExecBatch(full_schema, partial_batch); + return MakeExecBatch(full_schema, partial_batch, std::move(guarantee)); } if (partial.is_scalar()) { ARROW_ASSIGN_OR_RAISE(auto partial_array, MakeArrayFromScalar(*partial.scalar(), 1)); - ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, partial_array)); + ARROW_ASSIGN_OR_RAISE( + auto out, MakeExecBatch(full_schema, partial_array, std::move(guarantee))); for (Datum& value : out.values) { if (value.is_scalar()) continue; diff --git a/cpp/src/arrow/compute/exec/expression.h b/cpp/src/arrow/compute/exec/expression.h index e9026961aa9..a872e799597 100644 --- a/cpp/src/arrow/compute/exec/expression.h +++ b/cpp/src/arrow/compute/exec/expression.h @@ -226,7 +226,8 @@ Result SimplifyWithGuarantee(Expression, /// RecordBatch which may have missing or incorrectly ordered columns. /// Missing fields will be replaced with null scalars. ARROW_EXPORT Result MakeExecBatch(const Schema& full_schema, - const Datum& partial); + const Datum& partial, + Expression guarantee = literal(true)); /// Execute a scalar expression against the provided state and input ExecBatch. This /// expression must be bound. 
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index d2d0923d03d..c3e5b2a4779 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -900,9 +900,6 @@ Result MakeScanNode(compute::ExecPlan* plan, std::move(batch_gen), [scan_options](const EnumeratedRecordBatch& partial) -> Result> { - ARROW_ASSIGN_OR_RAISE(util::optional batch, - compute::MakeExecBatch(*scan_options->dataset_schema, - partial.record_batch.value)); // TODO(ARROW-13263) fragments may be able to attach more guarantees to batches // than this, for example parquet's row group stats. Failing to do this leaves // perf on the table because row group stats could be used to skip kernel execs in @@ -911,7 +908,12 @@ Result MakeScanNode(compute::ExecPlan* plan, // Additionally, if a fragment failed to perform projection pushdown there may be // unnecessarily materialized columns in batch. We could drop them now instead of // letting them coast through the rest of the plan. 
- batch->guarantee = partial.fragment.value->partition_expression(); + auto guarantee = partial.fragment.value->partition_expression(); + + ARROW_ASSIGN_OR_RAISE( + util::optional batch, + compute::MakeExecBatch(*scan_options->dataset_schema, + partial.record_batch.value, guarantee)); // tag rows with fragment- and batch-of-origin batch->values.emplace_back(partial.fragment.index); diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 5316f63d080..0f452f866e7 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1359,7 +1359,8 @@ DatasetAndBatches DatasetAndBatchesFromJSON( const std::shared_ptr& physical_schema, const std::vector>& fragment_batch_strs, const std::vector& guarantees, - std::function make_exec_batch = {}) { + std::function + make_exec_batch = {}) { if (!guarantees.empty()) { EXPECT_EQ(fragment_batch_strs.size(), guarantees.size()); } @@ -1394,7 +1395,7 @@ DatasetAndBatches DatasetAndBatchesFromJSON( // allow customizing the ExecBatch (e.g. 
to fill in placeholders for partition // fields) if (make_exec_batch) { - make_exec_batch(&batches.back(), *batch); + make_exec_batch(&batches.back(), *batch, fragment_index); } // scanned batches will be augmented with fragment and batch indices @@ -1445,9 +1446,13 @@ DatasetAndBatches MakeBasicDataset() { equal(field_ref("c"), literal(23)), equal(field_ref("c"), literal(47)), }, - [](compute::ExecBatch* batch, const RecordBatch&) { + [](compute::ExecBatch* batch, const RecordBatch&, size_t guarantee_index) { // a placeholder will be inserted for partition field "c" - batch->values.emplace_back(std::make_shared()); + if (guarantee_index == 0) { + batch->values.emplace_back(std::make_shared(23)); + } else { + batch->values.emplace_back(std::make_shared(47)); + } }); } From 8b9137e8fb7ca0d4d017bc995d6705a271ae4141 Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Fri, 15 Jul 2022 15:44:02 -0700 Subject: [PATCH 2/9] [ARROW-16700]: updated DatasetAndBatchesFromJSON The major part of this commit is to add columns to ExecBatches from the guarantees vector instead of taking a lambda. The lambda can be repetitive and a source of error. 
This still only adds the guarantees to ExecBatches and not the Dataset --- cpp/src/arrow/dataset/scanner_test.cc | 83 +++++++++++++-------------- 1 file changed, 40 insertions(+), 43 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 0f452f866e7..b01fc789387 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -27,6 +27,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/cast.h" #include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/expression_internal.h" #include "arrow/dataset/plan.h" #include "arrow/dataset/test_util.h" #include "arrow/record_batch.h" @@ -1358,17 +1359,19 @@ DatasetAndBatches DatasetAndBatchesFromJSON( const std::shared_ptr& dataset_schema, const std::shared_ptr& physical_schema, const std::vector>& fragment_batch_strs, - const std::vector& guarantees, - std::function - make_exec_batch = {}) { + const std::vector& guarantees) { + // Expect a guarantee expr for each test fragment if (!guarantees.empty()) { EXPECT_EQ(fragment_batch_strs.size(), guarantees.size()); } + RecordBatchVector record_batches; FragmentVector fragments; fragments.reserve(fragment_batch_strs.size()); - for (size_t i = 0; i < fragment_batch_strs.size(); i++) { - const auto& batch_strs = fragment_batch_strs[i]; + + // construct fragments first + for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); frag_ndx++) { + const auto& batch_strs = fragment_batch_strs[frag_ndx]; RecordBatchVector fragment_batches; fragment_batches.reserve(batch_strs.size()); for (const auto& batch_str : batch_strs) { @@ -1378,37 +1381,40 @@ DatasetAndBatches DatasetAndBatchesFromJSON( fragment_batches.end()); fragments.push_back(std::make_shared( physical_schema, std::move(fragment_batches), - guarantees.empty() ? literal(true) : guarantees[i])); + guarantees.empty() ? 
literal(true) : guarantees[frag_ndx])); } + // then construct ExecBatches that can reference fields from constructed Fragments std::vector batches; auto batch_it = record_batches.begin(); - for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size(); - ++fragment_index) { - for (size_t batch_index = 0; batch_index < fragment_batch_strs[fragment_index].size(); - ++batch_index) { + for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); ++frag_ndx) { + size_t frag_batch_count = fragment_batch_strs[frag_ndx].size(); + + for (size_t batch_index = 0; batch_index < frag_batch_count; ++batch_index) { const auto& batch = *batch_it++; // the scanned ExecBatches will begin with physical columns batches.emplace_back(*batch); - // allow customizing the ExecBatch (e.g. to fill in placeholders for partition - // fields) - if (make_exec_batch) { - make_exec_batch(&batches.back(), *batch, fragment_index); + // augment scanned ExecBatch with columns for this fragment's guarantee + if (not guarantees.empty()) { + auto extract_result = ExtractKnownFieldValues(guarantees[frag_ndx]); + ARROW_WARN_NOT_OK(extract_result.status(), "ExtractKnownFieldValues failed"); + for (const auto& known_field : extract_result->map) { + batches.back().values.emplace_back(known_field.second); + } } // scanned batches will be augmented with fragment and batch indices - batches.back().values.emplace_back(static_cast(fragment_index)); + batches.back().values.emplace_back(static_cast(frag_ndx)); batches.back().values.emplace_back(static_cast(batch_index)); // ... 
and with the last-in-fragment flag - batches.back().values.emplace_back(batch_index == - fragment_batch_strs[fragment_index].size() - 1); - batches.back().values.emplace_back(fragments[fragment_index]->ToString()); + batches.back().values.emplace_back(batch_index == frag_batch_count - 1); + batches.back().values.emplace_back(fragments[frag_ndx]->ToString()); // each batch carries a guarantee inherited from its Fragment's partition expression - batches.back().guarantee = fragments[fragment_index]->partition_expression(); + batches.back().guarantee = fragments[frag_ndx]->partition_expression(); } } @@ -1425,35 +1431,26 @@ DatasetAndBatches MakeBasicDataset() { const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"}); - return DatasetAndBatchesFromJSON( - dataset_schema, physical_schema, - { - { - R"([{"a": 1, "b": null}, + return DatasetAndBatchesFromJSON(dataset_schema, physical_schema, + { + { + R"([{"a": 1, "b": null}, {"a": 2, "b": true}])", - R"([{"a": null, "b": true}, + R"([{"a": null, "b": true}, {"a": 3, "b": false}])", - }, - { - R"([{"a": null, "b": true}, + }, + { + R"([{"a": null, "b": true}, {"a": 4, "b": false}])", - R"([{"a": 5, "b": null}, + R"([{"a": 5, "b": null}, {"a": 6, "b": false}, {"a": 7, "b": false}])", - }, - }, - { - equal(field_ref("c"), literal(23)), - equal(field_ref("c"), literal(47)), - }, - [](compute::ExecBatch* batch, const RecordBatch&, size_t guarantee_index) { - // a placeholder will be inserted for partition field "c" - if (guarantee_index == 0) { - batch->values.emplace_back(std::make_shared(23)); - } else { - batch->values.emplace_back(std::make_shared(47)); - } - }); + }, + }, + { + equal(field_ref("c"), literal(23)), + equal(field_ref("c"), literal(47)), + }); } DatasetAndBatches MakeNestedDataset() { From f8ff2767c1a9e5fed1783c4c00708351211c1114 Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Fri, 15 Jul 2022 15:57:34 -0700 Subject: [PATCH 3/9] fixing whitespace whitespace for test data was demolished 
by clang-format, this just restores the whitespace that made it human readable --- cpp/src/arrow/dataset/scanner_test.cc | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index b01fc789387..b26dae16ee0 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1431,21 +1431,20 @@ DatasetAndBatches MakeBasicDataset() { const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"}); - return DatasetAndBatchesFromJSON(dataset_schema, physical_schema, + return DatasetAndBatchesFromJSON(dataset_schema, physical_schema, { { - { - R"([{"a": 1, "b": null}, - {"a": 2, "b": true}])", - R"([{"a": null, "b": true}, - {"a": 3, "b": false}])", - }, - { - R"([{"a": null, "b": true}, - {"a": 4, "b": false}])", - R"([{"a": 5, "b": null}, - {"a": 6, "b": false}, - {"a": 7, "b": false}])", - }, + R"([{"a": 1, "b": null}, + {"a": 2, "b": true}])", + R"([{"a": null, "b": true}, + {"a": 3, "b": false}])", + }, + { + R"([{"a": null, "b": true}, + {"a": 4, "b": false}])", + R"([{"a": 5, "b": null}, + {"a": 6, "b": false}, + {"a": 7, "b": false}])", + }, }, { equal(field_ref("c"), literal(23)), From 56fd9171df5467b7929114c423a03baec4a85125 Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Fri, 15 Jul 2022 16:17:56 -0700 Subject: [PATCH 4/9] attempt to style fix --- cpp/src/arrow/dataset/scanner_test.cc | 40 ++++++++++++++------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index b26dae16ee0..1c789ec59fc 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1431,25 +1431,27 @@ DatasetAndBatches MakeBasicDataset() { const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"}); - return DatasetAndBatchesFromJSON(dataset_schema, physical_schema, { - { - 
R"([{"a": 1, "b": null}, - {"a": 2, "b": true}])", - R"([{"a": null, "b": true}, - {"a": 3, "b": false}])", - }, - { - R"([{"a": null, "b": true}, - {"a": 4, "b": false}])", - R"([{"a": 5, "b": null}, - {"a": 6, "b": false}, - {"a": 7, "b": false}])", - }, - }, - { - equal(field_ref("c"), literal(23)), - equal(field_ref("c"), literal(47)), - }); + return DatasetAndBatchesFromJSON( + dataset_schema, physical_schema, + { + { + R"([{"a": 1, "b": null}, + {"a": 2, "b": true}])", + R"([{"a": null, "b": true}, + {"a": 3, "b": false}])", + }, + { + R"([{"a": null, "b": true}, + {"a": 4, "b": false}])", + R"([{"a": 5, "b": null}, + {"a": 6, "b": false}, + {"a": 7, "b": false}])", + }, + }, + { + equal(field_ref("c"), literal(23)), + equal(field_ref("c"), literal(47)), + }); } DatasetAndBatches MakeNestedDataset() { From 8aad7842b2beb12daed2fd19acd24ddb2d5862ad Mon Sep 17 00:00:00 2001 From: octalene Date: Mon, 18 Jul 2022 10:33:58 -0700 Subject: [PATCH 5/9] Update cpp/src/arrow/dataset/scanner_test.cc minor style change Co-authored-by: Weston Pace --- cpp/src/arrow/dataset/scanner_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 1c789ec59fc..fae7dc6793a 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1397,7 +1397,7 @@ DatasetAndBatches DatasetAndBatchesFromJSON( batches.emplace_back(*batch); // augment scanned ExecBatch with columns for this fragment's guarantee - if (not guarantees.empty()) { + if (!guarantees.empty()) { auto extract_result = ExtractKnownFieldValues(guarantees[frag_ndx]); ARROW_WARN_NOT_OK(extract_result.status(), "ExtractKnownFieldValues failed"); for (const auto& known_field : extract_result->map) { From a4e685a2cf45c7d198c34c55694f81ab28cf338f Mon Sep 17 00:00:00 2001 From: octalene Date: Mon, 18 Jul 2022 10:35:02 -0700 Subject: [PATCH 6/9] Update cpp/src/arrow/dataset/scanner_test.cc changing 
use of ARROW_WARN_NOT_OK A more appropriate macro was suggested, EXPECT_OK_AND_ASSIGN Co-authored-by: Weston Pace --- cpp/src/arrow/dataset/scanner_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index fae7dc6793a..f7916d438c5 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1398,8 +1398,7 @@ DatasetAndBatches DatasetAndBatchesFromJSON( // augment scanned ExecBatch with columns for this fragment's guarantee if (!guarantees.empty()) { - auto extract_result = ExtractKnownFieldValues(guarantees[frag_ndx]); - ARROW_WARN_NOT_OK(extract_result.status(), "ExtractKnownFieldValues failed"); + EXPECT_OK_AND_ASSIGN(auto extract_result, ExtractKnownFieldValues(guarantees[frag_ndx])); for (const auto& known_field : extract_result->map) { batches.back().values.emplace_back(known_field.second); } From 23163b87df168adc888ee3ead10a31a0c74f70b4 Mon Sep 17 00:00:00 2001 From: Aldrin Montana Date: Mon, 18 Jul 2022 13:02:07 -0700 Subject: [PATCH 7/9] [ARROW-16700]: new macro has different return type changed variable name and method invocation because EXPECT_OK_AND_ASSIGN unwraps the result whereas ARROW_WARN_NOT_OK did not --- cpp/src/arrow/dataset/scanner_test.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index f7916d438c5..8877a21908c 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1398,8 +1398,9 @@ DatasetAndBatches DatasetAndBatchesFromJSON( // augment scanned ExecBatch with columns for this fragment's guarantee if (!guarantees.empty()) { - EXPECT_OK_AND_ASSIGN(auto extract_result, ExtractKnownFieldValues(guarantees[frag_ndx])); - for (const auto& known_field : extract_result->map) { + EXPECT_OK_AND_ASSIGN(auto known_fields, 
ExtractKnownFieldValues(guarantees[frag_ndx])); + for (const auto& known_field : known_fields.map) { batches.back().values.emplace_back(known_field.second); } } From b38719b3df658d129239fd9baa1ee897c623706d Mon Sep 17 00:00:00 2001 From: Aldrin Montana Date: Mon, 18 Jul 2022 13:04:39 -0700 Subject: [PATCH 8/9] [ARROW-16700]: fixed style --- cpp/src/arrow/dataset/scanner_test.cc | 41 +++++++++++++-------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 8877a21908c..b3d391d9f21 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1431,27 +1431,26 @@ DatasetAndBatches MakeBasicDataset() { const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"}); - return DatasetAndBatchesFromJSON( - dataset_schema, physical_schema, - { - { - R"([{"a": 1, "b": null}, - {"a": 2, "b": true}])", - R"([{"a": null, "b": true}, - {"a": 3, "b": false}])", - }, - { - R"([{"a": null, "b": true}, - {"a": 4, "b": false}])", - R"([{"a": 5, "b": null}, - {"a": 6, "b": false}, - {"a": 7, "b": false}])", - }, - }, - { - equal(field_ref("c"), literal(23)), - equal(field_ref("c"), literal(47)), - }); + return DatasetAndBatchesFromJSON(dataset_schema, physical_schema, + { + { + R"([{"a": 1, "b": null}, + {"a": 2, "b": true}])", + R"([{"a": null, "b": true}, + {"a": 3, "b": false}])", + }, + { + R"([{"a": null, "b": true}, + {"a": 4, "b": false}])", + R"([{"a": 5, "b": null}, + {"a": 6, "b": false}, + {"a": 7, "b": false}])", + }, + }, + { + equal(field_ref("c"), literal(23)), + equal(field_ref("c"), literal(47)), + }); } DatasetAndBatches MakeNestedDataset() { From 5ff90cc60fa03e854b36a974316c3331d47656ce Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 22 Jul 2022 09:22:56 -0700 Subject: [PATCH 9/9] Update cpp/src/arrow/dataset/scanner_test.cc --- cpp/src/arrow/dataset/scanner_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index b3d391d9f21..864c4d5ebad 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1360,7 +1360,7 @@ DatasetAndBatches DatasetAndBatchesFromJSON( const std::shared_ptr& physical_schema, const std::vector>& fragment_batch_strs, const std::vector& guarantees) { - // Expect a guarantee expr for each test fragment + // If guarantees are provided we must have one for each fragment if (!guarantees.empty()) { EXPECT_EQ(fragment_batch_strs.size(), guarantees.size()); }