diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index 725372700ca..98f8f92cc37 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -61,7 +61,7 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema, *ss << ']'; } -class ScalarAggregateNode : public ExecNode { +class ScalarAggregateNode : public ExecNode, public TracedNode { public: ScalarAggregateNode(ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, @@ -139,11 +139,6 @@ class ScalarAggregateNode : public ExecNode { const char* kind_name() const override { return "ScalarAggregateNode"; } Status DoConsume(const ExecSpan& batch, size_t thread_index) { - util::tracing::Span span; - START_COMPUTE_SPAN(span, "Consume", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; START_COMPUTE_SPAN(span, aggs_[i].function, @@ -161,12 +156,7 @@ class ScalarAggregateNode : public ExecNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + auto scope = TraceInputReceived(batch); DCHECK_EQ(input, inputs_[0]); auto thread_index = plan_->query_context()->GetThreadIndex(); @@ -179,13 +169,11 @@ class ScalarAggregateNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { - EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); outputs_[0]->ErrorReceived(this, std::move(error)); } void InputFinished(ExecNode* input, int total_batches) override { - EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); DCHECK_EQ(input, inputs_[0]); if (input_counter_.SetTotal(total_batches)) { ErrorIfNotOk(Finish()); @@ -193,10 +181,7 @@ class ScalarAggregateNode : public ExecNode { } Status StartProducing() override { - START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + NoteStartProducing(ToStringExtra()); // Scalar aggregates will only output a single batch outputs_[0]->InputFinished(this, 1); return Status::OK(); @@ -216,7 +201,6 @@ class ScalarAggregateNode : public ExecNode { } void StopProducing() override { - EVENT(span_, "StopProducing"); if (input_counter_.Cancel()) { finished_.MarkFinished(); } @@ -233,9 +217,7 @@ class ScalarAggregateNode : public ExecNode { private: Status Finish() { - util::tracing::Span span; - START_COMPUTE_SPAN(span, "Finish", - {{"aggregate", ToStringExtra()}, {"node.label", label()}}); + auto scope = TraceFinish(); ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); @@ -266,7 +248,7 @@ class ScalarAggregateNode : public ExecNode { AtomicCounter input_counter_; }; -class GroupByNode : public ExecNode { +class GroupByNode : public ExecNode, public TracedNode { public: GroupByNode(ExecNode* input, std::shared_ptr output_schema, std::vector key_field_ids, std::vector agg_src_field_ids, @@ -361,11 +343,6 @@ class GroupByNode : public ExecNode { const char* kind_name() const override { return "GroupByNode"; } Status Consume(ExecSpan batch) { - util::tracing::Span span; - START_COMPUTE_SPAN(span, "Consume", - 
{{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); size_t thread_index = plan_->query_context()->GetThreadIndex(); if (thread_index >= local_states_.size()) { return Status::IndexError("thread index ", thread_index, " is out of range [0, ", @@ -514,6 +491,7 @@ class GroupByNode : public ExecNode { } void OutputResult() { + auto scope = TraceFinish(); // If something goes wrong outputting the result we need to make sure // we still mark finished. Status st = DoOutputResult(); @@ -523,12 +501,7 @@ class GroupByNode : public ExecNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + auto scope = TraceInputReceived(batch); // bail if StopProducing was called if (finished_.is_finished()) return; @@ -543,16 +516,12 @@ class GroupByNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { - EVENT(span_, "ErrorReceived", {{"error", error.message()}}); - DCHECK_EQ(input, inputs_[0]); outputs_[0]->ErrorReceived(this, std::move(error)); } void InputFinished(ExecNode* input, int total_batches) override { - EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); - // bail if StopProducing was called if (finished_.is_finished()) return; @@ -564,10 +533,7 @@ class GroupByNode : public ExecNode { } Status StartProducing() override { - START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + NoteStartProducing(ToStringExtra()); local_states_.resize(plan_->query_context()->max_concurrency()); return Status::OK(); } @@ -583,7 +549,6 @@ class GroupByNode : public ExecNode { } void StopProducing(ExecNode* output) override { - EVENT(span_, "StopProducing"); DCHECK_EQ(output, outputs_[0]); if (input_counter_.Cancel()) { diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 88cd298d2cb..19081267da7 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -113,12 +113,12 @@ struct ExecPlanImpl : public ExecPlan { // If no source node schedules any tasks (e.g. they do all their word synchronously as // part of StartProducing) then the plan may be finished before we return from this // call. 
+ auto scope = START_SCOPED_SPAN(span_, "ExecPlan", {{"plan", ToString()}}); Future<> scheduler_finished = util::AsyncTaskScheduler::Make( [this](util::AsyncTaskScheduler* async_scheduler) { QueryContext* ctx = query_context(); RETURN_NOT_OK(ctx->Init(ctx->max_concurrency(), async_scheduler)); - START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}}); #ifdef ARROW_WITH_OPENTELEMETRY if (HasMetadata()) { auto pairs = metadata().get()->sorted_pairs(); @@ -136,7 +136,11 @@ struct ExecPlanImpl : public ExecPlan { RETURN_NOT_OK(n->Init()); } for (auto& n : nodes_) { - async_scheduler->AddSimpleTask([&] { return n->finished(); }); + std::string qualified_label = std::string(n->kind_name()) + ":" + n->label(); + std::string wait_for_finish = + "ExecPlan::WaitForFinish(" + qualified_label + ")"; + async_scheduler->AddSimpleTask([&] { return n->finished(); }, + std::move(wait_for_finish)); } ctx->scheduler()->RegisterEnd(); @@ -149,7 +153,9 @@ struct ExecPlanImpl : public ExecPlan { RETURN_NOT_OK(ctx->scheduler()->StartScheduling( 0 /* thread_index */, [ctx](std::function fn) -> Status { - return ctx->ScheduleTask(std::move(fn)); + // TODO(weston) add names to synchronous scheduler so we can use something + // better than sync-scheduler-task here + return ctx->ScheduleTask(std::move(fn), "sync-scheduler-task"); }, /*concurrent_tasks=*/2 * num_threads, sync_execution)); @@ -163,10 +169,7 @@ struct ExecPlanImpl : public ExecPlan { ++it) { auto node = *it; - EVENT(span_, "StartProducing:" + node->label(), - {{"node.label", node->label()}, {"node.kind_name", node->kind_name()}}); st = node->StartProducing(); - EVENT(span_, "StartProducing:" + node->label(), {{"status", st.ToString()}}); if (!st.ok()) { // Stop nodes that successfully started, in reverse order bool expected = false; @@ -197,7 +200,7 @@ struct ExecPlanImpl : public ExecPlan { void StopProducing() { DCHECK(started_) << "stopped an ExecPlan which never started"; - EVENT(span_, "StopProducing"); + EVENT(span_, "ExecPlan::StopProducing"); bool expected = false; if (stopped_.compare_exchange_strong(expected, true)) { query_context()->scheduler()->Abort( @@ -209,8 +212,6 @@ struct ExecPlanImpl : public ExecPlan { void StopProducingImpl(It begin, It end) { for (auto it = begin; it != end; ++it) { auto node = *it; - EVENT(span_, "StopProducing:" + node->label(), - {{"node.label", node->label()}, {"node.kind_name", node->kind_name()}}); node->StopProducing(); } } @@ -459,7 +460,7 @@ std::string ExecNode::ToString(int indent) const { return ss.str(); } -std::string ExecNode::ToStringExtra(int indent = 0) const { return ""; } +std::string ExecNode::ToStringExtra(int indent) const { return ""; } bool ExecNode::ErrorIfNotOk(Status status) { if (status.ok()) return false; diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index a1a89158c54..40b217273f4 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -290,7 +290,7 @@ class ARROW_EXPORT ExecNode { bool ErrorIfNotOk(Status status); /// Provide extra info to include in the string representation. 
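// A minimal sketch (hypothetical ExampleNode, not from this patch) of the pattern the
// change below settles on: the default argument belongs on the in-class declaration so
// that call sites which only see the header -- e.g. the tracing helpers calling
// ToStringExtra() with no argument -- compile, and the out-of-line definition omits it.
class ExampleNode {
 public:
  virtual ~ExampleNode() = default;
  virtual std::string ToStringExtra(int indent = 0) const;  // default on the declaration
};
std::string ExampleNode::ToStringExtra(int indent) const { return ""; }  // not repeated here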
- virtual std::string ToStringExtra(int indent) const; + virtual std::string ToStringExtra(int indent = 0) const; ExecPlan* plan_; std::string label_; @@ -304,8 +304,6 @@ class ARROW_EXPORT ExecNode { // Future to sync finished Future<> finished_ = Future<>::Make(); - - util::tracing::Span span_; }; /// \brief An extensible registry for factories of ExecNodes diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 8274453b6c7..b44b25d29fe 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -103,17 +103,9 @@ class FilterNode : public MapNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"filter", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); auto result = DoFilter(std::move(batch)); - MARK_SPAN(span, result.status()); - END_SPAN(span); return result; }; this->SubmitTask(std::move(func), std::move(batch)); diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 37bdb82517a..998497ede49 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -678,7 +678,7 @@ bool HashJoinSchema::HasLargeBinary() const { return false; } -class HashJoinNode : public ExecNode { +class HashJoinNode : public ExecNode, public TracedNode { public: HashJoinNode(ExecPlan* plan, NodeVector inputs, const HashJoinNodeOptions& join_options, std::shared_ptr output_schema, @@ -873,6 +873,7 @@ class HashJoinNode : public ExecNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { + auto scope = TraceInputReceived(batch); ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end()); if (complete_.load()) { return; @@ -881,11 +882,6 @@ class HashJoinNode : public ExecNode { size_t thread_index = plan_->query_context()->GetThreadIndex(); int side = (input == inputs_[0]) ? 0 : 1; - EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"batch.length", batch.length}}); - Status status = side == 0 ? OnProbeSideBatch(thread_index, std::move(batch)) : OnBuildSideBatch(thread_index, std::move(batch)); @@ -908,7 +904,6 @@ class HashJoinNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { - EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); StopProducing(); outputs_[0]->ErrorReceived(this, std::move(error)); @@ -919,8 +914,6 @@ class HashJoinNode : public ExecNode { size_t thread_index = plan_->query_context()->GetThreadIndex(); int side = (input == inputs_[0]) ? 0 : 1; - EVENT(span_, "InputFinished", {{"side", side}, {"batches.length", total_batches}}); - if (batch_count_[side].SetTotal(total_batches)) { Status status = side == 0 ? 
OnProbeSideFinished(thread_index) : OnBuildSideFinished(thread_index); @@ -987,11 +980,7 @@ class HashJoinNode : public ExecNode { } Status StartProducing() override { - START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); - END_SPAN_ON_FUTURE_COMPLETION(span_, finished_); + NoteStartProducing(ToStringExtra()); RETURN_NOT_OK( pushdown_context_.StartProducing(plan_->query_context()->GetThreadIndex())); return Status::OK(); @@ -1013,7 +1002,6 @@ class HashJoinNode : public ExecNode { } void StopProducing() override { - EVENT(span_, "StopProducing"); bool expected = false; if (complete_.compare_exchange_strong(expected, true)) { impl_->Abort([this]() { finished_.MarkFinished(); }); diff --git a/cpp/src/arrow/compute/exec/map_node.cc b/cpp/src/arrow/compute/exec/map_node.cc index 16201ea1290..a5e73fb6f33 100644 --- a/cpp/src/arrow/compute/exec/map_node.cc +++ b/cpp/src/arrow/compute/exec/map_node.cc @@ -41,13 +41,11 @@ MapNode::MapNode(ExecPlan* plan, std::vector inputs, void MapNode::ErrorReceived(ExecNode* input, Status error) { DCHECK_EQ(input, inputs_[0]); - EVENT(span_, "ErrorReceived", {{"error.message", error.message()}}); outputs_[0]->ErrorReceived(this, std::move(error)); } void MapNode::InputFinished(ExecNode* input, int total_batches) { DCHECK_EQ(input, inputs_[0]); - EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); outputs_[0]->InputFinished(this, total_batches); if (input_counter_.SetTotal(total_batches)) { this->Finish(); @@ -55,9 +53,7 @@ void MapNode::InputFinished(ExecNode* input, int total_batches) { } Status MapNode::StartProducing() { - START_COMPUTE_SPAN( - span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}}); + NoteStartProducing(ToStringExtra()); return Status::OK(); } @@ -75,7 +71,6 @@ void MapNode::StopProducing(ExecNode* output) { } void MapNode::StopProducing() { - EVENT(span_, "StopProducing"); if (input_counter_.Cancel()) { this->Finish(); } @@ -84,6 +79,7 @@ void MapNode::StopProducing() { void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, ExecBatch batch) { + auto scope = TraceInputReceived(batch); Status status; // This will be true if the node is stopped early due to an error or manual // cancellation diff --git a/cpp/src/arrow/compute/exec/map_node.h b/cpp/src/arrow/compute/exec/map_node.h index 88241ece592..9211afc0060 100644 --- a/cpp/src/arrow/compute/exec/map_node.h +++ b/cpp/src/arrow/compute/exec/map_node.h @@ -42,7 +42,7 @@ namespace arrow { namespace compute { -class ARROW_EXPORT MapNode : public ExecNode { +class ARROW_EXPORT MapNode : public ExecNode, public TracedNode { public: MapNode(ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema); diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 5e8c2245a2b..239f2d38de1 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -39,7 +39,7 @@ using internal::checked_cast; namespace compute { namespace { -class ProjectNode : public MapNode { +class ProjectNode : public MapNode, public TracedNode { public: ProjectNode(ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, std::vector exprs) @@ -96,17 +96,9 @@ class ProjectNode : public MapNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", 
batch.length}}); DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"project", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); auto result = DoProject(std::move(batch)); - MARK_SPAN(span, result.status()); - END_SPAN(span); return result; }; this->SubmitTask(std::move(func), std::move(batch)); diff --git a/cpp/src/arrow/compute/exec/query_context.cc b/cpp/src/arrow/compute/exec/query_context.cc index a155c750a2a..f4664f1fe93 100644 --- a/cpp/src/arrow/compute/exec/query_context.cc +++ b/cpp/src/arrow/compute/exec/query_context.cc @@ -51,34 +51,36 @@ Result QueryContext::GetTempStack(size_t thread_index) { return &tld_[thread_index].stack; } -Result> QueryContext::BeginExternalTask() { +Result> QueryContext::BeginExternalTask(std::string_view name) { Future<> completion_future = Future<>::Make(); - if (async_scheduler_->AddSimpleTask( - [completion_future] { return completion_future; })) { + if (async_scheduler_->AddSimpleTask([completion_future] { return completion_future; }, + name)) { return completion_future; } return Future<>{}; } -Status QueryContext::ScheduleTask(std::function fn) { +Status QueryContext::ScheduleTask(std::function fn, std::string_view name) { ::arrow::internal::Executor* exec = executor(); // Adds a task which submits fn to the executor and tracks its progress. If we're // already stopping then the task is ignored and fn is not executed. - async_scheduler_->AddSimpleTask([exec, fn]() { return exec->Submit(std::move(fn)); }); + async_scheduler_->AddSimpleTask([exec, fn]() { return exec->Submit(std::move(fn)); }, + name); return Status::OK(); } -Status QueryContext::ScheduleTask(std::function fn) { +Status QueryContext::ScheduleTask(std::function fn, + std::string_view name) { std::function indexed_fn = [this, fn]() { size_t thread_index = GetThreadIndex(); return fn(thread_index); }; - return ScheduleTask(std::move(indexed_fn)); + return ScheduleTask(std::move(indexed_fn), name); } -Status QueryContext::ScheduleIOTask(std::function fn) { +Status QueryContext::ScheduleIOTask(std::function fn, std::string_view name) { async_scheduler_->AddSimpleTask( - [this, fn]() { return io_context_.executor()->Submit(std::move(fn)); }); + [this, fn]() { return io_context_.executor()->Submit(std::move(fn)); }, name); return Status::OK(); } diff --git a/cpp/src/arrow/compute/exec/query_context.h b/cpp/src/arrow/compute/exec/query_context.h index 12ddbc56fad..8d5379f8f28 100644 --- a/cpp/src/arrow/compute/exec/query_context.h +++ b/cpp/src/arrow/compute/exec/query_context.h @@ -14,6 +14,9 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#pragma once + +#include #include "arrow/compute/exec.h" #include "arrow/compute/exec/task_util.h" @@ -21,8 +24,6 @@ #include "arrow/io/interfaces.h" #include "arrow/util/async_util.h" -#pragma once - namespace arrow { using io::IOContext; @@ -70,26 +71,31 @@ class ARROW_EXPORT QueryContext { /// a valid future is returned then it should be marked complete when the /// external task has finished. /// + /// \param name A name to give the task for traceability and debugging + /// /// \return an invalid future if the plan has already ended, otherwise this /// returns a future that must be completed when the external task /// finishes. 
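// A minimal sketch (assumed usage; `ctx` and the task name are hypothetical) of the
// BeginExternalTask contract documented above: the plan stays alive until the returned
// future is marked finished, and an invalid future means the plan has already ended.
ARROW_ASSIGN_OR_RAISE(Future<> external_task,
                      ctx->BeginExternalTask("MyNode::ExternalWork"));
if (!external_task.is_valid()) {
  return Status::OK();  // plan already aborted; do not start the external work
}
// ... hand the work off to the external system, and when it completes:
external_task.MarkFinished();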
- Result> BeginExternalTask(); + Result> BeginExternalTask(std::string_view name); /// \brief Add a single function as a task to the query's task group /// on the compute threadpool. /// /// \param fn The task to run. Takes no arguments and returns a Status. - Status ScheduleTask(std::function fn); + /// \param name A name to give the task for traceability and debugging + Status ScheduleTask(std::function fn, std::string_view name); /// \brief Add a single function as a task to the query's task group /// on the compute threadpool. /// /// \param fn The task to run. Takes the thread index and returns a Status. - Status ScheduleTask(std::function fn); + /// \param name A name to give the task for traceability and debugging + Status ScheduleTask(std::function fn, std::string_view name); /// \brief Add a single function as a task to the query's task group on /// the IO thread pool /// /// \param fn The task to run. Returns a status. - Status ScheduleIOTask(std::function fn); + /// \param name A name to give the task for traceability and debugging + Status ScheduleIOTask(std::function fn, std::string_view name); // Register/Start TaskGroup is a way of performing a "Parallel For" pattern: // - The task function takes the thread index and the index of the task diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 2ecce751135..62c226885e5 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" @@ -41,6 +42,8 @@ #include "arrow/util/tracing_internal.h" #include "arrow/util/unreachable.h" +using namespace std::string_view_literals; // NOLINT + namespace arrow { using internal::checked_cast; @@ -94,7 +97,7 @@ class BackpressureReservoir : public BackpressureMonitor { const uint64_t pause_if_above_; }; -class SinkNode : public ExecNode { +class SinkNode : public ExecNode, public TracedNode { public: SinkNode(ExecPlan* plan, std::vector inputs, AsyncGenerator>* generator, @@ -143,11 +146,7 @@ class SinkNode : public ExecNode { const char* kind_name() const override { return "SinkNode"; } Status StartProducing() override { - START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); - END_SPAN_ON_FUTURE_COMPLETION(span_, finished_); + NoteStartProducing(ToStringExtra()); return Status::OK(); } @@ -164,8 +163,6 @@ class SinkNode : public ExecNode { [[noreturn]] void StopProducing(ExecNode* output) override { NoOutputs(); } void StopProducing() override { - EVENT(span_, "StopProducing"); - Finish(); inputs_[0]->StopProducing(this); } @@ -175,7 +172,9 @@ class SinkNode : public ExecNode { uint64_t bytes_used = static_cast(batch.TotalBufferSize()); auto state_change = backpressure_queue_.RecordProduced(bytes_used); if (state_change >= 0) { - EVENT(span_, "Backpressure applied", {{"backpressure.counter", state_change}}); + EVENT_ON_CURRENT_SPAN( + "SinkNode::BackpressureApplied", + {{"node.label", label()}, {"backpressure.counter", state_change}}); inputs_[0]->PauseProducing(this, state_change); } } @@ -186,19 +185,16 @@ class SinkNode : public ExecNode { uint64_t bytes_freed = static_cast(batch.TotalBufferSize()); auto state_change = backpressure_queue_.RecordConsumed(bytes_freed); if (state_change >= 0) { - EVENT(span_, "Backpressure released", {{"backpressure.counter", state_change}}); + 
EVENT_ON_CURRENT_SPAN( + "SinkNode::BackpressureReleased", + {{"node.label", label()}, {"backpressure.counter", state_change}}); inputs_[0]->ResumeProducing(this, state_change); } } } void InputReceived(ExecNode* input, ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT( - span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); - + auto scope = TraceInputReceived(batch); DCHECK_EQ(input, inputs_[0]); RecordBackpressureBytesUsed(batch); @@ -211,7 +207,6 @@ class SinkNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { - EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); producer_.Push(std::move(error)); @@ -223,7 +218,6 @@ class SinkNode : public ExecNode { } void InputFinished(ExecNode* input, int total_batches) override { - EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); if (input_counter_.SetTotal(total_batches)) { Finish(); } @@ -267,7 +261,9 @@ class SinkNode : public ExecNode { // is finished. Use SinkNode if you are transferring the ownership of the data to another // system. Use ConsumingSinkNode if the data is being consumed within the exec plan (i.e. // the exec plan should not complete until the consumption has completed). -class ConsumingSinkNode : public ExecNode, public BackpressureControl { +class ConsumingSinkNode : public ExecNode, + public BackpressureControl, + public TracedNode { public: ConsumingSinkNode(ExecPlan* plan, std::vector inputs, std::shared_ptr consumer, @@ -294,11 +290,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { const char* kind_name() const override { return "ConsumingSinkNode"; } Status StartProducing() override { - START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); - END_SPAN_ON_FUTURE_COMPLETION(span_, finished_); + NoteStartProducing(ToStringExtra()); DCHECK_GT(inputs_.size(), 0); auto output_schema = inputs_[0]->output_schema(); if (names_.size() > 0) { @@ -335,19 +327,13 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { void Resume() override { inputs_[0]->ResumeProducing(this, ++backpressure_counter_); } void StopProducing() override { - EVENT(span_, "StopProducing"); if (input_counter_.Cancel()) { Finish(Status::OK()); } } void InputReceived(ExecNode* input, ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT( - span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); - + auto scope = TraceInputReceived(batch); DCHECK_EQ(input, inputs_[0]); // This can happen if an error was received and the source hasn't yet stopped. 
Since @@ -371,7 +357,6 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { } void ErrorReceived(ExecNode* input, Status error) override { - EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); if (input_counter_.Cancel()) Finish(error); @@ -380,7 +365,6 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { } void InputFinished(ExecNode* input, int total_batches) override { - EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); if (input_counter_.SetTotal(total_batches)) { Finish(Status::OK()); } @@ -390,7 +374,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { void Finish(const Status& finish_st) { if (finish_st.ok()) { plan_->query_context()->async_scheduler()->AddSimpleTask( - [this] { return consumer_->Finish(); }); + [this] { return consumer_->Finish(); }, "ConsumingSinkNode::Finish"sv); } finished_.MarkFinished(finish_st); } @@ -482,11 +466,7 @@ struct OrderBySinkNode final : public SinkNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT( - span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); + auto scope = TraceInputReceived(batch); DCHECK_EQ(input, inputs_[0]); @@ -509,6 +489,7 @@ struct OrderBySinkNode final : public SinkNode { protected: Status DoFinish() { + auto scope = TraceFinish(); ARROW_ASSIGN_OR_RAISE(Datum sorted, impl_->DoFinish()); TableBatchReader reader(*sorted.table()); while (true) { @@ -522,8 +503,6 @@ struct OrderBySinkNode final : public SinkNode { } void Finish() override { - util::tracing::Span span; - START_COMPUTE_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}}); Status st = DoFinish(); if (ErrorIfNotOk(st)) { producer_.Push(std::move(st)); diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 76c222f5b76..af43b63d0cf 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -40,6 +40,8 @@ #include "arrow/util/unreachable.h" #include "arrow/util/vector.h" +using namespace std::string_view_literals; // NOLINT + namespace arrow { using internal::checked_cast; @@ -48,7 +50,7 @@ using internal::MapVector; namespace compute { namespace { -struct SourceNode : ExecNode { +struct SourceNode : ExecNode, public TracedNode { SourceNode(ExecPlan* plan, std::shared_ptr output_schema, AsyncGenerator> generator) : ExecNode(plan, {}, {}, std::move(output_schema), @@ -73,12 +75,7 @@ struct SourceNode : ExecNode { [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } Status StartProducing() override { - START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.kind", kind_name()}, - {"node.label", label()}, - {"node.output_schema", output_schema()->ToString()}, - {"node.detail", ToString()}}); - END_SPAN_ON_FUTURE_COMPLETION(span_, finished_); + NoteStartProducing(ToStringExtra()); { // If another exec node encountered an error during its StartProducing call // it might have already called StopProducing on all of its inputs (including this @@ -101,8 +98,8 @@ struct SourceNode : ExecNode { options.executor = executor; options.should_schedule = ShouldSchedule::IfDifferentExecutor; } - ARROW_ASSIGN_OR_RAISE(Future<> scan_task, - plan_->query_context()->BeginExternalTask()); + ARROW_ASSIGN_OR_RAISE(Future<> scan_task, 
plan_->query_context()->BeginExternalTask( + "SourceNode::DatasetScan")); if (!scan_task.is_valid()) { finished_.MarkFinished(); // Plan has already been aborted, no need to start scanning @@ -114,10 +111,14 @@ struct SourceNode : ExecNode { return Future>::MakeFinished(Break(batch_count_)); } lock.unlock(); - + util::tracing::Span fetch_batch_span; + auto fetch_batch_scope = + START_SCOPED_SPAN(fetch_batch_span, "SourceNode::ReadBatch"); return generator_().Then( - [this](const std::optional& maybe_morsel) - -> Future> { + [this, fetch_batch_span = std::move(fetch_batch_span)]( + const std::optional& maybe_morsel) mutable + -> Future> { + fetch_batch_span.reset(); std::unique_lock lock(mutex_); if (IsIterationEnd(maybe_morsel) || stop_requested_) { return Break(batch_count_); @@ -153,10 +154,11 @@ struct SourceNode : ExecNode { outputs_[0]->InputReceived(this, std::move(batch)); } while (offset < morsel.length); return Status::OK(); - })); + }, + "SourceNode::ProcessMorsel")); lock.lock(); if (!backpressure_future_.is_finished()) { - EVENT(span_, "Source paused due to backpressure"); + EVENT_ON_CURRENT_SPAN("SourceNode::BackpressureApplied"); return backpressure_future_.Then( []() -> ControlFlow { return Continue(); }); } diff --git a/cpp/src/arrow/compute/exec/tpch_node.cc b/cpp/src/arrow/compute/exec/tpch_node.cc index afff52beaf0..dca1e59821b 100644 --- a/cpp/src/arrow/compute/exec/tpch_node.cc +++ b/cpp/src/arrow/compute/exec/tpch_node.cc @@ -3426,17 +3426,19 @@ class TpchNode : public ExecNode { Status ScheduleTaskCallback(std::function func) { if (finished_generating_.load()) return Status::OK(); num_running_++; - return plan_->query_context()->ScheduleTask([this, func](size_t thread_index) { - Status status = func(thread_index); - if (!status.ok()) { - StopProducing(); - ErrorIfNotOk(status); - } - if (--num_running_ == 0) { - finished_.MarkFinished(Status::OK()); - } - return status; - }); + return plan_->query_context()->ScheduleTask( + [this, func](size_t thread_index) { + Status status = func(thread_index); + if (!status.ok()) { + StopProducing(); + ErrorIfNotOk(status); + } + if (--num_running_ == 0) { + finished_.MarkFinished(Status::OK()); + } + return status; + }, + "TpchNode::GenerateAndProcessBatch"); } const char* name_; diff --git a/cpp/src/arrow/compute/exec/union_node.cc b/cpp/src/arrow/compute/exec/union_node.cc index 096188f4799..f9241a431e3 100644 --- a/cpp/src/arrow/compute/exec/union_node.cc +++ b/cpp/src/arrow/compute/exec/union_node.cc @@ -46,7 +46,7 @@ std::vector GetInputLabels(const ExecNode::NodeVector& inputs) { } } // namespace -class UnionNode : public ExecNode { +class UnionNode : public ExecNode, public TracedNode { public: UnionNode(ExecPlan* plan, std::vector inputs) : ExecNode(plan, inputs, GetInputLabels(inputs), @@ -77,7 +77,7 @@ class UnionNode : public ExecNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); + NoteInputReceived(batch); ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end()); if (finished_.is_finished()) { @@ -90,7 +90,6 @@ class UnionNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { - EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); outputs_[0]->ErrorReceived(this, std::move(error)); @@ -98,8 +97,6 @@ class UnionNode : public ExecNode { } void InputFinished(ExecNode* input, int total_batches) override { - EVENT(span_, "InputFinished", - 
{{"input", input_count_.count()}, {"batches.length", total_batches}}); ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end()); total_batches_.fetch_add(total_batches); @@ -113,11 +110,7 @@ class UnionNode : public ExecNode { } Status StartProducing() override { - START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); - END_SPAN_ON_FUTURE_COMPLETION(span_, finished_); + NoteStartProducing(ToStringExtra()); return Status::OK(); } @@ -134,7 +127,6 @@ class UnionNode : public ExecNode { } void StopProducing(ExecNode* output) override { - EVENT(span_, "StopProducing"); DCHECK_EQ(output, outputs_[0]); if (batch_count_.Cancel()) { finished_.MarkFinished(); diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h index ea0c8cf36f1..93ed197d121 100644 --- a/cpp/src/arrow/compute/exec/util.h +++ b/cpp/src/arrow/compute/exec/util.h @@ -36,6 +36,7 @@ #include "arrow/util/logging.h" #include "arrow/util/mutex.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" #if defined(__clang__) || defined(__GNUC__) #define BYTESWAP(x) __builtin_bswap64(x) @@ -424,5 +425,75 @@ Result ModifyExpression(Expression expr, const PreVisit& pre, return post_call(std::move(expr), NULLPTR); } +/// CRTP helper for tracing helper functions + +template +class TracedNode { + public: + // All nodes should call TraceStartProducing or NoteStartProducing exactly once + // Most nodes will be fine with a call to NoteStartProducing since the StartProducing + // call is usually fairly cheap and simply schedules tasks to fetch the actual data. + + // Create a span to record the StartProducing work + [[nodiscard]] ::arrow::internal::tracing::Scope TraceStartProducing( + std::string extra_details) const { + const T& self = cast(); + std::string node_kind(self.kind_name()); + util::tracing::Span span; + return START_SCOPED_SPAN( + span, node_kind + "::StartProducing", + {{"node.details", extra_details}, {"node.label", self.label()}}); + } + + // Record a call to StartProducing without creating with a span + void NoteStartProducing(std::string extra_details) const { + const T& self = cast(); + std::string node_kind(self.kind_name()); + EVENT_ON_CURRENT_SPAN( + node_kind + "::StartProducing", + {{"node.details", extra_details}, {"node.label", self.label()}}); + } + + // All nodes should call TraceInputReceived for each batch they receive. This call + // should track the time spent processing the batch. NoteInputReceived is available + // but usually won't be used unless a node is simply adding batches to a trivial queue. + + // Create a span to record the InputReceived work + [[nodiscard]] ::arrow::internal::tracing::Scope TraceInputReceived( + const ExecBatch& batch) const { + const T& self = cast(); + std::string node_kind(self.kind_name()); + util::tracing::Span span; + return START_SCOPED_SPAN( + span, node_kind + "::InputReceived", + {{"node.label", self.label()}, {"node.batch_length", batch.length}}); + } + + // Record a call to InputReceived without creating with a span + void NoteInputReceived(const ExecBatch& batch) const { + const T& self = cast(); + std::string node_kind(self.kind_name()); + EVENT_ON_CURRENT_SPAN( + node_kind + "::InputReceived", + {{"node.label", self.label()}, {"node.batch_length", batch.length}}); + } + + // Create a span to record any "finish" work. 
This should NOT be called as part of + // InputFinished and many nodes may not need to call this at all. This should be used + // when a node has some extra work that has to be done once it has received all of its + // data. For example, an aggregation node calculating aggregations. This will + // typically be called as a result of InputFinished OR InputReceived. + [[nodiscard]] ::arrow::internal::tracing::Scope TraceFinish() const { + const T& self = cast(); + std::string node_kind(self.kind_name()); + util::tracing::Span span; + return START_SCOPED_SPAN(span, node_kind + "::Finish", + {{"node.label", self.label()}}); + } + + private: + const T& cast() const { return static_cast(*this); } +}; + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 504d029bda3..b1ebb660d6e 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -30,6 +30,9 @@ #include "arrow/util/logging.h" #include "arrow/util/map.h" #include "arrow/util/string.h" +#include "arrow/util/tracing_internal.h" + +using namespace std::string_view_literals; // NOLINT namespace arrow { @@ -138,13 +141,15 @@ class DatasetWriterFileQueue { file_tasks_ = std::move(file_tasks); // Because the scheduler runs one task at a time we know the writer will // be opened before any attempt to write - file_tasks_->AddSimpleTask([this, filename] { - Executor* io_executor = options_.filesystem->io_context().executor(); - return DeferNotOk(io_executor->Submit([this, filename]() { - ARROW_ASSIGN_OR_RAISE(writer_, OpenWriter(options_, schema_, filename)); - return Status::OK(); - })); - }); + file_tasks_->AddSimpleTask( + [this, filename] { + Executor* io_executor = options_.filesystem->io_context().executor(); + return DeferNotOk(io_executor->Submit([this, filename]() { + ARROW_ASSIGN_OR_RAISE(writer_, OpenWriter(options_, schema_, filename)); + return Status::OK(); + })); + }, + "DatasetWriter::OpenWriter"sv); } Result> PopStagedBatch() { @@ -177,9 +182,11 @@ class DatasetWriterFileQueue { } void ScheduleBatch(std::shared_ptr batch) { - file_tasks_->AddSimpleTask([self = this, batch = std::move(batch)]() { - return self->WriteNext(std::move(batch)); - }); + file_tasks_->AddSimpleTask( + [self = this, batch = std::move(batch)]() { + return self->WriteNext(std::move(batch)); + }, + "DatasetWriter::WriteBatch"sv); } Result PopAndDeliverStagedBatch() { @@ -214,7 +221,8 @@ class DatasetWriterFileQueue { // At this point all write tasks have been added. Because the scheduler // is a 1-task FIFO we know this task will run at the very end and can // add it now. 
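// A minimal sketch (placeholder task bodies; `scheduler` is hypothetical) of the
// ordering guarantee relied on above: a throttled task group with a cost limit of one
// runs tasks strictly one at a time in submission order, so a task added last runs last.
auto file_tasks = util::MakeThrottledAsyncTaskGroup(
    scheduler, /*max_concurrent_cost=*/1, /*queue=*/nullptr,
    /*finish_callback=*/[] { return Status::OK(); });
file_tasks->AddSimpleTask([] { return Future<>::MakeFinished(); }, "Example::Open"sv);
file_tasks->AddSimpleTask([] { return Future<>::MakeFinished(); }, "Example::Write"sv);
file_tasks->AddSimpleTask([] { return Future<>::MakeFinished(); }, "Example::Finish"sv);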
- file_tasks_->AddSimpleTask([this] { return DoFinish(); }); + file_tasks_->AddSimpleTask([this] { return DoFinish(); }, + "DatasetWriter::FinishFile"sv); return Status::OK(); } @@ -332,7 +340,8 @@ class DatasetWriterDirectoryQueue { scheduler_, 1, /*queue=*/nullptr, std::move(file_finish_task)); if (init_future_.is_valid()) { latest_open_file_tasks_->AddSimpleTask( - [init_future = init_future_]() { return init_future; }); + [init_future = init_future_]() { return init_future; }, + "DatasetWriter::WaitForDirectoryInit"sv); } latest_open_file_->Start(latest_open_file_tasks_.get(), filename); return Status::OK(); @@ -373,7 +382,8 @@ class DatasetWriterDirectoryQueue { return create_dir_cb().Then(notify_waiters_cb, notify_waiters_on_err_cb); }; } - scheduler_->AddSimpleTask(std::move(init_task)); + scheduler_->AddSimpleTask(std::move(init_task), + "DatasetWriter::InitializeDirectory"sv); } static Result> Make( @@ -519,30 +529,34 @@ class DatasetWriter::DatasetWriterImpl { void WriteRecordBatch(std::shared_ptr batch, const std::string& directory, const std::string& prefix) { - write_tasks_->AddSimpleTask([this, batch = std::move(batch), directory, - prefix]() mutable { - Future<> has_room = WriteAndCheckBackpressure(std::move(batch), directory, prefix); - if (!has_room.is_finished()) { - // We don't have to worry about sequencing backpressure here since - // task_group_ serves as our sequencer. If batches continue to arrive after - // we pause they will queue up in task_group_ until we free up and call - // Resume - pause_callback_(); - return has_room.Then([this] { resume_callback_(); }); - } - return has_room; - }); + write_tasks_->AddSimpleTask( + [this, batch = std::move(batch), directory, prefix]() mutable { + Future<> has_room = + WriteAndCheckBackpressure(std::move(batch), directory, prefix); + if (!has_room.is_finished()) { + // We don't have to worry about sequencing backpressure here since + // task_group_ serves as our sequencer. If batches continue to arrive after + // we pause they will queue up in task_group_ until we free up and call + // Resume + pause_callback_(); + return has_room.Then([this] { resume_callback_(); }); + } + return has_room; + }, + "DatasetWriter::WriteAndCheckBackpressure"sv); } void Finish() { - write_tasks_->AddSimpleTask([this]() -> Result> { - for (const auto& directory_queue : directory_queues_) { - ARROW_RETURN_NOT_OK(directory_queue.second->Finish()); - } - // This task is purely synchronous but we add it to write_tasks_ for the throttling - // task group benefits. - return Future<>::MakeFinished(); - }); + write_tasks_->AddSimpleTask( + [this]() -> Result> { + for (const auto& directory_queue : directory_queues_) { + ARROW_RETURN_NOT_OK(directory_queue.second->Finish()); + } + // This task is purely synchronous but we add it to write_tasks_ for the + // throttling task group benefits. 
+ return Future<>::MakeFinished(); + }, + "DatasetWriter::FinishAll"sv); write_tasks_.reset(); } @@ -583,11 +597,13 @@ class DatasetWriter::DatasetWriterImpl { backpressure = writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows()); if (!backpressure.is_finished()) { + EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued"); break; } if (will_open_file) { backpressure = writer_state_.open_files_throttle.Acquire(1); if (!backpressure.is_finished()) { + EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles"); RETURN_NOT_OK(CloseLargestFile()); break; } diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc index 247c4cdc0e2..d7d058558f3 100644 --- a/cpp/src/arrow/dataset/dataset_writer_test.cc +++ b/cpp/src/arrow/dataset/dataset_writer_test.cc @@ -33,6 +33,8 @@ #include "arrow/testing/gtest_util.h" #include "gtest/gtest.h" +using namespace std::string_view_literals; // NOLINT + namespace arrow { namespace dataset { namespace internal { @@ -84,7 +86,9 @@ class DatasetWriterTestFixture : public testing::Test { scheduler_finished_ = util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* scheduler) { scheduler_ = scheduler; - scheduler->AddSimpleTask([&] { return test_done_with_tasks_; }); + scheduler->AddSimpleTask( + [&] { return test_done_with_tasks_; }, + "DatasetWriterTestFixture::WaitForTestMethodToFinish"sv); return Status::OK(); }); } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 6eb23fcbc94..4c74cf1f62a 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -31,6 +31,7 @@ #include "arrow/compute/exec/map_node.h" #include "arrow/compute/exec/query_context.h" #include "arrow/compute/exec/subtree_internal.h" +#include "arrow/compute/exec/util.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/dataset_writer.h" #include "arrow/dataset/scanner.h" @@ -556,19 +557,8 @@ class TeeNode : public compute::MapNode { } void InputReceived(compute::ExecNode* input, compute::ExecBatch batch) override { - EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); - auto func = [this](compute::ExecBatch batch) { - util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"tee", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); - auto result = DoTee(std::move(batch)); - MARK_SPAN(span, result.status()); - END_SPAN(span); - return result; - }; + auto func = [this](compute::ExecBatch batch) { return DoTee(std::move(batch)); }; this->SubmitTask(std::move(func), std::move(batch)); } diff --git a/cpp/src/arrow/dataset/scan_node.cc b/cpp/src/arrow/dataset/scan_node.cc index 07b2b9886c4..d37d3316b3c 100644 --- a/cpp/src/arrow/dataset/scan_node.cc +++ b/cpp/src/arrow/dataset/scan_node.cc @@ -33,9 +33,12 @@ #include "arrow/type.h" #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" +#include "arrow/util/string.h" #include "arrow/util/tracing_internal.h" #include "arrow/util/unreachable.h" +using namespace std::string_view_literals; // NOLINT + namespace cp = arrow::compute; namespace arrow { @@ -111,7 +114,7 @@ Future>> GetFragments(Dataset* dataset, /// fragments. On destruction we continue consuming the fragments until they complete /// (which should be fairly quick since we cancelled the fragment). This ensures the /// I/O work is completely finished before the node is destroyed. 
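// A minimal sketch (hypothetical ExampleTracedNode; most ExecNode overrides elided) of
// the TracedNode mixin from compute/exec/util.h that ScanNode below and the other nodes
// in this patch adopt: NoteStartProducing for a cheap start, and a scoped
// TraceInputReceived span that times the per-batch work.
class ExampleTracedNode : public cp::ExecNode, public cp::TracedNode<ExampleTracedNode> {
 public:
  const char* kind_name() const override { return "ExampleTracedNode"; }

  Status StartProducing() override {
    NoteStartProducing(ToStringExtra());  // records an event on the current span
    return Status::OK();
  }

  void InputReceived(cp::ExecNode* input, cp::ExecBatch batch) override {
    auto scope = TraceInputReceived(batch);  // span covers processing of this batch
    // ... process the batch and forward it downstream ...
  }
};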
-class ScanNode : public cp::ExecNode { +class ScanNode : public cp::ExecNode, public cp::TracedNode { public: ScanNode(cp::ExecPlan* plan, ScanV2Options options, std::shared_ptr output_schema) @@ -202,6 +205,7 @@ class ScanNode : public cp::ExecNode { // case. cost_ = static_cast( std::min(cost, static_cast(std::numeric_limits::max()))); + name_ = "ScanNode::ScanBatch::" + ::arrow::internal::ToChars(batch_index_); } Result> operator()() override { @@ -213,6 +217,8 @@ class ScanNode : public cp::ExecNode { }); } + std::string_view name() const override { return name_; } + Status HandleBatch(const std::shared_ptr& batch) { ARROW_ASSIGN_OR_RAISE( compute::ExecBatch evolved_batch, @@ -222,7 +228,8 @@ class ScanNode : public cp::ExecNode { [node = node_, evolved_batch = std::move(evolved_batch)] { node->outputs_[0]->InputReceived(node, std::move(evolved_batch)); return Status::OK(); - }); + }, + "ScanNode::ProcessMorsel"); } int cost() const override { return cost_; } @@ -231,11 +238,14 @@ class ScanNode : public cp::ExecNode { ScanState* scan_; int batch_index_; int cost_; + std::string name_; }; struct ListFragmentTask : util::AsyncTaskScheduler::Task { ListFragmentTask(ScanNode* node, std::shared_ptr fragment) - : node(node), fragment(std::move(fragment)) {} + : node(node), fragment(std::move(fragment)) { + name_ = "ScanNode::ListFragment::" + this->fragment->ToString(); + } Result> operator()() override { return fragment @@ -246,6 +256,8 @@ class ScanNode : public cp::ExecNode { }); } + std::string_view name() const override { return name_; } + Future<> BeginScan(const std::shared_ptr& inspected_fragment) { // Now that we have an inspected fragment we need to use the dataset's evolution // strategy to figure out how to scan it @@ -311,6 +323,7 @@ class ScanNode : public cp::ExecNode { ScanNode* node; std::shared_ptr fragment; std::unique_ptr scan_state = std::make_unique(); + std::string name_; }; void ScanFragments(const AsyncGenerator>& frag_gen) { @@ -323,28 +336,27 @@ class ScanNode : public cp::ExecNode { return Status::OK(); }); fragment_tasks->AddAsyncGenerator>( - std::move(frag_gen), [this, fragment_tasks = std::move(fragment_tasks)]( - const std::shared_ptr& fragment) { + std::move(frag_gen), + [this, fragment_tasks = + std::move(fragment_tasks)](const std::shared_ptr& fragment) { fragment_tasks->AddTask(std::make_unique(this, fragment)); return Status::OK(); - }); + }, + "ScanNode::ListDataset::Next"); } Status StartProducing() override { - START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.kind", kind_name()}, - {"node.label", label()}, - {"node.output_schema", output_schema()->ToString()}, - {"node.detail", ToString()}}); - END_SPAN_ON_FUTURE_COMPLETION(span_, finished_); + NoteStartProducing(ToStringExtra()); batches_throttle_ = util::ThrottledAsyncTaskScheduler::Make( plan_->query_context()->async_scheduler(), options_.target_bytes_readahead + 1); - plan_->query_context()->async_scheduler()->AddSimpleTask([this] { - return GetFragments(options_.dataset.get(), options_.filter) - .Then([this](const AsyncGenerator>& frag_gen) { - ScanFragments(frag_gen); - }); - }); + plan_->query_context()->async_scheduler()->AddSimpleTask( + [this] { + return GetFragments(options_.dataset.get(), options_.filter) + .Then([this](const AsyncGenerator>& frag_gen) { + ScanFragments(frag_gen); + }); + }, + "ScanNode::ListDataset::GetFragments"sv); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 
0587863eb3c..f307787357d 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -286,16 +286,14 @@ Result FragmentToBatches( const Enumerated>& fragment, const std::shared_ptr& options) { #ifdef ARROW_WITH_OPENTELEMETRY - auto tracer = arrow::internal::tracing::GetTracer(); - auto span = tracer->StartSpan( - "arrow::dataset::FragmentToBatches", - { - {"arrow.dataset.fragment", fragment.value->ToString()}, - {"arrow.dataset.fragment.index", fragment.index}, - {"arrow.dataset.fragment.last", fragment.last}, - {"arrow.dataset.fragment.type_name", fragment.value->type_name()}, - }); - auto scope = tracer->WithActiveSpan(span); + util::tracing::Span span; + START_SPAN(span, "Scanner::FragmentToBatches", + { + {"arrow.dataset.fragment", fragment.value->ToString()}, + {"arrow.dataset.fragment.index", fragment.index}, + {"arrow.dataset.fragment.last", fragment.last}, + {"arrow.dataset.fragment.type_name", fragment.value->type_name()}, + }); #endif ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options)); ArrayVector columns; diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index ebf264de060..0a59a462c95 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -19,12 +19,16 @@ #include "arrow/util/future.h" #include "arrow/util/logging.h" +#include "arrow/util/string.h" +#include "arrow/util/tracing_internal.h" #include #include #include #include +using namespace std::string_view_literals; // NOLINT + namespace arrow { namespace util { @@ -118,6 +122,28 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue { std::list> tasks_; }; +#ifdef ARROW_WITH_OPENTELEMETRY +::arrow::internal::tracing::Scope TraceTaskSubmitted(AsyncTaskScheduler::Task* task, + const util::tracing::Span& parent) { + if (task->span.valid()) { + EVENT(task->span, "task submitted"); + return ACTIVATE_SPAN(task->span); + } + + return START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(), + {{"task.cost", task->cost()}}); +} + +void TraceTaskQueued(AsyncTaskScheduler::Task* task, const util::tracing::Span& parent) { + START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(), + {{"task.cost", task->cost()}}); +} + +void TraceTaskFinished(AsyncTaskScheduler::Task* task) { END_SPAN(task->span); } + +void TraceSchedulerAbort(const Status& error) { EVENT_ON_CURRENT_SPAN(error.ToString()); } +#endif + class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { public: using Task = AsyncTaskScheduler::Task; @@ -126,7 +152,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { FnOnce abort_callback) : AsyncTaskScheduler(), stop_token_(std::move(stop_token)), - abort_callback_(std::move(abort_callback)) {} + abort_callback_(std::move(abort_callback)) { + START_SCOPED_SPAN(span_, "AsyncTaskScheduler"); + } ~AsyncTaskSchedulerImpl() { DCHECK_EQ(running_tasks_, 0) << " scheduler destroyed while tasks still running"; @@ -145,6 +173,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { } Future<> OnFinished() const { return finished_; } + const tracing::Span& span() const override { return span_; } private: bool IsAborted() { return !maybe_error_.ok(); } @@ -173,6 +202,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { // Capture `task` to keep it alive until finished if (!submit_result->TryAddCallback([this, task_inner = std::move(task)]() mutable { return [this, task_inner2 = std::move(task_inner)](const Status& st) { +#ifdef ARROW_WITH_OPENTELEMETRY + 
TraceTaskFinished(task_inner2.get()); +#endif OnTaskFinished(st); }; })) { @@ -195,6 +227,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { bool aborted = false; if (!IsAborted()) { maybe_error_ = st; +#ifdef ARROW_WITH_OPENTELEMETRY + TraceSchedulerAbort(st); +#endif // Add one more "task" to represent running the abort callback. This // will prevent any other task finishing and marking the scheduler finished // while we are running the abort callback. @@ -215,6 +250,12 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { AbortUnlocked(stop_token_.Poll(), std::move(lk)); return; } +#ifdef ARROW_WITH_OPENTELEMETRY + // It's important that the task's span be active while we run the submit function. + // Normally the submit function should transfer the span to the thread task as the + // active span. + auto scope = TraceTaskSubmitted(task.get(), span_); +#endif running_tasks_++; lk.unlock(); return DoSubmitTask(std::move(task)); @@ -229,8 +270,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { std::mutex mutex_; StopToken stop_token_; FnOnce abort_callback_; - bool abort_callback_pending_ = false; - std::condition_variable abort_callback_cv_; + util::tracing::Span span_; // Allows AsyncTaskScheduler::Make to call OnTaskFinished friend AsyncTaskScheduler; @@ -265,6 +305,9 @@ class ThrottledAsyncTaskSchedulerImpl int latched_cost = std::min(task->cost(), throttle_->Capacity()); std::optional> maybe_backoff = throttle_->TryAcquire(latched_cost); if (maybe_backoff) { +#ifdef ARROW_WITH_OPENTELEMETRY + TraceTaskQueued(task.get(), span()); +#endif queue_->Push(std::move(task)); lk.unlock(); maybe_backoff->AddCallback( @@ -285,11 +328,13 @@ class ThrottledAsyncTaskSchedulerImpl void Pause() override { throttle_->Pause(); } void Resume() override { throttle_->Resume(); } + const util::tracing::Span& span() const override { return target_->span(); } private: bool SubmitTask(std::unique_ptr task, int latched_cost) { // Wrap the task with a wrapper that runs it and then checks to see if there are any // queued tasks + std::string_view name = task->name(); return target_->AddSimpleTask( [latched_cost, inner_task = std::move(task), self = shared_from_this()]() mutable -> Result> { @@ -298,7 +343,8 @@ class ThrottledAsyncTaskSchedulerImpl self->throttle_->Release(latched_cost); self->ContinueTasks(); }); - }); + }, + name); } void ContinueTasks() { @@ -350,7 +396,8 @@ class AsyncTaskGroupImpl : public AsyncTaskGroup { if (!st.ok()) { // We can't return an invalid status from the destructor so we schedule a dummy // failing task - target_->AddSimpleTask([st = std::move(st)]() { return st; }); + target_->AddSimpleTask([st = std::move(st)]() { return st; }, + "failed_task_reporter"sv); } } } @@ -370,12 +417,15 @@ class AsyncTaskGroupImpl : public AsyncTaskGroup { }); } int cost() const override { return target->cost(); } + std::string_view name() const override { return target->name(); } std::unique_ptr target; std::shared_ptr state; }; return target_->AddTask(std::make_unique(std::move(task), state_)); } + const util::tracing::Span& span() const override { return target_->span(); } + private: struct State { explicit State(FnOnce finish_cb) @@ -392,6 +442,8 @@ class AsyncTaskGroupImpl : public AsyncTaskGroup { Future<> AsyncTaskScheduler::Make(FnOnce initial_task, FnOnce abort_callback, StopToken stop_token) { + util::tracing::Span span; + auto scope = START_SCOPED_SPAN_SV(span, "AsyncTaskScheduler::InitialTask"sv); auto scheduler = std::make_unique(std::move(stop_token), 
std::move(abort_callback)); Status initial_task_st = std::move(initial_task)(scheduler.get()); @@ -431,6 +483,7 @@ class ThrottledAsyncTaskGroup : public ThrottledAsyncTaskScheduler { : throttle_(std::move(throttle)), task_group_(std::move(task_group)) {} void Pause() override { throttle_->Pause(); } void Resume() override { throttle_->Resume(); } + const util::tracing::Span& span() const override { return task_group_->span(); } bool AddTask(std::unique_ptr task) override { return task_group_->AddTask(std::move(task)); } diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index de7aa4addd7..2668ae22260 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -30,6 +30,7 @@ #include "arrow/util/iterator.h" #include "arrow/util/mutex.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing.h" namespace arrow { @@ -93,6 +94,14 @@ class ARROW_EXPORT AsyncTaskScheduler { /// tasks based on the total expected RAM usage of the tasks (this is done in the /// scanner) virtual int cost() const { return 1; } + /// The name of the task + /// + /// This is used for debugging and traceability. The returned view must remain + /// valid for the lifetime of the task. + virtual std::string_view name() const = 0; + + /// a span tied to the lifetime of the task, for internal use only + tracing::Span span; }; /// Add a task to the scheduler @@ -106,6 +115,10 @@ class ARROW_EXPORT AsyncTaskScheduler { /// /// \param task the task to submit /// + /// A task's name must remain valid for the duration of the task. It is used for + /// debugging (e.g. when debugging a deadlock to see which tasks still remain) and for + /// traceability (the name will be used for spans asigned to the task) + /// /// \return true if the task was submitted or queued, false if the task was ignored virtual bool AddTask(std::unique_ptr task) = 0; @@ -124,23 +137,48 @@ class ARROW_EXPORT AsyncTaskScheduler { /// /// \param generator the generator to submit to the scheduler /// \param visitor a function which visits each generator future as it completes + /// \param name a name which will be used for each submitted task template bool AddAsyncGenerator(std::function()> generator, - std::function visitor); + std::function visitor, std::string_view name); template struct SimpleTask : public Task { - explicit SimpleTask(Callable callable) : callable(std::move(callable)) {} + SimpleTask(Callable callable, std::string_view name) + : callable(std::move(callable)), name_(name) {} + SimpleTask(Callable callable, std::string name) + : callable(std::move(callable)), owned_name_(std::move(name)) { + name_ = *owned_name_; + } Result> operator()() override { return callable(); } + std::string_view name() const override { return name_; } Callable callable; + std::string_view name_; + std::optional owned_name_; }; /// Add a task with cost 1 to the scheduler /// - /// \see AddTask for details + /// \param callable a "submit" function that should return a future + /// \param name a name for the task + /// + /// `name` must remain valid until the task has been submitted AND the returned + /// future completes. It is used for debugging and tracing. + /// + /// \see AddTask for more details + template + bool AddSimpleTask(Callable callable, std::string_view name) { + return AddTask(std::make_unique>(std::move(callable), name)); + } + + /// Add a task with cost 1 to the scheduler + /// + /// This is an overload of \see AddSimpleTask that keeps `name` alive + /// in the task. 
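// A minimal sketch (assumed usage; `scheduler` and `batch_index` are hypothetical) of
// choosing between the two AddSimpleTask overloads declared here: a long-lived name can
// be passed as a string_view, while a name built at runtime should use the std::string
// overload so the task keeps it alive for its full lifetime.
scheduler->AddSimpleTask([] { return Future<>::MakeFinished(); },
                         std::string_view("MyNode::StaticName"));

std::string per_batch_name = "MyNode::Batch::" + ::arrow::internal::ToChars(batch_index);
scheduler->AddSimpleTask([] { return Future<>::MakeFinished(); },
                         std::move(per_batch_name));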
template - bool AddSimpleTask(Callable callable) { - return AddTask(std::make_unique>(std::move(callable))); + bool AddSimpleTask(Callable callable, std::string name) { + return AddTask( + std::make_unique>(std::move(callable), std::move(name))); } /// Construct a scheduler @@ -163,6 +201,9 @@ class ARROW_EXPORT AsyncTaskScheduler { FnOnce initial_task, FnOnce abort_callback = [](const Status&) {}, StopToken stop_token = StopToken::Unstoppable()); + + /// A span tracking execution of the scheduler's tasks, for internal use only + virtual const tracing::Span& span() const = 0; }; class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler { @@ -335,16 +376,18 @@ ARROW_EXPORT std::unique_ptr MakeThrottledAsyncTask // AsyncTaskGroup template bool AsyncTaskScheduler::AddAsyncGenerator(std::function()> generator, - std::function visitor) { + std::function visitor, + std::string_view name) { struct State { State(std::function()> generator, std::function visitor, - std::unique_ptr task_group) + std::unique_ptr task_group, std::string_view name) : generator(std::move(generator)), visitor(std::move(visitor)), task_group(std::move(task_group)) {} std::function()> generator; std::function visitor; std::unique_ptr task_group; + std::string_view name; }; struct SubmitTask : public Task { explicit SubmitTask(std::unique_ptr state_holder) @@ -395,13 +438,16 @@ bool AsyncTaskScheduler::AddAsyncGenerator(std::function()> generator, ARROW_RETURN_NOT_OK(state_holder->visitor(item)); } } + + std::string_view name() const { return state_holder->name; } + std::unique_ptr state_holder; }; std::unique_ptr task_group = AsyncTaskGroup::Make(this, [] { return Status::OK(); }); AsyncTaskGroup* task_group_view = task_group.get(); std::unique_ptr state_holder = std::make_unique( - std::move(generator), std::move(visitor), std::move(task_group)); + std::move(generator), std::move(visitor), std::move(task_group), name); task_group_view->AddTask(std::make_unique(std::move(state_holder))); return true; } diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index 4fb17e4ea07..119ca7aa42c 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -39,6 +39,8 @@ namespace arrow { namespace util { +constexpr std::string_view kDummyName = "unit test"; + TEST(AsyncTaskScheduler, ShouldScheduleConcurrentTasks) { // A basic test to make sure we schedule the right number of concurrent tasks constexpr int kMaxConcurrentTasks = 2; @@ -51,10 +53,12 @@ TEST(AsyncTaskScheduler, ShouldScheduleConcurrentTasks) { for (int i = 0; i < kTotalNumTasks; i++) { futures[i] = Future<>::Make(); submitted[i] = false; - throttled->AddSimpleTask([&, i] { - submitted[i] = true; - return futures[i]; - }); + throttled->AddSimpleTask( + [&, i] { + submitted[i] = true; + return futures[i]; + }, + kDummyName); } return Status::OK(); }); @@ -81,7 +85,7 @@ TEST(AsyncTaskScheduler, CancelWaitsForTasksToFinish) { Future<> task = Future<>::Make(); Future<> finished = AsyncTaskScheduler::Make( [&](AsyncTaskScheduler* scheduler) { - scheduler->AddSimpleTask([&] { return task; }); + scheduler->AddSimpleTask([&] { return task; }, kDummyName); return Status::OK(); }, /*abort_callback=*/[](const Status&) {}, stop_source.token()); @@ -101,11 +105,13 @@ TEST(AsyncTaskScheduler, CancelPurgesQueuedTasks) { [&](AsyncTaskScheduler* scheduler) { std::shared_ptr throttled = ThrottledAsyncTaskScheduler::Make(scheduler, 1); - throttled->AddSimpleTask([&] { return task; }); - 
throttled->AddSimpleTask([&] { - second_task_submitted = true; - return Future<>::MakeFinished(); - }); + throttled->AddSimpleTask([&] { return task; }, kDummyName); + throttled->AddSimpleTask( + [&] { + second_task_submitted = true; + return Future<>::MakeFinished(); + }, + kDummyName); return Status::OK(); }, /*abort_callback=*/[](const Status&) {}, stop_source.token()); @@ -121,12 +127,14 @@ TEST(AsyncTaskScheduler, CancelPreventsAdditionalTasks) { bool second_task_submitted = false; Future<> finished = AsyncTaskScheduler::Make( [&](AsyncTaskScheduler* scheduler) { - scheduler->AddSimpleTask([&] { return task; }); + scheduler->AddSimpleTask([&] { return task; }, kDummyName); stop_source.RequestStop(); - scheduler->AddSimpleTask([&] { - second_task_submitted = true; - return task; - }); + scheduler->AddSimpleTask( + [&] { + second_task_submitted = true; + return task; + }, + kDummyName); return Status::OK(); }, /*abort_callback=*/[](const Status&) {}, stop_source.token()); @@ -141,8 +149,8 @@ TEST(AsyncTaskScheduler, AbortCallback) { Future<> task = Future<>::Make(); Future<> finished = AsyncTaskScheduler::Make( [&](AsyncTaskScheduler* scheduler) { - scheduler->AddSimpleTask([&] { return task; }); - scheduler->AddSimpleTask([] { return Status::Invalid("XYZ"); }); + scheduler->AddSimpleTask([&] { return task; }, kDummyName); + scheduler->AddSimpleTask([] { return Status::Invalid("XYZ"); }, kDummyName); return Status::OK(); }, [&](const Status& st) { @@ -160,7 +168,8 @@ TEST(AsyncTaskScheduler, TaskStaysAliveUntilFinished) { MyTask(bool* my_task_destroyed_ptr, Future<> task_fut) : my_task_destroyed_ptr(my_task_destroyed_ptr), task_fut(std::move(task_fut)) {} ~MyTask() { *my_task_destroyed_ptr = true; } - Result> operator()() { return task_fut; } + Result> operator()() override { return task_fut; } + std::string_view name() const override { return kDummyName; } bool* my_task_destroyed_ptr; Future<> task_fut; }; @@ -183,7 +192,7 @@ TEST(AsyncTaskScheduler, InitialTaskAddsNothing) { TEST(AsyncTaskScheduler, InitialTaskFails) { Future<> task = Future<>::Make(); Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { - EXPECT_TRUE(scheduler->AddSimpleTask([&]() { return task; })); + EXPECT_TRUE(scheduler->AddSimpleTask([&]() { return task; }, kDummyName)); return Status::Invalid("XYZ"); }); AssertNotFinished(finished); @@ -203,7 +212,7 @@ TEST(AsyncTaskScheduler, TaskGroup) { finish_callback_ran = true; return Status::OK(); }); - EXPECT_TRUE(task_group->AddSimpleTask([&]() { return task; })); + EXPECT_TRUE(task_group->AddSimpleTask([&]() { return task; }, kDummyName)); return Status::OK(); }); ASSERT_FALSE(finish_callback_ran); @@ -221,7 +230,7 @@ TEST(AsyncTaskScheduler, TaskGroupLifetime) { finish_callback_ran = true; return Status::OK(); }); - EXPECT_TRUE(task_group->AddSimpleTask([&]() { return task; })); + EXPECT_TRUE(task_group->AddSimpleTask([&]() { return task; }, kDummyName)); // Last task in group is finished but we still have a reference to the group (and // could still add tasks) so the finish callback does not run task.MarkFinished(); @@ -252,7 +261,7 @@ TEST(AsyncTaskScheduler, TaskGroupFinishCallbackFails) { Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { std::unique_ptr task_group = AsyncTaskGroup::Make(scheduler, [&] { return Status::Invalid("XYZ"); }); - EXPECT_TRUE(task_group->AddSimpleTask([&]() { return task; })); + EXPECT_TRUE(task_group->AddSimpleTask([&]() { return task; }, kDummyName)); // Last task in group is 
finished but we still have a reference to the group (and // could still add tasks) so the finish callback does not run return Status::OK(); @@ -268,9 +277,10 @@ TEST(AsyncTaskScheduler, FailingTaskStress) { constexpr int kNumTasks = 256; for (int i = 0; i < kNumTasks; i++) { Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { - EXPECT_TRUE(scheduler->AddSimpleTask([] { return SleepABitAsync(); })); + EXPECT_TRUE(scheduler->AddSimpleTask([] { return SleepABitAsync(); }, kDummyName)); EXPECT_TRUE(scheduler->AddSimpleTask( - [] { return SleepABitAsync().Then([]() { return Status::Invalid("XYZ"); }); })); + [] { return SleepABitAsync().Then([]() { return Status::Invalid("XYZ"); }); }, + kDummyName)); return Status::OK(); }); ASSERT_FINISHES_AND_RAISES(Invalid, finished); @@ -279,9 +289,10 @@ TEST(AsyncTaskScheduler, FailingTaskStress) { Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { std::unique_ptr task_group = AsyncTaskGroup::Make(scheduler, [] { return Status::OK(); }); - EXPECT_TRUE(task_group->AddSimpleTask([] { return SleepABitAsync(); })); + EXPECT_TRUE(task_group->AddSimpleTask([] { return SleepABitAsync(); }, kDummyName)); EXPECT_TRUE(task_group->AddSimpleTask( - [] { return SleepABitAsync().Then([]() { return Status::Invalid("XYZ"); }); })); + [] { return SleepABitAsync().Then([]() { return Status::Invalid("XYZ"); }); }, + kDummyName)); return Status::OK(); }); ASSERT_FINISHES_AND_RAISES(Invalid, finished); @@ -303,7 +314,7 @@ TEST(AsyncTaskScheduler, AsyncGenerator) { seen_values.push_back(val); return Status::OK(); }; - scheduler->AddAsyncGenerator(std::move(generator), std::move(visitor)); + scheduler->AddAsyncGenerator(std::move(generator), std::move(visitor), kDummyName); return Status::OK(); }); ASSERT_FINISHES_OK(finished); @@ -337,11 +348,13 @@ TEST(AsyncTaskScheduler, Throttle) { Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { std::shared_ptr throttled = ThrottledAsyncTaskScheduler::Make(scheduler, 1); - EXPECT_TRUE(throttled->AddSimpleTask([slow_task] { return slow_task; })); - EXPECT_TRUE(throttled->AddSimpleTask([&was_run] { - was_run = true; - return Future<>::MakeFinished(); - })); + EXPECT_TRUE(throttled->AddSimpleTask([slow_task] { return slow_task; }, kDummyName)); + EXPECT_TRUE(throttled->AddSimpleTask( + [&was_run] { + was_run = true; + return Future<>::MakeFinished(); + }, + kDummyName)); EXPECT_FALSE(was_run); return Status::OK(); }); @@ -357,10 +370,12 @@ TEST(AsyncTaskScheduler, Throttle) { std::shared_ptr throttled = ThrottledAsyncTaskScheduler::MakeWithCustomThrottle(scheduler, std::move(custom_throttle)); - EXPECT_TRUE(throttled->AddSimpleTask([&was_run] { - was_run = true; - return Future<>::MakeFinished(); - })); + EXPECT_TRUE(throttled->AddSimpleTask( + [&was_run] { + was_run = true; + return Future<>::MakeFinished(); + }, + kDummyName)); EXPECT_FALSE(was_run); custom_throttle_view->Unlock(); return Status::OK(); @@ -387,13 +402,14 @@ TEST(AsyncTaskScheduler, TaskWithCostBiggerThanThrottle) { return task; } int cost() const override { return kThrottleCapacity * 50; } + std::string_view name() const override { return kDummyName; } bool* task_submitted; Future<> task; }; Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { std::shared_ptr throttled = ThrottledAsyncTaskScheduler::Make(scheduler, kThrottleCapacity); - EXPECT_TRUE(throttled->AddSimpleTask([&] { return blocking_task; })); + EXPECT_TRUE(throttled->AddSimpleTask([&] { 
return blocking_task; }, kDummyName)); EXPECT_TRUE( throttled->AddTask(std::make_unique(&task_submitted, task))); return Status::OK(); @@ -413,9 +429,9 @@ TEST(AsyncTaskScheduler, TaskFinishesAfterError) { /// If a task fails it shouldn't impact previously submitted tasks Future<> fut1 = Future<>::Make(); Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { - EXPECT_TRUE(scheduler->AddSimpleTask([fut1] { return fut1; })); + EXPECT_TRUE(scheduler->AddSimpleTask([fut1] { return fut1; }, kDummyName)); EXPECT_TRUE(scheduler->AddSimpleTask( - [] { return Future<>::MakeFinished(Status::Invalid("XYZ")); })); + [] { return Future<>::MakeFinished(Status::Invalid("XYZ")); }, kDummyName)); return Status::OK(); }); AssertNotFinished(finished); @@ -429,11 +445,11 @@ TEST(AsyncTaskScheduler, FailAfterAdd) { Future<> will_fail = Future<>::Make(); Future<> added_later_and_passes = Future<>::Make(); Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { - EXPECT_TRUE(scheduler->AddSimpleTask([will_fail] { return will_fail; })); + EXPECT_TRUE(scheduler->AddSimpleTask([will_fail] { return will_fail; }, kDummyName)); EXPECT_TRUE(scheduler->AddSimpleTask( - [added_later_and_passes] { return added_later_and_passes; })); + [added_later_and_passes] { return added_later_and_passes; }, kDummyName)); will_fail.MarkFinished(Status::Invalid("XYZ")); - EXPECT_FALSE(scheduler->AddSimpleTask([] { return Future<>::Make(); })); + EXPECT_FALSE(scheduler->AddSimpleTask([] { return Future<>::Make(); }, kDummyName)); return Status::OK(); }); AssertNotFinished(finished); @@ -448,11 +464,13 @@ TEST(AsyncTaskScheduler, PurgeUnsubmitted) { Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { std::shared_ptr throttled = ThrottledAsyncTaskScheduler::Make(scheduler, 1); - EXPECT_TRUE(throttled->AddSimpleTask([will_fail] { return will_fail; })); - EXPECT_TRUE(throttled->AddSimpleTask([&was_submitted] { - was_submitted = true; - return Future<>::MakeFinished(); - })); + EXPECT_TRUE(throttled->AddSimpleTask([will_fail] { return will_fail; }, kDummyName)); + EXPECT_TRUE(throttled->AddSimpleTask( + [&was_submitted] { + was_submitted = true; + return Future<>::MakeFinished(); + }, + kDummyName)); will_fail.MarkFinished(Status::Invalid("XYZ")); return Status::OK(); }); @@ -466,13 +484,15 @@ TEST(AsyncTaskScheduler, PurgeUnsubmitted) { finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { std::shared_ptr throttled = ThrottledAsyncTaskScheduler::Make(scheduler, 2); - EXPECT_TRUE(throttled->AddSimpleTask([will_fail] { return will_fail; })); + EXPECT_TRUE(throttled->AddSimpleTask([will_fail] { return will_fail; }, kDummyName)); + EXPECT_TRUE(throttled->AddSimpleTask( + [slow_task_that_passes] { return slow_task_that_passes; }, kDummyName)); EXPECT_TRUE(throttled->AddSimpleTask( - [slow_task_that_passes] { return slow_task_that_passes; })); - EXPECT_TRUE(throttled->AddSimpleTask([&was_submitted] { - was_submitted = true; - return Future<>::MakeFinished(); - })); + [&was_submitted] { + was_submitted = true; + return Future<>::MakeFinished(); + }, + kDummyName)); return Status::OK(); }); will_fail.MarkFinished(Status::Invalid("XYZ")); @@ -492,16 +512,20 @@ TEST(AsyncTaskScheduler, FifoStress) { Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) { std::shared_ptr throttled = ThrottledAsyncTaskScheduler::Make(scheduler, 1); - throttled->AddSimpleTask([] { return SleepABitAsync(); }); - throttled->AddSimpleTask([&] { - 
middle_task_run = true; - return Future<>::MakeFinished(); - }); + throttled->AddSimpleTask([] { return SleepABitAsync(); }, kDummyName); + throttled->AddSimpleTask( + [&] { + middle_task_run = true; + return Future<>::MakeFinished(); + }, + kDummyName); SleepABit(); - throttled->AddSimpleTask([&] { - EXPECT_TRUE(middle_task_run); - return Future<>::MakeFinished(); - }); + throttled->AddSimpleTask( + [&] { + EXPECT_TRUE(middle_task_run); + return Future<>::MakeFinished(); + }, + kDummyName); return Status::OK(); }); ASSERT_FINISHES_OK(finished); @@ -518,14 +542,16 @@ TEST(AsyncTaskScheduler, MaxConcurrentTasksStress) { std::shared_ptr throttled = ThrottledAsyncTaskScheduler::Make(scheduler, kNumConcurrentTasks); for (int task_idx = 0; task_idx < kNumTasks; task_idx++) { - throttled->AddSimpleTask([&num_tasks_running, kNumConcurrentTasks] { - if (num_tasks_running.fetch_add(1) > kNumConcurrentTasks) { - ADD_FAILURE() << "More than " << kNumConcurrentTasks - << " tasks were allowed to run concurrently"; - } - return SleepABitAsync().Then( - [&num_tasks_running] { num_tasks_running.fetch_sub(1); }); - }); + throttled->AddSimpleTask( + [&num_tasks_running, kNumConcurrentTasks] { + if (num_tasks_running.fetch_add(1) > kNumConcurrentTasks) { + ADD_FAILURE() << "More than " << kNumConcurrentTasks + << " tasks were allowed to run concurrently"; + } + return SleepABitAsync().Then( + [&num_tasks_running] { num_tasks_running.fetch_sub(1); }); + }, + kDummyName); } return Status::OK(); }); @@ -555,13 +581,13 @@ TEST(AsyncTaskScheduler, ScanningStress) { std::unique_ptr task_group = AsyncTaskGroup::Make(throttled.get(), [] { return Status::OK(); }); for (int i = 0; i < kBatchesPerFragment; i++) { - EXPECT_TRUE(task_group->AddSimpleTask(submit_scan)); + EXPECT_TRUE(task_group->AddSimpleTask(submit_scan, kDummyName)); } return Status::OK(); }; auto submit_list_fragment = [&]() { return SleepABitAsync().Then(list_fragment); }; for (int frag_idx = 0; frag_idx < kNumFragments; frag_idx++) { - EXPECT_TRUE(scheduler->AddSimpleTask(submit_list_fragment)); + EXPECT_TRUE(scheduler->AddSimpleTask(submit_list_fragment, kDummyName)); } return Status::OK(); }); @@ -576,6 +602,7 @@ class TaskWithPriority : public AsyncTaskScheduler::Task { TaskWithPriority(std::function>()> task, int priority) : task(std::move(task)), priority(priority) {} Result> operator()() override { return task(); } + std::string_view name() const override { return kDummyName; } std::function>()> task; int priority; diff --git a/cpp/src/arrow/util/tracing.cc b/cpp/src/arrow/util/tracing.cc index 391af3d72de..f4c18f1236e 100644 --- a/cpp/src/arrow/util/tracing.cc +++ b/cpp/src/arrow/util/tracing.cc @@ -33,11 +33,20 @@ Span::Span() noexcept { details = std::make_unique<::arrow::internal::tracing::SpanImpl>(); } +bool Span::valid() const { + return static_cast<::arrow::internal::tracing::SpanImpl*>(details.get())->valid(); +} + +void Span::reset() { details.reset(); } + #else Span::Span() noexcept { /* details is left a nullptr */ } +bool Span::valid() const { return false; } +void Span::reset() {} + #endif } // namespace tracing diff --git a/cpp/src/arrow/util/tracing.h b/cpp/src/arrow/util/tracing.h index c6968219b6e..d7808256418 100644 --- a/cpp/src/arrow/util/tracing.h +++ b/cpp/src/arrow/util/tracing.h @@ -33,6 +33,10 @@ class ARROW_EXPORT SpanDetails { class ARROW_EXPORT Span { public: Span() noexcept; + /// True if this span has been started with START_SPAN + bool valid() const; + /// End the span early + void reset(); std::unique_ptr 
details; }; diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 7b97ebf7adb..a031edf08dc 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -34,6 +34,7 @@ #endif #endif +#include "arrow/memory_pool.h" #include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" #include "arrow/util/tracing.h" @@ -108,9 +109,19 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) class SpanImpl : public ::arrow::util::tracing::SpanDetails { public: ~SpanImpl() override = default; + bool valid() const { return ot_span != nullptr; } opentelemetry::nostd::shared_ptr ot_span; }; +struct Scope { + Scope() + : scope_impl(opentelemetry::trace::Scope( + opentelemetry::nostd::shared_ptr(nullptr))) {} + explicit Scope(opentelemetry::trace::Scope ot_scope) + : scope_impl(std::move(ot_scope)) {} + opentelemetry::trace::Scope scope_impl; +}; + opentelemetry::nostd::shared_ptr& UnwrapSpan( ::arrow::util::tracing::SpanDetails* span); @@ -131,15 +142,32 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( target_span.details.get(), \ ::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__))) -#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) \ - auto opentelemetry_scope##__LINE__ = \ - ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ - ::arrow::internal::tracing::RewrapSpan( \ - target_span.details.get(), \ - \ - ::arrow::internal::tracing::GetTracer()->StartSpan( \ - __VA_ARGS__, \ - ::arrow::internal::tracing::SpanOptionsWithParent(parent_span)))) +#define START_SCOPED_SPAN(target_span, ...) \ + ::arrow::internal::tracing::Scope( \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + ::arrow::internal::tracing::RewrapSpan( \ + target_span.details.get(), \ + ::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__)))) + +#define START_SCOPED_SPAN_SV(target_span, name, ...) \ + ::arrow::internal::tracing::Scope( \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + ::arrow::internal::tracing::RewrapSpan( \ + target_span.details.get(), \ + ::arrow::internal::tracing::GetTracer()->StartSpan( \ + ::opentelemetry::nostd::string_view(name.data(), name.size()), \ + ##__VA_ARGS__)))) + +#define START_SCOPED_SPAN_WITH_PARENT_SV(target_span, parent_span, name, ...) \ + ::arrow::internal::tracing::Scope( \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + ::arrow::internal::tracing::RewrapSpan( \ + target_span.details.get(), \ + \ + ::arrow::internal::tracing::GetTracer()->StartSpan( \ + ::opentelemetry::nostd::string_view(name.data(), name.size()), \ + __VA_ARGS__, \ + ::arrow::internal::tracing::SpanOptionsWithParent(parent_span))))) #define START_COMPUTE_SPAN(target_span, ...) \ START_SPAN(target_span, __VA_ARGS__); \ @@ -147,15 +175,17 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( ->SetAttribute("arrow.memory_pool_bytes", \ ::arrow::default_memory_pool()->bytes_allocated()) -#define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...) \ - START_SPAN_WITH_PARENT(target_span, parent_span, __VA_ARGS__); \ - ::arrow::internal::tracing::UnwrapSpan(target_span.details.get()) \ - ->SetAttribute("arrow.memory_pool_bytes", \ - ::arrow::default_memory_pool()->bytes_allocated()) +#define EVENT_ON_CURRENT_SPAN(...) \ + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()->AddEvent(__VA_ARGS__) #define EVENT(target_span, ...) 
\ ::arrow::internal::tracing::UnwrapSpan(target_span.details.get())->AddEvent(__VA_ARGS__) +#define ACTIVATE_SPAN(target_span) \ + ::arrow::internal::tracing::Scope( \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + ::arrow::internal::tracing::UnwrapSpan(target_span.details.get()))) + #define MARK_SPAN(target_span, status) \ ::arrow::internal::tracing::MarkSpan( \ status, ::arrow::internal::tracing::UnwrapSpan(target_span.details.get()).get()) @@ -178,10 +208,10 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name, true) /* - * Calls to the helper macros above are removed by the preprocessor when building - * without opentelemetry, because of the empty definitions below. - * Without them, every call to a helper function would need to be surrounded with - * #ifdef ARROW_WITH_OPENTELEMETRY + * Calls to the helper macros above are removed by the preprocessor when + * building without opentelemetry, because of the empty definitions + * below. Without them, every call to a helper function would need to be + * surrounded with #ifdef ARROW_WITH_OPENTELEMETRY * ... * #endif */ @@ -189,13 +219,18 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( #else // !ARROW_WITH_OPENTELEMETRY class SpanImpl {}; +struct Scope { + [[maybe_unused]] ~Scope() {} +}; #define START_SPAN(target_span, ...) -#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) +#define START_SCOPED_SPAN(target_span, ...) ::arrow::internal::tracing::Scope() +#define START_SCOPED_SPAN_SV(target_span, name, ...) ::arrow::internal::tracing::Scope() #define START_COMPUTE_SPAN(target_span, ...) -#define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...) +#define ACTIVATE_SPAN(target_span) ::arrow::internal::tracing::Scope() #define MARK_SPAN(target_span, status) #define EVENT(target_span, ...) +#define EVENT_ON_CURRENT_SPAN(...) #define END_SPAN(target_span) #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future) #define PROPAGATE_SPAN_TO_GENERATOR(generator) diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 2611944cf26..a132b53e3f3 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -63,8 +63,8 @@ ARROW_MIMALLOC_BUILD_VERSION=v2.0.6 ARROW_MIMALLOC_BUILD_SHA256_CHECKSUM=9f05c94cc2b017ed13698834ac2a3567b6339a8bde27640df5a1581d49d05ce5 ARROW_NLOHMANN_JSON_BUILD_VERSION=v3.10.5 ARROW_NLOHMANN_JSON_BUILD_SHA256_CHECKSUM=5daca6ca216495edf89d167f808d1d03c4a4d929cef7da5e10f135ae1540c7e4 -ARROW_OPENTELEMETRY_BUILD_VERSION=v1.4.1 -ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=301b1ab74a664723560f46c29f228360aff1e2d63e930b963755ea077ae67524 +ARROW_OPENTELEMETRY_BUILD_VERSION=v1.8.1 +ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=3d640201594b07f08dade9cd1017bd0b59674daca26223b560b9bb6bf56264c2 ARROW_OPENTELEMETRY_PROTO_BUILD_VERSION=v0.17.0 ARROW_OPENTELEMETRY_PROTO_BUILD_SHA256_CHECKSUM=f269fbcb30e17b03caa1decd231ce826e59d7651c0f71c3b28eb5140b4bb5412 ARROW_ORC_BUILD_VERSION=1.8.1
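To round out the tracing changes above, here is a hedged sketch (an editorial addition, not part of the patch) of how the new scoped-span helpers are meant to be combined: a Span member lives as long as the operation, and the Scope returned by START_SCOPED_SPAN_SV keeps it active on the current thread until the scope ends. The example::TracedOperation type and the span/event names are hypothetical; when ARROW_WITH_OPENTELEMETRY is not defined, the macros expand to no-ops.

#include <string_view>

#include "arrow/status.h"
#include "arrow/util/tracing.h"
#include "arrow/util/tracing_internal.h"  // START_SCOPED_SPAN_SV, EVENT_ON_CURRENT_SPAN

namespace example {

using namespace std::string_view_literals;

// Hypothetical operation that owns a span for its whole lifetime.
struct TracedOperation {
  arrow::util::tracing::Span span;

  arrow::Status Run() {
    // Start the span and make it the active span for the duration of Run();
    // the Scope returned by the macro deactivates it on destruction.
    auto scope = START_SCOPED_SPAN_SV(span, "example::TracedOperation::Run"sv);
    EVENT_ON_CURRENT_SPAN("work started");
    // ... do the actual work here ...
    return arrow::Status::OK();
  }
};

}  // namespace example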