Merged
23 changes: 16 additions & 7 deletions cpp/src/arrow/dataset/scanner.cc
@@ -22,6 +22,7 @@
#include <memory>
#include <mutex>
#include <numeric>
#include <set>
#include <sstream>

#include "arrow/array/array_primitive.h"
@@ -135,20 +136,19 @@ Result<std::shared_ptr<Schema>> GetProjectedSchemaFromExpression(
const std::shared_ptr<Schema>& dataset_schema) {
// process resultant dataset_schema after projection
FieldVector project_fields;
std::set<std::string> field_names;
Member Author

I used a set here because my R test was failing: the projected schema ended up with duplicated fields, since the projection expression included the nested field in two different places. Maybe ->arguments does deduplication, so this wasn't a problem with non-nested refs. But I don't know if this is the right choice, if someone cares about order that gets lost, or if there's a better way. What do you think, @westonpace? (I haven't run the C++ tests yet, so there may be order-dependent tests that fail.)

Also, this function seems like a natural place to use FieldsInExpression (from expression.cc). Is there a reason it wasn't used here? It wouldn't solve the duplication issue, because you could still have two nested field refs pointing to different fields within the same top-level struct, but it would let you assume that everything you're iterating over is a FieldRef.
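
To illustrate the duplication described above, here is a minimal sketch. It is not Arrow code: `FieldPathSketch` and `TopLevelNames` are hypothetical stand-ins for a nested field ref and for the "keep the top-level field name" step. Two refs into the same struct column collapse to the same top-level name, so a plain list of names ends up with a repeat.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for a nested field reference: a path of names from the top-level
// column down to the leaf. Hypothetical type, not arrow::FieldRef.
using FieldPathSketch = std::vector<std::string>;

// Maps each reference to the name of the top-level column it reads from.
// Duplicates are kept on purpose, to show where they come from.
std::vector<std::string> TopLevelNames(const std::vector<FieldPathSketch>& refs) {
  std::vector<std::string> names;
  for (const auto& path : refs) {
    assert(!path.empty());  // a reference must name at least one field
    names.push_back(path.front());
  }
  return names;
}
```

With refs a.x, a.y and b, the result is a, a, b. The repeated a is the kind of entry that would turn into a duplicated schema field if nothing deduplicates it.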

Member

> if someone cares about order that gets lost, or if there's a better way.

We very much care about order here :). Fortunately, std::set is an ordered set so the order should not be lost.
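
One caveat on "ordered": std::set keeps its elements sorted by key (std::less by default), so iterating it yields names in sorted order, not in the order they were inserted. Iteration is deterministic, but it follows neither the projection order nor the schema order. A minimal sketch with made-up field names; `IterationOrder` is a hypothetical helper, not part of Arrow:

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Inserts the names into a std::set and returns them in the set's
// iteration order. Illustrative helper only.
std::vector<std::string> IterationOrder(const std::vector<std::string>& inserted) {
  std::set<std::string> names;
  for (const auto& name : inserted) {
    names.emplace(name);  // a name that is already present is not added again
  }
  // A set never holds more elements than were offered to it.
  assert(names.size() <= inserted.size());
  return std::vector<std::string>(names.begin(), names.end());
}
```

Inserting z, a, m, a gives back a, m, z: the duplicate is dropped and the result is sorted, so the insertion order is not recoverable from the set.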

if (auto call = projection.call()) {
if (call->function_name != "make_struct") {
return Status::Invalid("Top level projection expression call must be make_struct");
}
for (const compute::Expression& arg : call->arguments) {
if (auto field_ref = arg.field_ref()) {
if (field_ref->IsName()) {
auto field = dataset_schema->GetFieldByName(*field_ref->name());
if (field) {
project_fields.push_back(std::move(field));
}
// if the field is not present in the schema we ignore it.
// the case is if kAugmentedFields are present in the expression
// and if they are not present in the provided schema, we ignore them.
field_names.emplace(*field_ref->name());
} else if (field_ref->IsNested()) {
// We keep the top-level field name.
auto nested_field_refs = *field_ref->nested_refs();
field_names.emplace(*nested_field_refs[0].name());
} else {
return Status::Invalid(
"No projected schema was supplied and we could not infer the projected "
@@ -157,6 +157,15 @@ Result<std::shared_ptr<Schema>> GetProjectedSchemaFromExpression(
}
}
}
for (auto f : field_names) {
auto field = dataset_schema->GetFieldByName(f);
if (field) {
// if the field is not present in the schema we ignore it.
// the case is if kAugmentedFields are present in the expression
// and if they are not present in the provided schema, we ignore them.
project_fields.push_back(std::move(field));
}
}
return schema(project_fields);
}
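
Because the final loop iterates field_names, a std::set, project_fields comes out in sorted name order. If the first-seen order from the projection matters, one possible alternative (an assumption for illustration, not what this PR does) is to keep a vector for the output and use an unordered_set only for membership checks. `DedupKeepFirst` is a hypothetical helper:

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Returns `names` with later duplicates removed, keeping first-seen order.
// Hypothetical helper for illustration; not an Arrow API.
std::vector<std::string> DedupKeepFirst(const std::vector<std::string>& names) {
  std::unordered_set<std::string> seen;
  std::vector<std::string> unique;
  for (const auto& name : names) {
    // insert().second is true only the first time a name is seen.
    if (seen.insert(name).second) {
      unique.push_back(name);
    }
  }
  assert(unique.size() == seen.size());  // one output entry per distinct name
  return unique;
}
```

For the input z, a, z, m, a this returns z, a, m. The trade-off is a second container in exchange for an output order that matches the projection.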

9 changes: 3 additions & 6 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

10 changes: 3 additions & 7 deletions r/R/query-engine.R
@@ -34,22 +34,18 @@ ExecPlan <- R6Class("ExecPlan",
if (isTRUE(filter)) {
filter <- Expression$scalar(TRUE)
}
# Use FieldsInExpression to find all from dataset$selected_columns
colnames <- unique(unlist(map(
dataset$selected_columns,
field_names_in_expression
)))
projection <- dataset$selected_columns
dataset <- dataset$.data
assert_is(dataset, "Dataset")
} else {
assert_is(dataset, "Dataset")
# Just a dataset, not a query, so there's no predicates to push down
# so set some defaults
filter <- Expression$scalar(TRUE)
colnames <- names(dataset)
projection <- make_field_refs(colnames)
}

out <- ExecNode_Scan(self, dataset, filter, colnames %||% character(0))
out <- ExecNode_Scan(self, dataset, filter, projection)
# Hold onto the source data's schema so we can preserve schema metadata
# in the resulting Scan/Write
out$extras$source_schema <- dataset$schema
19 changes: 5 additions & 14 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default.

23 changes: 13 additions & 10 deletions r/src/compute-exec.cpp
@@ -288,28 +288,31 @@ std::shared_ptr<arrow::Schema> ExecNode_output_schema(
std::shared_ptr<compute::ExecNode> ExecNode_Scan(
const std::shared_ptr<compute::ExecPlan>& plan,
const std::shared_ptr<ds::Dataset>& dataset,
const std::shared_ptr<compute::Expression>& filter,
std::vector<std::string> materialized_field_names) {
const std::shared_ptr<compute::Expression>& filter, cpp11::list projection) {
arrow::dataset::internal::Initialize();

// TODO: pass in FragmentScanOptions
auto options = std::make_shared<ds::ScanOptions>();

options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true);

options->dataset_schema = dataset->schema();

// This filter is only used for predicate pushdown;
// you still need to pass it to a FilterNode after to handle any other components
options->filter = *filter;

// ScanNode needs to know which fields to materialize (and which are unnecessary)
// ScanNode needs to know which fields to materialize.
// It will pull them from this projection to prune the scan,
// but you still need to Project after
std::vector<compute::Expression> exprs;
for (const auto& name : materialized_field_names) {
exprs.push_back(compute::field_ref(name));
for (SEXP expr : projection) {
auto expr_ptr = cpp11::as_cpp<std::shared_ptr<compute::Expression>>(expr);
exprs.push_back(*expr_ptr);
}

options->projection =
call("make_struct", std::move(exprs),
compute::MakeStructOptions{std::move(materialized_field_names)});
cpp11::strings field_names(projection.attr(R_NamesSymbol));
options->projection = call(
"make_struct", std::move(exprs),
compute::MakeStructOptions{cpp11::as_cpp<std::vector<std::string>>(field_names)});

return MakeExecNodeOrStop("scan", plan.get(), {},
ds::ScanNodeOptions{dataset, options});
19 changes: 0 additions & 19 deletions r/src/expression.cpp
@@ -51,25 +51,6 @@ bool compute___expr__is_field_ref(const std::shared_ptr<compute::Expression>& x)
return x->field_ref() != nullptr;
}

// [[arrow::export]]
std::vector<std::string> field_names_in_expression(
const std::shared_ptr<compute::Expression>& x) {
std::vector<std::string> out;
std::vector<arrow::FieldRef> nested;

auto field_refs = FieldsInExpression(*x);
for (auto f : field_refs) {
if (f.IsNested()) {
// We keep the top-level field name.
nested = *f.nested_refs();
out.push_back(*nested[0].name());
} else {
out.push_back(*f.name());
}
}
return out;
}

// [[arrow::export]]
std::string compute___expr__get_field_ref_name(
const std::shared_ptr<compute::Expression>& x) {
1 change: 1 addition & 0 deletions r/tests/testthat/test-dplyr-query.R
@@ -742,6 +742,7 @@ test_that("Can use nested field refs", {
)

# Now with Dataset: make sure column pushdown in ScanNode works
skip_if_not_available("dataset")
Member

Fixed in #33778 already!

Member Author

Great, thanks, I'll rebase and remove this.

expect_equal(
nested_data %>%
InMemoryDataset$create() %>%