Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 56 additions & 53 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,17 @@ class ScalarAggregateNode : public ExecNode {

Status DoConsume(const ExecBatch& batch, size_t thread_index) {
util::tracing::Span span;
START_SPAN(span, "Consume",
{{"aggregate", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
START_COMPUTE_SPAN(span, "Consume",
{{"aggregate", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
for (size_t i = 0; i < kernels_.size(); ++i) {
util::tracing::Span span;
START_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
START_COMPUTE_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
KernelContext batch_ctx{plan()->exec_context()};
batch_ctx.SetState(states_[i][thread_index].get());

Expand All @@ -191,10 +191,10 @@ class ScalarAggregateNode : public ExecNode {
void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"aggregate", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"aggregate", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
DCHECK_EQ(input, inputs_[0]);

auto thread_index = get_thread_index_();
Expand All @@ -221,10 +221,10 @@ class ScalarAggregateNode : public ExecNode {
}

Status StartProducing() override {
START_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
finished_ = Future<>::Make();
END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this);
// Scalar aggregates will only output a single batch
Expand Down Expand Up @@ -262,17 +262,18 @@ class ScalarAggregateNode : public ExecNode {
private:
Status Finish() {
util::tracing::Span span;
START_SPAN(span, "Finish", {{"aggregate", ToStringExtra()}, {"node.label", label()}});
START_COMPUTE_SPAN(span, "Finish",
{{"aggregate", ToStringExtra()}, {"node.label", label()}});
ExecBatch batch{{}, 1};
batch.values.resize(kernels_.size());

for (size_t i = 0; i < kernels_.size(); ++i) {
util::tracing::Span span;
START_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Finalize"}});
START_COMPUTE_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Finalize"}});
KernelContext ctx{plan()->exec_context()};
ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
kernels_[i], &ctx, std::move(states_[i])));
Expand Down Expand Up @@ -392,10 +393,10 @@ class GroupByNode : public ExecNode {

Status Consume(ExecBatch batch) {
util::tracing::Span span;
START_SPAN(span, "Consume",
{{"group_by", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
START_COMPUTE_SPAN(span, "Consume",
{{"group_by", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
size_t thread_index = get_thread_index_();
if (thread_index >= local_states_.size()) {
return Status::IndexError("thread index ", thread_index, " is out of range [0, ",
Expand All @@ -418,11 +419,11 @@ class GroupByNode : public ExecNode {
// Execute aggregate kernels
for (size_t i = 0; i < agg_kernels_.size(); ++i) {
util::tracing::Span span;
START_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
START_COMPUTE_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
KernelContext kernel_ctx{ctx_};
kernel_ctx.SetState(state->agg_states[i].get());

Expand All @@ -439,7 +440,8 @@ class GroupByNode : public ExecNode {

Status Merge() {
util::tracing::Span span;
START_SPAN(span, "Merge", {{"group_by", ToStringExtra()}, {"node.label", label()}});
START_COMPUTE_SPAN(span, "Merge",
{{"group_by", ToStringExtra()}, {"node.label", label()}});
ThreadLocalState* state0 = &local_states_[0];
for (size_t i = 1; i < local_states_.size(); ++i) {
ThreadLocalState* state = &local_states_[i];
Expand All @@ -453,11 +455,12 @@ class GroupByNode : public ExecNode {

for (size_t i = 0; i < agg_kernels_.size(); ++i) {
util::tracing::Span span;
START_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Merge"}});
START_COMPUTE_SPAN(
span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Merge"}});
KernelContext batch_ctx{ctx_};
DCHECK(state0->agg_states[i]);
batch_ctx.SetState(state0->agg_states[i].get());
Expand All @@ -473,8 +476,8 @@ class GroupByNode : public ExecNode {

Result<ExecBatch> Finalize() {
util::tracing::Span span;
START_SPAN(span, "Finalize",
{{"group_by", ToStringExtra()}, {"node.label", label()}});
START_COMPUTE_SPAN(span, "Finalize",
{{"group_by", ToStringExtra()}, {"node.label", label()}});

ThreadLocalState* state = &local_states_[0];
// If we never got any batches, then state won't have been initialized
Expand All @@ -486,11 +489,11 @@ class GroupByNode : public ExecNode {
// Aggregate fields come before key fields to match the behavior of GroupBy function
for (size_t i = 0; i < agg_kernels_.size(); ++i) {
util::tracing::Span span;
START_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Finalize"}});
START_COMPUTE_SPAN(span, aggs_[i].function,
{{"function.name", aggs_[i].function},
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Finalize"}});
KernelContext batch_ctx{ctx_};
batch_ctx.SetState(state->agg_states[i].get());
RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i]));
Expand Down Expand Up @@ -548,10 +551,10 @@ class GroupByNode : public ExecNode {
void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"group_by", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"group_by", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});

// bail if StopProducing was called
if (finished_.is_finished()) return;
Expand Down Expand Up @@ -587,10 +590,10 @@ class GroupByNode : public ExecNode {
}

Status StartProducing() override {
START_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
finished_ = Future<>::Make();
END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this);

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct ExecPlanImpl : public ExecPlan {
}

Status StartProducing() {
START_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
#ifdef ARROW_WITH_OPENTELEMETRY
if (HasMetadata()) {
auto pairs = metadata().get()->sorted_pairs();
Expand Down Expand Up @@ -387,7 +387,7 @@ void MapNode::InputFinished(ExecNode* input, int total_batches) {
}

Status MapNode::StartProducing() {
START_SPAN(
START_COMPUTE_SPAN(
span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}});
finished_ = Future<>::Make();
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/arrow/compute/exec/filter_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ class FilterNode : public MapNode {
SimplifyWithGuarantee(filter_, target.guarantee));

util::tracing::Span span;
START_SPAN(span, "Filter",
{{"filter.expression", ToStringExtra()},
{"filter.expression.simplified", simplified_filter.ToString()},
{"filter.length", target.length}});
START_COMPUTE_SPAN(span, "Filter",
{{"filter.expression", ToStringExtra()},
{"filter.expression.simplified", simplified_filter.ToString()},
{"filter.length", target.length}});

ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(simplified_filter, target,
plan()->exec_context()));
Expand Down Expand Up @@ -103,10 +103,10 @@ class FilterNode : public MapNode {
DCHECK_EQ(input, inputs_[0]);
auto func = [this](ExecBatch batch) {
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"filter", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"filter", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
auto result = DoFilter(std::move(batch));
MARK_SPAN(span, result.status());
END_SPAN(span);
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/exec/hash_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ class HashJoinBasicImpl : public HashJoinImpl {
TaskScheduler::ScheduleImpl schedule_task_callback) override {
num_threads = std::max(num_threads, static_cast<size_t>(1));

START_SPAN(span_, "HashJoinBasicImpl",
{{"detail", filter.ToString()},
{"join.kind", ToString(join_type)},
{"join.threads", static_cast<uint32_t>(num_threads)}});
START_COMPUTE_SPAN(span_, "HashJoinBasicImpl",
{{"detail", filter.ToString()},
{"join.kind", ToString(join_type)},
{"join.threads", static_cast<uint32_t>(num_threads)}});

ctx_ = ctx;
join_type_ = join_type;
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/compute/exec/hash_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ class HashJoinNode : public ExecNode {

EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}});
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"batch.length", batch.length}});
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"batch.length", batch.length}});

{
Status status = impl_->InputReceived(thread_index, side, std::move(batch));
Expand Down Expand Up @@ -572,10 +572,10 @@ class HashJoinNode : public ExecNode {
}

Status StartProducing() override {
START_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);

bool use_sync_execution = !(plan_->exec_context()->executor());
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/arrow/compute/exec/project_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ class ProjectNode : public MapNode {
std::vector<Datum> values{exprs_.size()};
for (size_t i = 0; i < exprs_.size(); ++i) {
util::tracing::Span span;
START_SPAN(span, "Project",
{{"project.descr", exprs_[i].descr().ToString()},
{"project.length", target.length},
{"project.expression", exprs_[i].ToString()}});
START_COMPUTE_SPAN(span, "Project",
{{"project.descr", exprs_[i].descr().ToString()},
{"project.length", target.length},
{"project.expression", exprs_[i].ToString()}});
ARROW_ASSIGN_OR_RAISE(Expression simplified_expr,
SimplifyWithGuarantee(exprs_[i], target.guarantee));

Expand All @@ -98,10 +98,10 @@ class ProjectNode : public MapNode {
DCHECK_EQ(input, inputs_[0]);
auto func = [this](ExecBatch batch) {
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"project", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"project", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
auto result = DoProject(std::move(batch));
MARK_SPAN(span, result.status());
END_SPAN(span);
Expand Down
33 changes: 18 additions & 15 deletions cpp/src/arrow/compute/exec/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class SinkNode : public ExecNode {
// Returns the fixed kind label for this node type ("SinkNode").
// StartProducing() below uses it both as the prefix of the span name
// (kind_name() + ":" + label()) and as the "node.kind" span attribute.
const char* kind_name() const override { return "SinkNode"; }

Status StartProducing() override {
START_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
finished_ = Future<>::Make();
END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this);

Expand All @@ -106,8 +106,9 @@ class SinkNode : public ExecNode {
void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"node.label", label()}, {"batch.length", batch.length}});
START_COMPUTE_SPAN_WITH_PARENT(
span, span_, "InputReceived",
{{"node.label", label()}, {"batch.length", batch.length}});

DCHECK_EQ(input, inputs_[0]);

Expand Down Expand Up @@ -174,10 +175,10 @@ class ConsumingSinkNode : public ExecNode {
// Returns the fixed kind label for this node type ("ConsumingSinkNode").
// StartProducing() below uses it both as the prefix of the span name
// (kind_name() + ":" + label()) and as the "node.kind" span attribute.
const char* kind_name() const override { return "ConsumingSinkNode"; }

Status StartProducing() override {
START_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
DCHECK_GT(inputs_.size(), 0);
RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema()));
finished_ = Future<>::Make();
Expand All @@ -204,8 +205,9 @@ class ConsumingSinkNode : public ExecNode {
void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"node.label", label()}, {"batch.length", batch.length}});
START_COMPUTE_SPAN_WITH_PARENT(
span, span_, "InputReceived",
{{"node.label", label()}, {"batch.length", batch.length}});

DCHECK_EQ(input, inputs_[0]);

Expand Down Expand Up @@ -354,8 +356,9 @@ struct OrderBySinkNode final : public SinkNode {
void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"node.label", label()}, {"batch.length", batch.length}});
START_COMPUTE_SPAN_WITH_PARENT(
span, span_, "InputReceived",
{{"node.label", label()}, {"batch.length", batch.length}});

DCHECK_EQ(input, inputs_[0]);

Expand Down Expand Up @@ -392,7 +395,7 @@ struct OrderBySinkNode final : public SinkNode {

void Finish() override {
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}});
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}});
Status st = DoFinish();
if (ErrorIfNotOk(st)) {
producer_.Push(std::move(st));
Expand Down
Loading