99 changes: 99 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.cc
@@ -616,6 +616,105 @@ Result<std::vector<ExecBatch>> DeclarationToExecBatches(Declaration declaration,
return DeclarationToExecBatchesAsync(std::move(declaration), exec_context).result();
}

namespace {
struct BatchConverter {
explicit BatchConverter(::arrow::internal::Executor* executor)
: exec_context(std::make_shared<ExecContext>(default_memory_pool(), executor)) {}

~BatchConverter() {
if (!exec_plan) {
return;
}
if (exec_plan->finished().is_finished()) {
return;
}
exec_plan->StopProducing();
Status abandoned_status = exec_plan->finished().status();
if (!abandoned_status.ok()) {
abandoned_status.Warn();
}
}

Future<std::shared_ptr<RecordBatch>> operator()() {
return exec_batch_gen().Then(
[this](const std::optional<ExecBatch>& batch)
-> Future<std::shared_ptr<RecordBatch>> {
if (batch) {
return batch->ToRecordBatch(schema);
} else {
return exec_plan->finished().Then(
[]() -> std::shared_ptr<RecordBatch> { return nullptr; });
}
},
[this](const Status& err) {
return exec_plan->finished().Then(
[err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
});
}

std::shared_ptr<ExecContext> exec_context;
AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
std::shared_ptr<Schema> schema;
std::shared_ptr<ExecPlan> exec_plan;
};

Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
Declaration declaration, ::arrow::internal::Executor* executor,
std::shared_ptr<Schema>* out_schema) {
auto converter = std::make_shared<BatchConverter>(executor);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
ExecPlan::Make(converter->exec_context.get()));
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}});
ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
ARROW_RETURN_NOT_OK(plan->StartProducing());
converter->exec_plan = std::move(plan);
*out_schema = converter->schema;
return [conv = std::move(converter)] { return (*conv)(); };
}
} // namespace

Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
bool use_threads) {
std::shared_ptr<Schema> schema;
auto batch_iterator = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
[&](::arrow::internal::Executor* executor)
-> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
return DeclarationToRecordBatchGenerator(declaration, executor, &schema);
},
use_threads));

struct PlanReader : RecordBatchReader {
PlanReader(std::shared_ptr<Schema> schema,
std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator)
: schema_(std::move(schema)), iterator_(std::move(iterator)) {}

std::shared_ptr<Schema> schema() const override { return schema_; }

Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
DCHECK(!!iterator_) << "call to ReadNext on already closed reader";
return iterator_->Next().Value(record_batch);
}

Status Close() override {
// End plan and read from generator until finished
std::shared_ptr<RecordBatch> batch;
do {
ARROW_RETURN_NOT_OK(ReadNext(&batch));
} while (batch != nullptr);
iterator_.reset();
return Status::OK();
}

std::shared_ptr<Schema> schema_;
std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator_;
};

return std::make_unique<PlanReader>(std::move(schema), std::move(batch_iterator));
}

namespace internal {

void RegisterSourceNode(ExecFactoryRegistry*);
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.h
@@ -516,6 +516,10 @@ ARROW_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatc
ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Utility method to run a declaration and return results as a RecordBatchReader
ARROW_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
Declaration declaration, bool use_threads);
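For orientation, here is a minimal usage sketch of the new DeclarationToReader entry point. It is not part of this diff; the table source, the "x" column, and the filter expression are assumptions chosen for illustration.

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/exec/options.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"

// Hypothetical helper: build a small plan and drain it through the reader.
arrow::Status ScanTableThroughReader(std::shared_ptr<arrow::Table> table) {
  namespace cp = arrow::compute;
  cp::Declaration decl = cp::Declaration::Sequence(
      {{"table_source", cp::TableSourceNodeOptions(std::move(table))},
       {"filter",
        cp::FilterNodeOptions(cp::greater(cp::field_ref("x"), cp::literal(0)))}});
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::RecordBatchReader> reader,
                        cp::DeclarationToReader(std::move(decl), /*use_threads=*/true));
  std::shared_ptr<arrow::RecordBatch> batch;
  do {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    // ... consume batch; it is nullptr once the plan is exhausted ...
  } while (batch != nullptr);
  return reader->Close();
}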

/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
///
/// The RecordBatchReader does not impose any ordering on emitted batches.
21 changes: 21 additions & 0 deletions cpp/src/arrow/compute/exec/expression.cc
@@ -1213,6 +1213,27 @@ Result<Expression> SimplifyWithGuarantee(Expression expr,
return expr;
}

Result<Expression> RemoveNamedRefs(Expression src) {
if (!src.IsBound()) {
return Status::Invalid("RemoveNamedRefs called on unbound expression");
}
return ModifyExpression(
std::move(src),
/*pre=*/
[](Expression expr) {
const Expression::Parameter* param = expr.parameter();
if (param && !param->ref.IsFieldPath()) {
FieldPath ref_as_path(
std::vector<int>(param->indices.begin(), param->indices.end()));
return Expression(
Expression::Parameter{std::move(ref_as_path), param->type, param->indices});
}

return expr;
},
/*post_call=*/[](Expression expr, ...) { return expr; });
}

// Serialization is accomplished by converting expressions to KeyValueMetadata and storing
// this in the schema of a RecordBatch. Embedded arrays and scalars are stored in its
// columns. Finally, the RecordBatch is written to an IPC file.
6 changes: 6 additions & 0 deletions cpp/src/arrow/compute/exec/expression.h
@@ -220,6 +220,12 @@ ARROW_EXPORT
Result<Expression> SimplifyWithGuarantee(Expression,
const Expression& guaranteed_true_predicate);

/// Replace all named field refs (e.g. "x" or "x.y") with field paths (e.g. [0] or [1,3])
///
/// This isn't usually needed and does not offer any simplification by itself. However,
/// it can be useful to normalize an expression to paths to make it simpler to work with.
ARROW_EXPORT Result<Expression> RemoveNamedRefs(Expression expression);
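A rough usage sketch of RemoveNamedRefs (again, not part of the diff; the schema, field names, and expression are hypothetical). Binding first is required, since the rewrite relies on the indices recorded at bind time.

#include "arrow/compute/exec/expression.h"
#include "arrow/type.h"

arrow::Result<arrow::compute::Expression> NormalizeToFieldPaths() {
  namespace cp = arrow::compute;
  auto schm = arrow::schema(
      {arrow::field("a", arrow::int32()), arrow::field("b", arrow::int64())});
  cp::Expression expr = cp::call("add", {cp::field_ref("b"), cp::literal(1)});
  ARROW_ASSIGN_OR_RAISE(expr, expr.Bind(*schm));
  // The named reference "b" is rewritten to the positional reference [1];
  // calling RemoveNamedRefs on an unbound expression returns Status::Invalid.
  return cp::RemoveNamedRefs(std::move(expr));
}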

/// @}

// Execution
37 changes: 37 additions & 0 deletions cpp/src/arrow/compute/exec/expression_test.cc
@@ -929,6 +929,24 @@ TEST(Expression, FoldConstantsBoolean) {
ExpectFoldsTo(or_(whatever, whatever), whatever);
}

void ExpectRemovesRefsTo(Expression expr, Expression expected,
const Schema& schema = *kBoringSchema) {
ASSERT_OK_AND_ASSIGN(expr, expr.Bind(schema));
ASSERT_OK_AND_ASSIGN(expected, expected.Bind(schema));

ASSERT_OK_AND_ASSIGN(auto without_named_refs, RemoveNamedRefs(expr));

EXPECT_EQ(without_named_refs, expected);
}

TEST(Expression, RemoveNamedRefs) {
ExpectRemovesRefsTo(field_ref("i32"), field_ref(2));
ExpectRemovesRefsTo(call("add", {literal(4), field_ref("i32")}),
call("add", {literal(4), field_ref(2)}));
auto nested_schema = Schema({field("a", struct_({field("b", int32())}))});
ExpectRemovesRefsTo(field_ref({"a", "b"}), field_ref({0, 0}), nested_schema);
}

TEST(Expression, ExtractKnownFieldValues) {
struct {
void operator()(Expression guarantee,
@@ -1364,6 +1382,10 @@ TEST(Expression, SimplifyWithValidityGuarantee) {
.WithGuarantee(is_null(field_ref("i32")))
.Expect(literal(false));

Simplify{{true_unless_null(field_ref("i32"))}}
.WithGuarantee(is_null(field_ref("i32")))
.Expect(null_literal(boolean()));

Simplify{is_valid(field_ref("i32"))}
.WithGuarantee(is_valid(field_ref("i32")))
.Expect(literal(true));
@@ -1379,6 +1401,21 @@
Simplify{true_unless_null(field_ref("i32"))}
.WithGuarantee(is_valid(field_ref("i32")))
.Expect(literal(true));

Simplify{{equal(field_ref("i32"), literal(7))}}
.WithGuarantee(is_null(field_ref("i32")))
.Expect(null_literal(boolean()));

auto i32_is_2_or_null =
or_(equal(field_ref("i32"), literal(2)), is_null(field_ref("i32")));

Simplify{i32_is_2_or_null}
.WithGuarantee(is_null(field_ref("i32")))
.Expect(literal(true));

Simplify{{greater(field_ref("i32"), literal(7))}}
.WithGuarantee(is_null(field_ref("i32")))
.Expect(null_literal(boolean()));
}

TEST(Expression, SimplifyWithComparisonAndNullableCaveat) {
15 changes: 15 additions & 0 deletions cpp/src/arrow/compute/exec/options.h
@@ -214,9 +214,19 @@ struct ARROW_EXPORT BackpressureOptions {
class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
public:
explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator,
std::shared_ptr<Schema>* schema,
BackpressureOptions backpressure = {},
BackpressureMonitor** backpressure_monitor = NULLPTR)
: generator(generator),
schema(schema),
backpressure(backpressure),
backpressure_monitor(backpressure_monitor) {}

explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator,
BackpressureOptions backpressure = {},
BackpressureMonitor** backpressure_monitor = NULLPTR)
: generator(generator),
schema(NULLPTR),
backpressure(std::move(backpressure)),
backpressure_monitor(backpressure_monitor) {}

@@ -226,6 +236,11 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
/// data from the plan. If this function is not called frequently enough then the sink
/// node will start to accumulate data and may apply backpressure.
std::function<Future<std::optional<ExecBatch>>()>* generator;
/// \brief A pointer which will be set to the schema of the generated batches
///
/// This is optional; if nullptr is passed in, it will be ignored.
/// This will be set when the node is added to the plan, before StartProducing is called.
std::shared_ptr<Schema>* schema;
/// \brief Options to control when to apply backpressure
///
/// This is optional, the default is to never apply backpressure. If the plan is not
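To illustrate the new schema out-parameter on SinkNodeOptions, a sketch follows. It is not taken from the PR; it assumes an already-built upstream declaration (`source_decl`) and an existing plan, and the helper name is made up.

#include <functional>
#include <memory>
#include <optional>

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"

arrow::Status AddSinkWithSchema(
    arrow::compute::ExecPlan* plan, arrow::compute::Declaration source_decl,
    std::function<arrow::Future<std::optional<arrow::compute::ExecBatch>>()>* sink_gen,
    std::shared_ptr<arrow::Schema>* sink_schema) {
  namespace cp = arrow::compute;
  cp::Declaration with_sink = cp::Declaration::Sequence(
      {std::move(source_decl), {"sink", cp::SinkNodeOptions(sink_gen, sink_schema)}});
  // *sink_schema is filled in when the sink node is added to the plan, i.e.
  // before StartProducing() is called.
  ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan).status());
  return arrow::Status::OK();
}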
15 changes: 8 additions & 7 deletions cpp/src/arrow/compute/exec/plan_test.cc
@@ -390,13 +390,14 @@ TEST(ExecPlanExecution, SinkNodeBackpressure) {
BackpressureMonitor* backpressure_monitor;
BackpressureOptions backpressure_options(resume_if_below_bytes, pause_if_above_bytes);
std::shared_ptr<Schema> schema_ = schema({field("data", uint32())});
ARROW_EXPECT_OK(compute::Declaration::Sequence(
{
{"source", SourceNodeOptions(schema_, batch_producer)},
{"sink", SinkNodeOptions{&sink_gen, backpressure_options,
&backpressure_monitor}},
})
.AddToPlan(plan.get()));
ARROW_EXPECT_OK(
compute::Declaration::Sequence(
{
{"source", SourceNodeOptions(schema_, batch_producer)},
{"sink", SinkNodeOptions{&sink_gen, /*schema=*/nullptr,
backpressure_options, &backpressure_monitor}},
})
.AddToPlan(plan.get()));
ASSERT_TRUE(backpressure_monitor);
ARROW_EXPECT_OK(plan->StartProducing());

10 changes: 7 additions & 3 deletions cpp/src/arrow/compute/exec/sink_node.cc
@@ -91,7 +91,7 @@ class SinkNode : public ExecNode {
public:
SinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
AsyncGenerator<std::optional<ExecBatch>>* generator,
BackpressureOptions backpressure,
std::shared_ptr<Schema>* schema, BackpressureOptions backpressure,
BackpressureMonitor** backpressure_monitor_out)
: ExecNode(plan, std::move(inputs), {"collected"}, {},
/*num_outputs=*/0),
@@ -103,6 +103,9 @@ class SinkNode : public ExecNode {
*backpressure_monitor_out = &backpressure_queue_;
}
auto node_destroyed_capture = node_destroyed_;
if (schema) {
*schema = inputs_[0]->output_schema();
}
*generator = [this, node_destroyed_capture]() -> Future<std::optional<ExecBatch>> {
if (*node_destroyed_capture) {
return Status::Invalid(
@@ -126,7 +129,7 @@ class SinkNode : public ExecNode {
const auto& sink_options = checked_cast<const SinkNodeOptions&>(options);
RETURN_NOT_OK(ValidateOptions(sink_options));
return plan->EmplaceNode<SinkNode>(plan, std::move(inputs), sink_options.generator,
sink_options.backpressure,
sink_options.schema, sink_options.backpressure,
sink_options.backpressure_monitor);
}

@@ -414,7 +417,8 @@ struct OrderBySinkNode final : public SinkNode {
OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::unique_ptr<OrderByImpl> impl,
AsyncGenerator<std::optional<ExecBatch>>* generator)
: SinkNode(plan, std::move(inputs), generator, /*backpressure=*/{},
: SinkNode(plan, std::move(inputs), generator, /*schema=*/nullptr,
/*backpressure=*/{},
/*backpressure_monitor_out=*/nullptr),
impl_(std::move(impl)) {}

12 changes: 8 additions & 4 deletions cpp/src/arrow/dataset/dataset.cc
@@ -43,12 +43,14 @@ Fragment::Fragment(compute::Expression partition_expression,
: partition_expression_(std::move(partition_expression)),
physical_schema_(std::move(physical_schema)) {}

Future<std::shared_ptr<InspectedFragment>> Fragment::InspectFragment() {
Future<std::shared_ptr<InspectedFragment>> Fragment::InspectFragment(
const FragmentScanOptions* format_options, compute::ExecContext* exec_context) {
return Status::NotImplemented("Inspect fragment");
}

Future<std::shared_ptr<FragmentScanner>> Fragment::BeginScan(
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment) {
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
const FragmentScanOptions* format_options, compute::ExecContext* exec_context) {
return Status::NotImplemented("New scan method");
}

@@ -154,7 +156,8 @@ Future<std::optional<int64_t>> InMemoryFragment::CountRows(
return Future<std::optional<int64_t>>::MakeFinished(total);
}

Future<std::shared_ptr<InspectedFragment>> InMemoryFragment::InspectFragment() {
Future<std::shared_ptr<InspectedFragment>> InMemoryFragment::InspectFragment(
const FragmentScanOptions* format_options, compute::ExecContext* exec_context) {
return std::make_shared<InspectedFragment>(physical_schema_->field_names());
}

@@ -180,7 +183,8 @@ class InMemoryFragment::Scanner : public FragmentScanner {
};

Future<std::shared_ptr<FragmentScanner>> InMemoryFragment::BeginScan(
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment) {
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
const FragmentScanOptions* format_options, compute::ExecContext* exec_context) {
return Future<std::shared_ptr<FragmentScanner>>::MakeFinished(
std::make_shared<InMemoryFragment::Scanner>(this));
}
17 changes: 11 additions & 6 deletions cpp/src/arrow/dataset/dataset.h
@@ -88,7 +88,7 @@ struct ARROW_DS_EXPORT FragmentScanRequest {
/// before returning the scanned batch.
std::vector<FragmentSelectionColumn> columns;
/// \brief Options specific to the format being scanned
FragmentScanOptions* format_scan_options;
const FragmentScanOptions* format_scan_options;
};

/// \brief An iterator-like object that can yield batches created from a fragment
@@ -156,11 +156,13 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
/// This will be called before a scan and a fragment should attach whatever
/// information will be needed to figure out an evolution strategy. This information
/// will then be passed to the call to BeginScan
virtual Future<std::shared_ptr<InspectedFragment>> InspectFragment();
virtual Future<std::shared_ptr<InspectedFragment>> InspectFragment(
const FragmentScanOptions* format_options, compute::ExecContext* exec_context);

/// \brief Start a scan operation
virtual Future<std::shared_ptr<FragmentScanner>> BeginScan(
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment);
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
const FragmentScanOptions* format_options, compute::ExecContext* exec_context);

/// \brief Count the number of rows in this fragment matching the filter using metadata
/// only. That is, this method may perform I/O, but will not load data.
@@ -228,10 +230,13 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
compute::Expression predicate,
const std::shared_ptr<ScanOptions>& options) override;

Future<std::shared_ptr<InspectedFragment>> InspectFragment() override;
Future<std::shared_ptr<InspectedFragment>> InspectFragment(
const FragmentScanOptions* format_options,
compute::ExecContext* exec_context) override;
Future<std::shared_ptr<FragmentScanner>> BeginScan(
const FragmentScanRequest& request,
const InspectedFragment& inspected_fragment) override;
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
const FragmentScanOptions* format_options,
compute::ExecContext* exec_context) override;

std::string type_name() const override { return "in-memory"; }

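Finally, a sketch of how the reworked two-phase Fragment scan protocol fits together with the new parameters. This is illustrative rather than code from the PR; the fragment, request, format options, and exec context are assumed to be supplied by whatever drives the scan.

#include "arrow/dataset/dataset.h"

arrow::Future<std::shared_ptr<arrow::dataset::FragmentScanner>> StartFragmentScan(
    std::shared_ptr<arrow::dataset::Fragment> fragment,
    arrow::dataset::FragmentScanRequest request,
    const arrow::dataset::FragmentScanOptions* format_options,
    arrow::compute::ExecContext* exec_context) {
  // Phase 1: inspect the fragment so an evolution strategy can be chosen.
  return fragment->InspectFragment(format_options, exec_context)
      .Then([=](const std::shared_ptr<arrow::dataset::InspectedFragment>& inspected) {
        // Phase 2: begin the scan, reusing the inspection result along with the
        // same format options and exec context.
        return fragment->BeginScan(request, *inspected, format_options, exec_context);
      });
}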