25 changes: 20 additions & 5 deletions cpp/src/arrow/compute/exec/expression.cc
@@ -456,17 +456,31 @@ Result<Expression> Expression::Bind(const Schema& in_schema,
return BindImpl(*this, in_schema, exec_context);
}

Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial) {
Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial,
Expression guarantee) {
ExecBatch out;

if (partial.kind() == Datum::RECORD_BATCH) {
const auto& partial_batch = *partial.record_batch();
out.guarantee = std::move(guarantee);
out.length = partial_batch.num_rows();

ARROW_ASSIGN_OR_RAISE(auto known_field_values,
ExtractKnownFieldValues(out.guarantee));

for (const auto& field : full_schema.fields()) {
ARROW_ASSIGN_OR_RAISE(auto column,
FieldRef(field->name()).GetOneOrNone(partial_batch));
auto field_ref = FieldRef(field->name());

// If the guarantee pins this field to a known value, prefer that value over
// the data from the record batch (if the column is present at all -- it
// probably isn't), because this way the result will be a scalar.
auto known_field_value = known_field_values.map.find(field_ref);
if (known_field_value != known_field_values.map.end()) {
out.values.emplace_back(known_field_value->second);
continue;
}

ARROW_ASSIGN_OR_RAISE(auto column, field_ref.GetOneOrNone(partial_batch));
if (column) {
if (!column->type()->Equals(field->type())) {
// Referenced field was present but didn't have the expected type.
@@ -490,13 +504,14 @@ Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial)
ARROW_ASSIGN_OR_RAISE(auto partial_batch,
RecordBatch::FromStructArray(partial.make_array()));

return MakeExecBatch(full_schema, partial_batch);
return MakeExecBatch(full_schema, partial_batch, std::move(guarantee));
}

if (partial.is_scalar()) {
ARROW_ASSIGN_OR_RAISE(auto partial_array,
MakeArrayFromScalar(*partial.scalar(), 1));
ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, partial_array));
ARROW_ASSIGN_OR_RAISE(
auto out, MakeExecBatch(full_schema, partial_array, std::move(guarantee)));

for (Datum& value : out.values) {
if (value.is_scalar()) continue;
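A minimal sketch (not part of the diff) of the behavior this hunk adds, assuming the includes and namespaces of the Arrow tree at this revision; the function name SketchGuaranteedColumn is illustrative only. When the partial batch lacks a field whose value the guarantee pins down, MakeExecBatch now emits that value as a scalar instead of a null scalar.

#include "arrow/api.h"
#include "arrow/compute/exec/expression.h"

namespace cp = arrow::compute;

arrow::Status SketchGuaranteedColumn() {
  // Full schema has "a" and "c"; the partial batch only materializes "a".
  auto full_schema = arrow::schema(
      {arrow::field("a", arrow::int32()), arrow::field("c", arrow::int32())});
  ARROW_ASSIGN_OR_RAISE(
      auto a_column, arrow::MakeArrayFromScalar(arrow::Int32Scalar(1), /*length=*/2));
  auto partial = arrow::RecordBatch::Make(
      arrow::schema({arrow::field("a", arrow::int32())}), /*num_rows=*/2, {a_column});

  // Guarantee: every row of this batch satisfies c == 23.
  auto guarantee = cp::equal(cp::field_ref("c"), cp::literal(23));

  ARROW_ASSIGN_OR_RAISE(auto batch,
                        cp::MakeExecBatch(*full_schema, partial, guarantee));
  // Before this change batch.values[1] would be a null int32 scalar; now it is
  // Scalar(23), pulled from the guarantee rather than from the (absent) column.
  return arrow::Status::OK();
}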
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/exec/expression.h
@@ -226,7 +226,8 @@ Result<Expression> SimplifyWithGuarantee(Expression,
/// RecordBatch which may have missing or incorrectly ordered columns.
/// Missing fields will be replaced with null scalars.
ARROW_EXPORT Result<ExecBatch> MakeExecBatch(const Schema& full_schema,
const Datum& partial);
const Datum& partial,
Expression guarantee = literal(true));

/// Execute a scalar expression against the provided state and input ExecBatch. This
/// expression must be bound.
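Since the new parameter defaults to literal(true), which asserts nothing, existing call sites compile and behave exactly as before; continuing the sketch above:

// Equivalent to the pre-change two-argument form: no field values can be extracted.
ARROW_ASSIGN_OR_RAISE(auto plain, cp::MakeExecBatch(*full_schema, partial));
// Opting in: known field values from the guarantee replace missing columns.
ARROW_ASSIGN_OR_RAISE(auto guaranteed,
                      cp::MakeExecBatch(*full_schema, partial, guarantee));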
10 changes: 6 additions & 4 deletions cpp/src/arrow/dataset/scanner.cc
@@ -900,9 +900,6 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
std::move(batch_gen),
[scan_options](const EnumeratedRecordBatch& partial)
-> Result<util::optional<compute::ExecBatch>> {
ARROW_ASSIGN_OR_RAISE(util::optional<compute::ExecBatch> batch,
compute::MakeExecBatch(*scan_options->dataset_schema,
partial.record_batch.value));
// TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
// than this, for example parquet's row group stats. Failing to do this leaves
// perf on the table because row group stats could be used to skip kernel execs in
@@ -911,7 +908,12 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
// Additionally, if a fragment failed to perform projection pushdown there may be
// unnecessarily materialized columns in batch. We could drop them now instead of
// letting them coast through the rest of the plan.
batch->guarantee = partial.fragment.value->partition_expression();
auto guarantee = partial.fragment.value->partition_expression();

ARROW_ASSIGN_OR_RAISE(
util::optional<compute::ExecBatch> batch,
compute::MakeExecBatch(*scan_options->dataset_schema,
partial.record_batch.value, guarantee));

// tag rows with fragment- and batch-of-origin
batch->values.emplace_back(partial.fragment.index);
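To see where the guarantee originates (a hypothetical sketch mirroring the test fixtures below, reusing the cp alias from the first sketch; SketchFragment is an illustrative name): a fragment constructed with a partition expression now has that expression forwarded, via the mapping above, into MakeExecBatch.

#include "arrow/dataset/dataset.h"  // InMemoryFragment

std::shared_ptr<arrow::dataset::Fragment> SketchFragment(
    std::shared_ptr<arrow::Schema> physical_schema,
    arrow::RecordBatchVector record_batches) {
  // partition_expression() on this fragment returns c == 23; MakeScanNode now
  // passes it to MakeExecBatch so "c" arrives downstream as Scalar(23).
  return std::make_shared<arrow::dataset::InMemoryFragment>(
      std::move(physical_schema), std::move(record_batches),
      cp::equal(cp::field_ref("c"), cp::literal(23)));
}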
88 changes: 45 additions & 43 deletions cpp/src/arrow/dataset/scanner_test.cc
@@ -27,6 +27,7 @@
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/expression_internal.h"
#include "arrow/dataset/plan.h"
#include "arrow/dataset/test_util.h"
#include "arrow/record_batch.h"
@@ -1358,16 +1359,19 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
const std::shared_ptr<Schema>& dataset_schema,
const std::shared_ptr<Schema>& physical_schema,
const std::vector<std::vector<std::string>>& fragment_batch_strs,
const std::vector<compute::Expression>& guarantees,
std::function<void(compute::ExecBatch*, const RecordBatch&)> make_exec_batch = {}) {
const std::vector<compute::Expression>& guarantees) {
// If guarantees are provided, there must be one for each fragment
if (!guarantees.empty()) {
EXPECT_EQ(fragment_batch_strs.size(), guarantees.size());
}

RecordBatchVector record_batches;
FragmentVector fragments;
fragments.reserve(fragment_batch_strs.size());
for (size_t i = 0; i < fragment_batch_strs.size(); i++) {
const auto& batch_strs = fragment_batch_strs[i];

// construct fragments first
for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); frag_ndx++) {
const auto& batch_strs = fragment_batch_strs[frag_ndx];
RecordBatchVector fragment_batches;
fragment_batches.reserve(batch_strs.size());
for (const auto& batch_str : batch_strs) {
@@ -1377,37 +1381,40 @@
fragment_batches.end());
fragments.push_back(std::make_shared<InMemoryFragment>(
physical_schema, std::move(fragment_batches),
guarantees.empty() ? literal(true) : guarantees[i]));
guarantees.empty() ? literal(true) : guarantees[frag_ndx]));
}

// then construct ExecBatches that can reference fields from constructed Fragments
std::vector<compute::ExecBatch> batches;
auto batch_it = record_batches.begin();
for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size();
++fragment_index) {
for (size_t batch_index = 0; batch_index < fragment_batch_strs[fragment_index].size();
++batch_index) {
for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); ++frag_ndx) {
size_t frag_batch_count = fragment_batch_strs[frag_ndx].size();

for (size_t batch_index = 0; batch_index < frag_batch_count; ++batch_index) {
const auto& batch = *batch_it++;

// the scanned ExecBatches will begin with physical columns
batches.emplace_back(*batch);

// allow customizing the ExecBatch (e.g. to fill in placeholders for partition
// fields)
if (make_exec_batch) {
make_exec_batch(&batches.back(), *batch);
// augment scanned ExecBatch with columns for this fragment's guarantee
if (!guarantees.empty()) {
EXPECT_OK_AND_ASSIGN(auto known_fields,
ExtractKnownFieldValues(guarantees[frag_ndx]));
for (const auto& known_field : known_fields.map) {
batches.back().values.emplace_back(known_field.second);
}
}

// scanned batches will be augmented with fragment and batch indices
batches.back().values.emplace_back(static_cast<int>(fragment_index));
batches.back().values.emplace_back(static_cast<int>(frag_ndx));
batches.back().values.emplace_back(static_cast<int>(batch_index));

// ... and with the last-in-fragment flag
batches.back().values.emplace_back(batch_index ==
fragment_batch_strs[fragment_index].size() - 1);
batches.back().values.emplace_back(fragments[fragment_index]->ToString());
batches.back().values.emplace_back(batch_index == frag_batch_count - 1);
batches.back().values.emplace_back(fragments[frag_ndx]->ToString());

// each batch carries a guarantee inherited from its Fragment's partition expression
batches.back().guarantee = fragments[fragment_index]->partition_expression();
batches.back().guarantee = fragments[frag_ndx]->partition_expression();
}
}

@@ -1424,31 +1431,26 @@ DatasetAndBatches MakeBasicDataset() {

const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"});

return DatasetAndBatchesFromJSON(
dataset_schema, physical_schema,
{
{
R"([{"a": 1, "b": null},
{"a": 2, "b": true}])",
R"([{"a": null, "b": true},
{"a": 3, "b": false}])",
},
{
R"([{"a": null, "b": true},
{"a": 4, "b": false}])",
R"([{"a": 5, "b": null},
{"a": 6, "b": false},
{"a": 7, "b": false}])",
},
},
{
equal(field_ref("c"), literal(23)),
equal(field_ref("c"), literal(47)),
},
[](compute::ExecBatch* batch, const RecordBatch&) {
// a placeholder will be inserted for partition field "c"
batch->values.emplace_back(std::make_shared<Int32Scalar>());
});
return DatasetAndBatchesFromJSON(dataset_schema, physical_schema,
{
{
R"([{"a": 1, "b": null},
{"a": 2, "b": true}])",
R"([{"a": null, "b": true},
{"a": 3, "b": false}])",
},
{
R"([{"a": null, "b": true},
{"a": 4, "b": false}])",
R"([{"a": 5, "b": null},
{"a": 6, "b": false},
{"a": 7, "b": false}])",
},
},
{
equal(field_ref("c"), literal(23)),
equal(field_ref("c"), literal(47)),
});
}

DatasetAndBatches MakeNestedDataset() {
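Finally, a hedged sketch of the helper both the implementation and these tests lean on: ExtractKnownFieldValues (from expression_internal.h, per the include added above) distills a guarantee into a map from field references to pinned values, which is exactly what DatasetAndBatchesFromJSON now appends to each expected batch. SketchKnownFields is an illustrative name.

arrow::Status SketchKnownFields() {
  ARROW_ASSIGN_OR_RAISE(auto known,
                        cp::ExtractKnownFieldValues(
                            cp::equal(cp::field_ref("c"), cp::literal(23))));
  // known.map holds a single entry, FieldRef("c") -> Datum(int32 scalar 23).
  return arrow::Status::OK();
}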