438 changes: 155 additions & 283 deletions cpp/examples/arrow/execution_plan_documentation_examples.cc

Large diffs are not rendered by default.

38 changes: 6 additions & 32 deletions cpp/examples/arrow/join_example.cc
@@ -82,18 +82,8 @@ arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> CreateDataSetFromCSVData
}

arrow::Status DoHashJoin() {
cp::ExecContext exec_context;

arrow::dataset::internal::Initialize();

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));

arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

cp::ExecNode* left_source;
cp::ExecNode* right_source;

ARROW_ASSIGN_OR_RAISE(auto l_dataset, CreateDataSetFromCSVData(true));
ARROW_ASSIGN_OR_RAISE(auto r_dataset, CreateDataSetFromCSVData(false));

@@ -111,10 +101,8 @@ arrow::Status DoHashJoin() {
auto l_scan_node_options = arrow::dataset::ScanNodeOptions{l_dataset, l_options};
auto r_scan_node_options = arrow::dataset::ScanNodeOptions{r_dataset, r_options};

ARROW_ASSIGN_OR_RAISE(left_source,
cp::MakeExecNode("scan", plan.get(), {}, l_scan_node_options));
ARROW_ASSIGN_OR_RAISE(right_source,
cp::MakeExecNode("scan", plan.get(), {}, r_scan_node_options));
arrow::compute::Declaration left{"scan", std::move(l_scan_node_options)};
arrow::compute::Declaration right{"scan", std::move(r_scan_node_options)};

arrow::compute::HashJoinNodeOptions join_opts{arrow::compute::JoinType::INNER,
/*in_left_keys=*/{"lkey"},
@@ -123,26 +111,12 @@
/*output_suffix_for_left*/ "_l",
/*output_suffix_for_right*/ "_r"};

ARROW_ASSIGN_OR_RAISE(
auto hashjoin,
cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));
arrow::compute::Declaration hashjoin{
"hashjoin", {std::move(left), std::move(right)}, join_opts};

ARROW_ASSIGN_OR_RAISE(std::ignore, cp::MakeExecNode("sink", plan.get(), {hashjoin},
cp::SinkNodeOptions{&sink_gen}));
// expected columns l_a, l_b
std::shared_ptr<arrow::RecordBatchReader> sink_reader = cp::MakeGeneratorReader(
hashjoin->output_schema(), std::move(sink_gen), exec_context.memory_pool());

// validate the ExecPlan
ARROW_RETURN_NOT_OK(plan->Validate());
// start the ExecPlan
ARROW_RETURN_NOT_OK(plan->StartProducing());

// collect sink_reader into a Table
std::shared_ptr<arrow::Table> response_table;

ARROW_ASSIGN_OR_RAISE(response_table,
arrow::Table::FromRecordBatchReader(sink_reader.get()));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> response_table,
arrow::compute::DeclarationToTable(std::move(hashjoin)));

std::cout << "Results : " << response_table->ToString() << std::endl;

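The net effect of this file's change: the hand-wired ExecPlan, sink generator, and reader are replaced by nested `Declaration`s plus a single `DeclarationToTable` call. Below is a minimal standalone sketch of that pattern, not the example itself: it assumes two hypothetical in-memory tables (`left_table`, `right_table`) instead of the CSV-backed datasets above, and the pre-Acero header paths of this Arrow version.

```cpp
// Sketch of the Declaration-based flow this diff adopts. Hypothetical inputs:
// two in-memory Tables instead of the example's CSV-backed datasets.
#include <memory>
#include <utility>

#include "arrow/compute/exec/exec_plan.h"  // Declaration, DeclarationToTable
#include "arrow/compute/exec/options.h"    // TableSourceNodeOptions, HashJoinNodeOptions
#include "arrow/result.h"
#include "arrow/table.h"

namespace cp = arrow::compute;

arrow::Result<std::shared_ptr<arrow::Table>> JoinTables(
    std::shared_ptr<arrow::Table> left_table,
    std::shared_ptr<arrow::Table> right_table) {
  // A Declaration pairs a node-factory name with its options; inputs nest
  // declaratively, so no ExecPlan or ExecNode wiring is needed here.
  cp::Declaration left{"table_source", cp::TableSourceNodeOptions{left_table}};
  cp::Declaration right{"table_source", cp::TableSourceNodeOptions{right_table}};
  cp::HashJoinNodeOptions join_opts{cp::JoinType::INNER,
                                    /*left_keys=*/{"lkey"},
                                    /*right_keys=*/{"rkey"}};
  cp::Declaration join{"hashjoin", {std::move(left), std::move(right)},
                       std::move(join_opts)};
  // DeclarationToTable creates, validates, starts, and drains the plan,
  // replacing the manual sink/generator/reader plumbing removed above.
  return cp::DeclarationToTable(std::move(join));
}
```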
7 changes: 7 additions & 0 deletions cpp/src/arrow/compute/exec.cc
@@ -48,6 +48,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/vector.h"

namespace arrow {
@@ -56,6 +57,7 @@ using internal::BitmapAnd;
using internal::checked_cast;
using internal::CopyBitmap;
using internal::CpuInfo;
using internal::GetCpuThreadPool;

namespace compute {

@@ -64,6 +66,11 @@ ExecContext* default_exec_context() {
return &default_ctx;
}

ExecContext* threaded_exec_context() {
static ExecContext threaded_ctx(default_memory_pool(), GetCpuThreadPool());
return &threaded_ctx;
}

ExecBatch::ExecBatch(const RecordBatch& batch)
: values(batch.num_columns()), length(batch.num_rows()) {
auto columns = batch.column_data();
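The `threaded_exec_context()` helper added here exposes a process-wide `ExecContext` backed by the shared CPU thread pool, so callers no longer construct one by hand. A small usage sketch, mirroring the call the updated asof-join test makes below (function name hypothetical, error handling elided):

```cpp
// Sketch: build an ExecPlan against the shared CPU-thread-pool context,
// as asof_join_node_test.cc now does via ExecPlan::Make.
#include <memory>

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status MakeThreadedPlan() {
  // threaded_exec_context() lazily constructs a static ExecContext over
  // default_memory_pool() and GetCpuThreadPool(), per the diff above.
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::compute::ExecPlan> plan,
      arrow::compute::ExecPlan::Make(*arrow::compute::threaded_exec_context()));
  // ... add nodes to `plan` and run it ...
  return arrow::Status::OK();
}
```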
20 changes: 7 additions & 13 deletions cpp/src/arrow/compute/exec/asof_join_benchmark.cc
@@ -54,7 +54,6 @@ static void TableJoinOverhead(benchmark::State& state,
TableGenerationProperties right_table_properties,
int batch_size, int num_right_tables,
std::string factory_name, ExecNodeOptions& options) {
ExecContext ctx(default_memory_pool(), nullptr);
left_table_properties.column_prefix = "lt";
left_table_properties.seed = 0;
ASSERT_OK_AND_ASSIGN(TableStats left_table_stats, MakeTable(left_table_properties));
@@ -75,23 +74,18 @@

for (auto _ : state) {
state.PauseTiming();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
ExecPlan::Make(&ctx));
std::vector<ExecNode*> input_nodes = {*arrow::compute::MakeExecNode(
"table_source", plan.get(), {},
std::vector<Declaration::Input> input_nodes = {Declaration(
"table_source",
arrow::compute::TableSourceNodeOptions(left_table_stats.table, batch_size))};
input_nodes.reserve(right_input_tables.size() + 1);
for (TableStats table_stats : right_input_tables) {
input_nodes.push_back(*arrow::compute::MakeExecNode(
"table_source", plan.get(), {},
input_nodes.push_back(Declaration(
"table_source",
arrow::compute::TableSourceNodeOptions(table_stats.table, batch_size)));
}
ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * join_node,
MakeExecNode(factory_name, plan.get(), input_nodes, options));
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
ASSERT_OK(MakeExecNode("sink", plan.get(), {join_node}, SinkNodeOptions{&sink_gen}));
Declaration join_node{factory_name, {input_nodes}, options};
state.ResumeTiming();
ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
ASSERT_OK(DeclarationToStatus(std::move(join_node)));
}

state.counters["input_rows_per_second"] = benchmark::Counter(
@@ -104,7 +98,7 @@
benchmark::Counter::kIsRate);

state.counters["maximum_peak_memory"] =
benchmark::Counter(static_cast<double>(ctx.memory_pool()->max_memory()));
benchmark::Counter(static_cast<double>(default_memory_pool()->max_memory()));
}

static void AsOfJoinOverhead(benchmark::State& state) {
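In the benchmark loop, `DeclarationToStatus` replaces the explicit sink node and `StartAndCollect`: it runs the declared plan to completion and discards the output, which is all a timing loop needs. A minimal sketch of that call, assuming a fully-wired `Declaration` like `join_node` above (helper name hypothetical):

```cpp
// Sketch: execute a declared plan only for completion/timing, discarding
// output. Assumes `decl` is a fully-wired Declaration such as `join_node`.
#include <utility>

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/status.h"

arrow::Status RunForTiming(arrow::compute::Declaration decl) {
  // DeclarationToStatus drives the plan to completion and returns its
  // terminal status; output batches are consumed and dropped.
  return arrow::compute::DeclarationToStatus(std::move(decl));
}
```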
59 changes: 43 additions & 16 deletions cpp/src/arrow/compute/exec/asof_join_node.cc
@@ -807,10 +807,36 @@ class AsofJoinNode : public ExecNode {
}
}

void Process() {
template <typename Callable>
struct Defer {
Callable callable;
explicit Defer(Callable callable) : callable(std::move(callable)) {}
~Defer() noexcept { callable(); }
};

void EndFromProcessThread() {
// We must spawn a new task to transfer off the process thread when
// marking this finished. Otherwise there is a chance that doing so could
// mark the plan finished which may destroy the plan which will destroy this
// node which will cause us to join on ourselves.
ErrorIfNotOk(plan_->query_context()->executor()->Spawn([this] {
Defer cleanup([this]() { finished_.MarkFinished(); });
outputs_[0]->InputFinished(this, batches_produced_);
}));
}

bool CheckEnded() {
if (state_.at(0)->Finished()) {
EndFromProcessThread();
return false;
}
return true;
}

bool Process() {
std::lock_guard<std::mutex> guard(gate_);
if (finished_.is_finished()) {
return;
if (!CheckEnded()) {
return false;
}

// Process batches while we have data
@@ -825,7 +851,8 @@
outputs_[0]->InputReceived(this, std::move(out_b));
} else {
ErrorIfNotOk(result.status());
return;
EndFromProcessThread();
return false;
}
}

@@ -834,18 +861,24 @@
//
// It may happen here in cases where InputFinished was called before we were finished
// producing results (so we didn't know the output size at that time)
if (state_.at(0)->Finished()) {
outputs_[0]->InputFinished(this, batches_produced_);
finished_.MarkFinished();
if (!CheckEnded()) {
return false;
}

// There is no more we can do now but there is still work remaining for later when
// more data arrives.
return true;
}

void ProcessThread() {
for (;;) {
if (!process_.Pop()) {
EndFromProcessThread();
return;
}
if (!Process()) {
return;
}
Process();
}
}

@@ -1120,10 +1153,7 @@
// finished.
process_.Push(true);
}
Status StartProducing() override {
finished_ = arrow::Future<>::Make();
return Status::OK();
}
Status StartProducing() override { return Status::OK(); }
void PauseProducing(ExecNode* output, int32_t counter) override {}
void ResumeProducing(ExecNode* output, int32_t counter) override {}
void StopProducing(ExecNode* output) override {
Expand All @@ -1137,7 +1167,6 @@ class AsofJoinNode : public ExecNode {
arrow::Future<> finished() override { return finished_; }

private:
arrow::Future<> finished_;
std::vector<col_index_t> indices_of_on_key_;
std::vector<std::vector<col_index_t>> indices_of_by_key_;
std::vector<std::unique_ptr<KeyHasher>> key_hashers_;
@@ -1176,9 +1205,7 @@ AsofJoinNode::AsofJoinNode(ExecPlan* plan, NodeVector inputs,
may_rehash_(may_rehash),
tolerance_(tolerance),
process_(),
process_thread_(&AsofJoinNode::ProcessThreadWrapper, this) {
finished_ = arrow::Future<>::MakeFinished();
}
process_thread_(&AsofJoinNode::ProcessThreadWrapper, this) {}

namespace internal {
void RegisterAsofJoinNode(ExecFactoryRegistry* registry) {
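The `Defer` helper introduced above is a minimal scope guard: its callable runs when the guard is destroyed, so in `EndFromProcessThread` the `finished_.MarkFinished()` call happens after `InputFinished`, on every exit path of the spawned task. A standalone illustration of the idiom follows (hypothetical example, not PR code):

```cpp
// Standalone sketch of the Defer scope-guard idiom used above (C++17 for
// class template argument deduction, which the diff also relies on).
#include <iostream>
#include <utility>

template <typename Callable>
struct Defer {
  Callable callable;
  explicit Defer(Callable c) : callable(std::move(c)) {}
  ~Defer() noexcept { callable(); }  // runs on every exit from the scope
};

int main() {
  {
    Defer cleanup([] { std::cout << "mark finished\n"; });
    std::cout << "notify downstream\n";
  }  // destructor fires here: "mark finished" prints after the work above
  return 0;
}
```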
19 changes: 3 additions & 16 deletions cpp/src/arrow/compute/exec/asof_join_node_test.cc
@@ -223,9 +223,6 @@ void CheckRunOutput(const BatchesWithSchema& l_batches,
const BatchesWithSchema& r1_batches,
const BatchesWithSchema& exp_batches,
const AsofJoinNodeOptions join_options) {
auto exec_ctx = std::make_unique<ExecContext>(default_memory_pool(), nullptr);
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));

Declaration join{"asofjoin", join_options};

join.inputs.emplace_back(Declaration{
@@ -235,21 +232,12 @@ void CheckRunOutput(const BatchesWithSchema& l_batches,
join.inputs.emplace_back(Declaration{
"source", SourceNodeOptions{r1_batches.schema, r1_batches.gen(false, false)}});

AsyncGenerator<std::optional<ExecBatch>> sink_gen;

ASSERT_OK(Declaration::Sequence({join, {"sink", SinkNodeOptions{&sink_gen}}})
.AddToPlan(plan.get()));

ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen));
for (auto batch : res) {
ASSERT_EQ(exp_batches.schema->num_fields(), batch.values.size());
}
ASSERT_OK_AND_ASSIGN(auto res_table,
DeclarationToTable(std::move(join), /*use_threads=*/false));

ASSERT_OK_AND_ASSIGN(auto exp_table,
TableFromExecBatches(exp_batches.schema, exp_batches.batches));

ASSERT_OK_AND_ASSIGN(auto res_table, TableFromExecBatches(exp_batches.schema, res));

AssertTablesEqual(*exp_table, *res_table,
/*same_chunk_layout=*/true, /*flatten=*/true);
}
@@ -270,8 +258,7 @@ void DoInvalidPlanTest(const BatchesWithSchema& l_batches,
const AsofJoinNodeOptions& join_options,
const std::string& expected_error_str,
bool fail_on_plan_creation = false) {
ExecContext exec_ctx;
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_ctx));
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(*threaded_exec_context()));

Declaration join{"asofjoin", join_options};
join.inputs.emplace_back(Declaration{
9 changes: 4 additions & 5 deletions cpp/src/arrow/compute/exec/benchmark_util.cc
@@ -35,7 +35,6 @@ namespace compute {
// calling InputFinished and InputReceived.

Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
arrow::compute::ExecContext ctx,
arrow::compute::Expression expr, int32_t num_batches,
int32_t batch_size,
arrow::compute::BatchesWithSchema data,
@@ -46,7 +45,7 @@ Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::compute::ExecPlan> plan,
arrow::compute::ExecPlan::Make(&ctx));
arrow::compute::ExecPlan::Make());
// Source and sink nodes have no effect on the benchmark.
// Used for dummy purposes as they are referenced in InputReceived and InputFinished.
ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * source_node,
@@ -113,13 +112,13 @@ Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
// a source -> node_declarations -> sink sequence.

Status BenchmarkNodeOverhead(
benchmark::State& state, arrow::compute::ExecContext ctx, int32_t num_batches,
int32_t batch_size, arrow::compute::BatchesWithSchema data,
benchmark::State& state, int32_t num_batches, int32_t batch_size,
arrow::compute::BatchesWithSchema data,
std::vector<arrow::compute::Declaration>& node_declarations) {
for (auto _ : state) {
state.PauseTiming();
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::compute::ExecPlan> plan,
arrow::compute::ExecPlan::Make(&ctx));
arrow::compute::ExecPlan::Make());
AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;
arrow::compute::Declaration source = arrow::compute::Declaration(
{"source",
6 changes: 2 additions & 4 deletions cpp/src/arrow/compute/exec/benchmark_util.h
@@ -29,13 +29,11 @@ namespace arrow {

namespace compute {

Status BenchmarkNodeOverhead(benchmark::State& state, arrow::compute::ExecContext ctx,
int32_t num_batches, int32_t batch_size,
arrow::compute::BatchesWithSchema data,
Status BenchmarkNodeOverhead(benchmark::State& state, int32_t num_batches,
int32_t batch_size, arrow::compute::BatchesWithSchema data,
std::vector<arrow::compute::Declaration>& node_declarations);

Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
arrow::compute::ExecContext ctx,
arrow::compute::Expression expr, int32_t num_batches,
int32_t batch_size,
arrow::compute::BatchesWithSchema data,