diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 5736c557bd0..1181546b2e1 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -220,6 +220,7 @@ set(ARROW_SRCS util/tdigest.cc util/thread_pool.cc util/time.cc + util/tracing.cc util/trie.cc util/unreachable.cc util/uri.cc @@ -389,6 +390,7 @@ if(ARROW_COMPUTE) compute/exec/key_encode.cc compute/exec/key_hash.cc compute/exec/key_map.cc + compute/exec/options.cc compute/exec/order_by_impl.cc compute/exec/project_node.cc compute/exec/sink_node.cc diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index 59b2ff8b8af..8e8e0ceebb0 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -32,6 +32,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -165,7 +166,18 @@ class ScalarAggregateNode : public ExecNode { const char* kind_name() const override { return "ScalarAggregateNode"; } Status DoConsume(const ExecBatch& batch, size_t thread_index) { + util::tracing::Span span; + START_SPAN(span, "Consume", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); for (size_t i = 0; i < kernels_.size(); ++i) { + util::tracing::Span span; + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}}); KernelContext batch_ctx{plan()->exec_context()}; batch_ctx.SetState(states_[i][thread_index].get()); @@ -176,6 +188,12 @@ class ScalarAggregateNode : public ExecNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto thread_index = get_thread_index_(); @@ -188,28 +206,34 @@ class ScalarAggregateNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { + EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); outputs_[0]->ErrorReceived(this, std::move(error)); } void InputFinished(ExecNode* input, int total_batches) override { + EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); DCHECK_EQ(input, inputs_[0]); - if (input_counter_.SetTotal(total_batches)) { ErrorIfNotOk(Finish()); } } Status StartProducing() override { + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); // Scalar aggregates will only output a single batch outputs_[0]->InputFinished(this, 1); return Status::OK(); } - void PauseProducing(ExecNode* output) override {} + void PauseProducing(ExecNode* output) override { EVENT(span_, "PauseProducing"); } - void ResumeProducing(ExecNode* output) override {} + void ResumeProducing(ExecNode* output) override { EVENT(span_, "ResumeProducing"); } void StopProducing(ExecNode* output) override { DCHECK_EQ(output, outputs_[0]); @@ -217,6 +241,7 @@ class ScalarAggregateNode : public ExecNode { } void 
StopProducing() override { + EVENT(span_, "StopProducing"); if (input_counter_.Cancel()) { finished_.MarkFinished(); } @@ -235,10 +260,18 @@ class ScalarAggregateNode : public ExecNode { private: Status Finish() { + util::tracing::Span span; + START_SPAN(span, "Finish", {{"aggregate", ToStringExtra()}, {"node.label", label()}}); ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); for (size_t i = 0; i < kernels_.size(); ++i) { + util::tracing::Span span; + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}}); KernelContext ctx{plan()->exec_context()}; ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll( kernels_[i], &ctx, std::move(states_[i]))); @@ -250,7 +283,6 @@ class ScalarAggregateNode : public ExecNode { return Status::OK(); } - Future<> finished_ = Future<>::MakeFinished(); const std::vector target_field_ids_; const std::vector aggs_; const std::vector kernels_; @@ -358,6 +390,11 @@ class GroupByNode : public ExecNode { const char* kind_name() const override { return "GroupByNode"; } Status Consume(ExecBatch batch) { + util::tracing::Span span; + START_SPAN(span, "Consume", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); size_t thread_index = get_thread_index_(); if (thread_index >= local_states_.size()) { return Status::IndexError("thread index ", thread_index, " is out of range [0, ", @@ -379,6 +416,12 @@ class GroupByNode : public ExecNode { // Execute aggregate kernels for (size_t i = 0; i < agg_kernels_.size(); ++i) { + util::tracing::Span span; + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}}); KernelContext kernel_ctx{ctx_}; kernel_ctx.SetState(state->agg_states[i].get()); @@ -394,6 +437,8 @@ class GroupByNode : public ExecNode { } Status Merge() { + util::tracing::Span span; + START_SPAN(span, "Merge", {{"group_by", ToStringExtra()}, {"node.label", label()}}); ThreadLocalState* state0 = &local_states_[0]; for (size_t i = 1; i < local_states_.size(); ++i) { ThreadLocalState* state = &local_states_[i]; @@ -406,6 +451,12 @@ class GroupByNode : public ExecNode { state->grouper.reset(); for (size_t i = 0; i < agg_kernels_.size(); ++i) { + util::tracing::Span span; + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Merge"}}); KernelContext batch_ctx{ctx_}; DCHECK(state0->agg_states[i]); batch_ctx.SetState(state0->agg_states[i].get()); @@ -420,6 +471,10 @@ class GroupByNode : public ExecNode { } Result Finalize() { + util::tracing::Span span; + START_SPAN(span, "Finalize", + {{"group_by", ToStringExtra()}, {"node.label", label()}}); + ThreadLocalState* state = &local_states_[0]; // If we never got any batches, then state won't have been initialized RETURN_NOT_OK(InitLocalStateIfNeeded(state)); @@ -429,6 +484,12 @@ class GroupByNode : public ExecNode { // Aggregate fields come before key fields to match the behavior of GroupBy function for (size_t i = 0; i < agg_kernels_.size(); ++i) { + util::tracing::Span span; + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}}); KernelContext batch_ctx{ctx_}; batch_ctx.SetState(state->agg_states[i].get()); RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i])); @@ -484,6 +545,13 @@ class GroupByNode : public ExecNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); + // bail if StopProducing was called if (finished_.is_finished()) return; @@ -497,12 +565,16 @@ class GroupByNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { + EVENT(span_, "ErrorReceived", {{"error", error.message()}}); + DCHECK_EQ(input, inputs_[0]); outputs_[0]->ErrorReceived(this, std::move(error)); } void InputFinished(ExecNode* input, int total_batches) override { + EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); + // bail if StopProducing was called if (finished_.is_finished()) return; @@ -514,17 +586,23 @@ class GroupByNode : public ExecNode { } Status StartProducing() override { + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); local_states_.resize(ThreadIndexer::Capacity()); return Status::OK(); } - void PauseProducing(ExecNode* output) override {} + void PauseProducing(ExecNode* output) override { EVENT(span_, "PauseProducing"); } - void ResumeProducing(ExecNode* output) override {} + void ResumeProducing(ExecNode* output) override { EVENT(span_, "ResumeProducing"); } void StopProducing(ExecNode* output) override { + EVENT(span_, "StopProducing"); DCHECK_EQ(output, outputs_[0]); 
ARROW_UNUSED(input_counter_.Cancel()); @@ -603,7 +681,6 @@ class GroupByNode : public ExecNode { } ExecContext* ctx_; - Future<> finished_ = Future<>::MakeFinished(); const std::vector key_field_ids_; const std::vector agg_src_field_ids_; diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 7cd3011b8ab..17d71a2cf04 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -33,6 +33,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -43,7 +44,9 @@ namespace compute { namespace { struct ExecPlanImpl : public ExecPlan { - explicit ExecPlanImpl(ExecContext* exec_context) : ExecPlan(exec_context) {} + explicit ExecPlanImpl(ExecContext* exec_context, + std::shared_ptr metadata = NULLPTR) + : ExecPlan(exec_context), metadata_(std::move(metadata)) {} ~ExecPlanImpl() override { if (started_ && !finished_.is_finished()) { @@ -78,6 +81,16 @@ struct ExecPlanImpl : public ExecPlan { } Status StartProducing() { + START_SPAN(span_, "ExecPlan", {{"plan", ToString()}}); +#ifdef ARROW_WITH_OPENTELEMETRY + if (HasMetadata()) { + auto pairs = metadata().get()->sorted_pairs(); + std::for_each(std::begin(pairs), std::end(pairs), + [this](std::pair const& pair) { + span_.Get().span->SetAttribute(pair.first, pair.second); + }); + } +#endif if (started_) { return Status::Invalid("restarted ExecPlan"); } @@ -94,7 +107,10 @@ struct ExecPlanImpl : public ExecPlan { for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != end; ++it) { auto node = *it; + EVENT(span_, "StartProducing:" + node->label(), + {{"node.label", node->label()}, {"node.kind_name", node->kind_name()}}); st = node->StartProducing(); + EVENT(span_, "StartProducing:" + node->label(), {{"status", st.ToString()}}); if (!st.ok()) { // Stop nodes that successfully started, in reverse order stopped_ = true; @@ 
-106,11 +122,13 @@ struct ExecPlanImpl : public ExecPlan { } finished_ = AllFinished(futures); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return st; } void StopProducing() { DCHECK(started_) << "stopped an ExecPlan which never started"; + EVENT(span_, "StopProducing"); stopped_ = true; StopProducingImpl(sorted_nodes_.begin(), sorted_nodes_.end()); @@ -120,6 +138,8 @@ struct ExecPlanImpl : public ExecPlan { void StopProducingImpl(It begin, It end) { for (auto it = begin; it != end; ++it) { auto node = *it; + EVENT(span_, "StopProducing:" + node->label(), + {{"node.label", node->label()}, {"node.kind_name", node->kind_name()}}); node->StopProducing(); } } @@ -172,6 +192,8 @@ struct ExecPlanImpl : public ExecPlan { NodeVector sources_, sinks_; NodeVector sorted_nodes_; uint32_t auto_label_counter_ = 0; + util::tracing::Span span_; + std::shared_ptr metadata_; }; ExecPlanImpl* ToDerived(ExecPlan* ptr) { return checked_cast(ptr); } @@ -190,8 +212,9 @@ util::optional GetNodeIndex(const std::vector& nodes, } // namespace -Result> ExecPlan::Make(ExecContext* ctx) { - return std::shared_ptr(new ExecPlanImpl{ctx}); +Result> ExecPlan::Make( + ExecContext* ctx, std::shared_ptr metadata) { + return std::shared_ptr(new ExecPlanImpl{ctx, std::move(metadata)}); } ExecNode* ExecPlan::AddNode(std::unique_ptr node) { @@ -212,6 +235,12 @@ void ExecPlan::StopProducing() { ToDerived(this)->StopProducing(); } Future<> ExecPlan::finished() { return ToDerived(this)->finished_; } +bool ExecPlan::HasMetadata() const { return !!(ToDerived(this)->metadata_); } + +std::shared_ptr ExecPlan::metadata() const { + return ToDerived(this)->metadata_; +} + std::string ExecPlan::ToString() const { return ToDerived(this)->ToString(); } ExecNode::ExecNode(ExecPlan* plan, NodeVector inputs, @@ -316,22 +345,31 @@ MapNode::MapNode(ExecPlan* plan, std::vector inputs, void MapNode::ErrorReceived(ExecNode* input, Status error) { DCHECK_EQ(input, inputs_[0]); + EVENT(span_, "ErrorReceived", 
{{"error.message", error.message()}}); outputs_[0]->ErrorReceived(this, std::move(error)); } void MapNode::InputFinished(ExecNode* input, int total_batches) { DCHECK_EQ(input, inputs_[0]); + EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); outputs_[0]->InputFinished(this, total_batches); if (input_counter_.SetTotal(total_batches)) { this->Finish(); } } -Status MapNode::StartProducing() { return Status::OK(); } +Status MapNode::StartProducing() { + START_SPAN( + span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}}); + finished_ = Future<>::Make(); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); + return Status::OK(); +} -void MapNode::PauseProducing(ExecNode* output) {} +void MapNode::PauseProducing(ExecNode* output) { EVENT(span_, "PauseProducing"); } -void MapNode::ResumeProducing(ExecNode* output) {} +void MapNode::ResumeProducing(ExecNode* output) { EVENT(span_, "ResumeProducing"); } void MapNode::StopProducing(ExecNode* output) { DCHECK_EQ(output, outputs_[0]); @@ -339,6 +377,7 @@ void MapNode::StopProducing(ExecNode* output) { } void MapNode::StopProducing() { + EVENT(span_, "StopProducing"); if (executor_) { this->stop_source_.RequestStop(); } diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 4cb7fad009f..ac5996ed72e 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -28,8 +28,10 @@ #include "arrow/type_fwd.h" #include "arrow/util/async_util.h" #include "arrow/util/cancel.h" +#include "arrow/util/key_value_metadata.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" +#include "arrow/util/tracing.h" #include "arrow/util/visibility.h" namespace arrow { @@ -45,7 +47,9 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { ExecContext* exec_context() const { return exec_context_; } /// Make an empty exec plan - static Result> 
Make(ExecContext* = default_exec_context()); + static Result> Make( + ExecContext* = default_exec_context(), + std::shared_ptr metadata = NULLPTR); ExecNode* AddNode(std::unique_ptr node); @@ -80,6 +84,12 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { /// \brief A future which will be marked finished when all nodes have stopped producing. Future<> finished(); + /// \brief Return whether the plan has metadata attached (non-null; may be empty) + bool HasMetadata() const; + + /// \brief Return the plan's attached metadata + std::shared_ptr metadata() const; + std::string ToString() const; protected: @@ -245,6 +255,11 @@ class ARROW_EXPORT ExecNode { std::shared_ptr output_schema_; int num_outputs_; NodeVector outputs_; + + // Future to sync finished + Future<> finished_ = Future<>::MakeFinished(); + + util::tracing::Span span_; }; /// \brief MapNode is an ExecNode type class which process a task like filter/project @@ -285,9 +300,6 @@ class MapNode : public ExecNode { // Counter for the number of batches received AtomicCounter input_counter_; - // Future to sync finished - Future<> finished_ = Future<>::Make(); - // The task group for the corresponding batches util::AsyncTaskGroup task_group_; diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 2e6d974dc13..c6de5280a4c 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -26,6 +26,8 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" + namespace arrow { using internal::checked_cast; @@ -68,6 +70,12 @@ class FilterNode : public MapNode { ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, SimplifyWithGuarantee(filter_, target.guarantee)); + util::tracing::Span span; + START_SPAN(span, "Filter", + {{"filter.expression", ToStringExtra()}, + {"filter.expression.simplified", simplified_filter.ToString()}, + {"filter.length", 
target.length}}); + ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(simplified_filter, target, plan()->exec_context())); @@ -76,7 +84,6 @@ class FilterNode : public MapNode { if (mask_scalar.is_valid && mask_scalar.value) { return target; } - return target.Slice(0, 0); } @@ -93,8 +100,19 @@ class FilterNode : public MapNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); - auto func = [this](ExecBatch batch) { return DoFilter(std::move(batch)); }; + auto func = [this](ExecBatch batch) { + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"filter", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); + auto result = DoFilter(std::move(batch)); + MARK_SPAN(span, result.status()); + END_SPAN(span); + return result; + }; this->SubmitTask(std::move(func), std::move(batch)); } diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 02f97dd6f86..a3adbb2d753 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -42,6 +42,8 @@ class HashJoinBasicImpl : public HashJoinImpl { if (cancelled_) { return Status::Cancelled("Hash join cancelled"); } + EVENT(span_, "InputReceived"); + if (QueueBatchIfNeeded(side, batch)) { return Status::OK(); } else { @@ -54,6 +56,7 @@ class HashJoinBasicImpl : public HashJoinImpl { if (cancelled_) { return Status::Cancelled("Hash join cancelled"); } + EVENT(span_, "InputFinished", {{"side", side}}); if (side == 0) { bool proceed; { @@ -86,6 +89,11 @@ class HashJoinBasicImpl : public HashJoinImpl { TaskScheduler::ScheduleImpl schedule_task_callback) override { num_threads = std::max(num_threads, static_cast(1)); + START_SPAN(span_, "HashJoinBasicImpl", + {{"detail", filter.ToString()}, + {"join.kind", ToString(join_type)}, + {"join.threads", static_cast(num_threads)}}); 
+ ctx_ = ctx; join_type_ = join_type; num_threads_ = num_threads; @@ -121,6 +129,8 @@ class HashJoinBasicImpl : public HashJoinImpl { } void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) override { + EVENT(span_, "Abort"); + END_SPAN(span_); cancelled_ = true; scheduler_->Abort(std::move(pos_abort_callback)); } @@ -774,6 +784,7 @@ class HashJoinBasicImpl : public HashJoinImpl { if (cancelled_) { return Status::Cancelled("Hash join cancelled"); } + END_SPAN(span_); finished_callback_(num_batches_produced_.load()); return Status::OK(); } diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h index d52d7d980f9..83ad4cb3f90 100644 --- a/cpp/src/arrow/compute/exec/hash_join.h +++ b/cpp/src/arrow/compute/exec/hash_join.h @@ -27,6 +27,7 @@ #include "arrow/result.h" #include "arrow/status.h" #include "arrow/type.h" +#include "arrow/util/tracing_internal.h" namespace arrow { namespace compute { @@ -112,6 +113,9 @@ class HashJoinImpl { virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) = 0; static Result> MakeBasic(); + + protected: + util::tracing::Span span_; }; } // namespace compute diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 51e2e97cb1a..9295b5aaf4d 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -486,13 +486,18 @@ class HashJoinNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end()); - if (complete_.load()) { return; } size_t thread_index = thread_indexer_(); int side = (input == inputs_[0]) ? 
0 : 1; + + EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"batch.length", batch.length}}); + { Status status = impl_->InputReceived(thread_index, side, std::move(batch)); if (!status.ok()) { @@ -512,6 +517,7 @@ class HashJoinNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { + EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); StopProducing(); outputs_[0]->ErrorReceived(this, std::move(error)); @@ -523,6 +529,8 @@ class HashJoinNode : public ExecNode { size_t thread_index = thread_indexer_(); int side = (input == inputs_[0]) ? 0 : 1; + EVENT(span_, "InputFinished", {{"side", side}, {"batches.length", total_batches}}); + if (batch_count_[side].SetTotal(total_batches)) { Status status = impl_->InputFinished(thread_index, side); if (!status.ok()) { @@ -534,7 +542,12 @@ class HashJoinNode : public ExecNode { } Status StartProducing() override { + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); bool use_sync_execution = !(plan_->exec_context()->executor()); size_t num_threads = use_sync_execution ? 
1 : thread_indexer_.Capacity(); @@ -550,9 +563,9 @@ class HashJoinNode : public ExecNode { return Status::OK(); } - void PauseProducing(ExecNode* output) override {} + void PauseProducing(ExecNode* output) override { EVENT(span_, "PauseProducing"); } - void ResumeProducing(ExecNode* output) override {} + void ResumeProducing(ExecNode* output) override { EVENT(span_, "ResumeProducing"); } void StopProducing(ExecNode* output) override { DCHECK_EQ(output, outputs_[0]); @@ -560,6 +573,7 @@ class HashJoinNode : public ExecNode { } void StopProducing() override { + EVENT(span_, "StopProducing"); bool expected = false; if (complete_.compare_exchange_strong(expected, true)) { for (auto&& input : inputs_) { @@ -606,7 +620,6 @@ class HashJoinNode : public ExecNode { private: AtomicCounter batch_count_[2]; std::atomic complete_; - Future<> finished_ = Future<>::MakeFinished(); JoinType join_type_; std::vector key_cmp_; Expression filter_; @@ -616,7 +629,6 @@ class HashJoinNode : public ExecNode { }; namespace internal { - void RegisterHashJoinNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("hashjoin", HashJoinNode::Make)); } diff --git a/cpp/src/arrow/compute/exec/options.cc b/cpp/src/arrow/compute/exec/options.cc new file mode 100644 index 00000000000..585cc7a6a64 --- /dev/null +++ b/cpp/src/arrow/compute/exec/options.cc @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/options.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace compute { + +std::string ToString(JoinType t) { + switch (t) { + case JoinType::LEFT_SEMI: + return "LEFT_SEMI"; + case JoinType::RIGHT_SEMI: + return "RIGHT_SEMI"; + case JoinType::LEFT_ANTI: + return "LEFT_ANTI"; + case JoinType::RIGHT_ANTI: + return "RIGHT_ANTI"; + case JoinType::INNER: + return "INNER"; + case JoinType::LEFT_OUTER: + return "LEFT_OUTER"; + case JoinType::RIGHT_OUTER: + return "RIGHT_OUTER"; + case JoinType::FULL_OUTER: + return "FULL_OUTER"; + } + ARROW_LOG(FATAL) << "Invalid variant of arrow::compute::JoinType"; + std::abort(); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 2723c4454c0..de51236dc85 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -164,6 +164,8 @@ enum class JoinType { FULL_OUTER }; +ARROW_EXPORT std::string ToString(JoinType t); + enum class JoinKeyCmp { EQ, IS }; /// \brief Make a node which implements join operation using hash join strategy. 
@@ -239,12 +241,12 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions { std::vector left_output; // output fields passed from right input std::vector right_output; - // key comparison function (determines whether a null key is equal another null key or - // not) + // key comparison function (determines whether a null key is equal to another null + // key or not) std::vector key_cmp; - // prefix added to names of output fields coming from left input (used to distinguish, - // if necessary, between fields of the same name in left and right input and can be left - // empty if there are no name collisions) + // prefix added to names of output fields coming from left input (used to + // distinguish, if necessary, between fields of the same name in left and right + // input and can be left empty if there are no name collisions) std::string output_prefix_for_left; // prefix added to names of output fields coming from right input std::string output_prefix_for_right; diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index c675acb3d98..69d9011d746 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -29,6 +29,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -79,6 +80,11 @@ class ProjectNode : public MapNode { Result DoProject(const ExecBatch& target) { std::vector values{exprs_.size()}; for (size_t i = 0; i < exprs_.size(); ++i) { + util::tracing::Span span; + START_SPAN(span, "Project", + {{"project.descr", exprs_[i].descr().ToString()}, + {"project.length", target.length}, + {"project.expression", exprs_[i].ToString()}}); ARROW_ASSIGN_OR_RAISE(Expression simplified_expr, SimplifyWithGuarantee(exprs_[i], target.guarantee)); @@ -89,8 +95,19 @@ class ProjectNode : public MapNode { } void InputReceived(ExecNode* input, ExecBatch batch) 
override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); - auto func = [this](ExecBatch batch) { return DoProject(std::move(batch)); }; + auto func = [this](ExecBatch batch) { + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"project", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); + auto result = DoProject(std::move(batch)); + MARK_SPAN(span, result.status()); + END_SPAN(span); + return result; + }; this->SubmitTask(std::move(func), std::move(batch)); } diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 1bb2680383c..b95bbcf70d2 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -37,6 +37,7 @@ #include "arrow/util/logging.h" #include "arrow/util/optional.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/unreachable.h" namespace arrow { @@ -76,7 +77,13 @@ class SinkNode : public ExecNode { const char* kind_name() const override { return "SinkNode"; } Status StartProducing() override { + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); + return Status::OK(); } @@ -89,6 +96,8 @@ class SinkNode : public ExecNode { [[noreturn]] void StopProducing(ExecNode* output) override { NoOutputs(); } void StopProducing() override { + EVENT(span_, "StopProducing"); + Finish(); inputs_[0]->StopProducing(this); } @@ -96,6 +105,11 @@ class SinkNode : public ExecNode { Future<> finished() override { return finished_; } void InputReceived(ExecNode* input, ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", 
+ {{"node.label", label()}, {"batch.length", batch.length}}); + DCHECK_EQ(input, inputs_[0]); bool did_push = producer_.Push(std::move(batch)); @@ -107,6 +121,7 @@ class SinkNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { + EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); producer_.Push(std::move(error)); @@ -118,6 +133,7 @@ class SinkNode : public ExecNode { } void InputFinished(ExecNode* input, int total_batches) override { + EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); if (input_counter_.SetTotal(total_batches)) { Finish(); } @@ -131,7 +147,6 @@ class SinkNode : public ExecNode { } AtomicCounter input_counter_; - Future<> finished_ = Future<>::MakeFinished(); PushGenerator>::Producer producer_; }; @@ -160,7 +175,12 @@ class ConsumingSinkNode : public ExecNode { const char* kind_name() const override { return "ConsumingSinkNode"; } Status StartProducing() override { + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); } @@ -173,6 +193,7 @@ class ConsumingSinkNode : public ExecNode { [[noreturn]] void StopProducing(ExecNode* output) override { NoOutputs(); } void StopProducing() override { + EVENT(span_, "StopProducing"); Finish(Status::Invalid("ExecPlan was stopped early")); inputs_[0]->StopProducing(this); } @@ -180,6 +201,11 @@ class ConsumingSinkNode : public ExecNode { Future<> finished() override { return finished_; } void InputReceived(ExecNode* input, ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"node.label", label()}, {"batch.length", batch.length}}); + DCHECK_EQ(input, inputs_[0]); // This can happen if an error 
was received and the source hasn't yet stopped. Since @@ -203,6 +229,7 @@ class ConsumingSinkNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { + EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); if (input_counter_.Cancel()) { @@ -213,6 +240,7 @@ class ConsumingSinkNode : public ExecNode { } void InputFinished(ExecNode* input, int total_batches) override { + EVENT(span_, "InputFinished", {{"batches.length", total_batches}}); if (input_counter_.SetTotal(total_batches)) { Finish(Status::OK()); } @@ -229,7 +257,6 @@ class ConsumingSinkNode : public ExecNode { AtomicCounter input_counter_; - Future<> finished_ = Future<>::MakeFinished(); std::shared_ptr consumer_; }; @@ -275,6 +302,11 @@ struct OrderBySinkNode final : public SinkNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"node.label", label()}, {"batch.length", batch.length}}); + DCHECK_EQ(input, inputs_[0]); auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(), @@ -309,6 +341,8 @@ struct OrderBySinkNode final : public SinkNode { } void Finish() override { + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}}); Status st = DoFinish(); if (ErrorIfNotOk(st)) { producer_.Push(std::move(st)); diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 46bba5609d4..6d47609d2a2 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -32,6 +32,7 @@ #include "arrow/util/logging.h" #include "arrow/util/optional.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/unreachable.h" namespace arrow { @@ -66,6 +67,11 @@ struct SourceNode : ExecNode { [[noreturn]] void 
InputFinished(ExecNode*, int) override { NoInputs(); } Status StartProducing() override { + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.kind", kind_name()}, + {"node.label", label()}, + {"node.output_schema", output_schema()->ToString()}, + {"node.detail", ToString()}}); { // If another exec node encountered an error during its StartProducing call // it might have already called StopProducing on all of its inputs (including this @@ -140,7 +146,7 @@ struct SourceNode : ExecNode { outputs_[0]->InputFinished(this, total_batches); return task_group_.End(); }); - + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); } @@ -164,7 +170,6 @@ struct SourceNode : ExecNode { std::mutex mutex_; bool stop_requested_{false}; int batch_count_{0}; - Future<> finished_ = Future<>::MakeFinished(); util::AsyncTaskGroup task_group_; AsyncGenerator> generator_; }; diff --git a/cpp/src/arrow/compute/exec/union_node.cc b/cpp/src/arrow/compute/exec/union_node.cc index fef2f4e1866..9232516cc6d 100644 --- a/cpp/src/arrow/compute/exec/union_node.cc +++ b/cpp/src/arrow/compute/exec/union_node.cc @@ -27,6 +27,7 @@ #include "arrow/util/future.h" #include "arrow/util/logging.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -75,6 +76,7 @@ class UnionNode : public ExecNode { } void InputReceived(ExecNode* input, ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end()); if (finished_.is_finished()) { @@ -87,6 +89,7 @@ class UnionNode : public ExecNode { } void ErrorReceived(ExecNode* input, Status error) override { + EVENT(span_, "ErrorReceived", {{"error", error.message()}}); DCHECK_EQ(input, inputs_[0]); outputs_[0]->ErrorReceived(this, std::move(error)); @@ -94,6 +97,8 @@ class UnionNode : public ExecNode { } void InputFinished(ExecNode* input, int total_batches) override { + 
EVENT(span_, "InputFinished", + {{"input", input_count_.count()}, {"batches.length", total_batches}}); ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end()); total_batches_.fetch_add(total_batches); @@ -107,15 +112,21 @@ class UnionNode : public ExecNode { } Status StartProducing() override { + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); + END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); } - void PauseProducing(ExecNode* output) override {} + void PauseProducing(ExecNode* output) override { EVENT(span_, "PauseProducing"); } - void ResumeProducing(ExecNode* output) override {} + void ResumeProducing(ExecNode* output) override { EVENT(span_, "ResumeProducing"); } void StopProducing(ExecNode* output) override { + EVENT(span_, "StopProducing"); DCHECK_EQ(output, outputs_[0]); if (batch_count_.Cancel()) { finished_.MarkFinished(); @@ -140,7 +151,6 @@ class UnionNode : public ExecNode { AtomicCounter batch_count_; AtomicCounter input_count_; std::atomic total_batches_{0}; - Future<> finished_ = Future<>::MakeFinished(); }; namespace internal { diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 0aecde11ba4..1a7f36862dd 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -31,6 +31,7 @@ #include "arrow/datum.h" #include "arrow/util/cpu_info.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -213,6 +214,12 @@ Result Function::Execute(const std::vector& args, return Execute(args, options, &default_ctx); } + util::tracing::Span span; + START_SPAN(span, name(), + {{"function.name", name()}, + {"function.options", options ? options->ToString() : ""}, + {"function.kind", kind()}}); + // type-check Datum arguments here. 
Really we'd like to avoid this as much as // possible RETURN_NOT_OK(detail::CheckAllValues(args)); @@ -254,7 +261,6 @@ Result Function::Execute(const std::vector& args, } namespace { - Status ValidateFunctionSummary(const std::string& s) { if (s.find('\n') != s.npos) { return Status::Invalid("summary contains a newline"); diff --git a/cpp/src/arrow/compute/function_internal.h b/cpp/src/arrow/compute/function_internal.h index 926b95a9d56..f2303b87d90 100644 --- a/cpp/src/arrow/compute/function_internal.h +++ b/cpp/src/arrow/compute/function_internal.h @@ -125,7 +125,11 @@ static inline std::string GenericToString(const std::shared_ptr& value) { static inline std::string GenericToString(const std::shared_ptr& value) { std::stringstream ss; - ss << value->type->ToString() << ":" << value->ToString(); + if (value) { + ss << value->type->ToString() << ":" << value->ToString(); + } else { + ss << ""; + } return ss.str(); } diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 695ee9ff357..96bbe99abf4 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -626,14 +626,14 @@ class ARROW_MUST_USE_TYPE Future { /// returning the future. /// /// Two callbacks are supported: - /// - OnSuccess, called with the result (const ValueType&) on successul completion. + /// - OnSuccess, called with the result (const ValueType&) on successful completion. /// for an empty future this will be called with nothing () /// - OnFailure, called with the error (const Status&) on failed completion. /// This callback is optional and defaults to a passthru of any errors. /// /// Then() returns a Future whose ValueType is derived from the return type of the /// callbacks. If a callback returns: - /// - void, a Future<> will be returned which will completes successully as soon + /// - void, a Future<> will be returned which will completes successfully as soon /// as the callback runs. 
/// - Status, a Future<> will be returned which will complete with the returned Status /// as soon as the callback runs. @@ -645,7 +645,7 @@ class ARROW_MUST_USE_TYPE Future { /// /// The continued Future type must be the same for both callbacks. /// - /// Note that OnFailure can swallow errors, allowing continued Futures to successully + /// Note that OnFailure can swallow errors, allowing continued Futures to successfully /// complete even if this Future fails. /// /// If this future is already completed then the callback will be run immediately diff --git a/cpp/src/arrow/util/tracing.cc b/cpp/src/arrow/util/tracing.cc new file mode 100644 index 00000000000..b8bddcd5052 --- /dev/null +++ b/cpp/src/arrow/util/tracing.cc @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/util/tracing.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/tracing_internal.h" + +namespace arrow { +namespace util { +namespace tracing { + +#ifdef ARROW_WITH_OPENTELEMETRY + +Span::Impl& Span::Set(const Impl& impl) { + inner_impl.reset(new Impl(impl)); + return *inner_impl; +} + +Span::Impl& Span::Set(Impl&& impl) { + inner_impl.reset(new Impl(std::move(impl))); + return *inner_impl; +} + +#endif + +// Default destructor when impl type is complete. +Span::~Span() = default; + +} // namespace tracing +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/tracing.h b/cpp/src/arrow/util/tracing.h new file mode 100644 index 00000000000..15f7fca1eee --- /dev/null +++ b/cpp/src/arrow/util/tracing.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/util/logging.h" + +namespace arrow { + +namespace internal { +namespace tracing { + +// Forward declaration SpanImpl. +class SpanImpl; + +} // namespace tracing +} // namespace internal + +namespace util { +namespace tracing { + +class ARROW_EXPORT Span { + public: + using Impl = arrow::internal::tracing::SpanImpl; + + Span() = default; // Default constructor. 
The inner_impl is a nullptr. + ~Span(); // Destructor. Default destructor defined in tracing.cc where impl is a + // complete type. + + Impl& Set(const Impl&); + Impl& Set(Impl&&); + + const Impl& Get() const { + ARROW_CHECK(inner_impl) + << "Attempted to dereference a null pointer. Use Span::Set before " + "dereferencing."; + return *inner_impl; + } + + Impl& Get() { + ARROW_CHECK(inner_impl) + << "Attempted to dereference a null pointer. Use Span::Set before " + "dereferencing."; + return *inner_impl; + } + + private: + std::unique_ptr inner_impl; +}; + +} // namespace tracing +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index d39f95061c7..5dbb0f345e4 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -16,6 +16,7 @@ // under the License. #include "arrow/util/tracing_internal.h" +#include "arrow/util/tracing.h" #include #include @@ -106,7 +107,7 @@ class ThreadIdSpanProcessor : public sdktrace::BatchSpanProcessor { void OnEnd(std::unique_ptr&& span) noexcept override { std::stringstream thread_id; thread_id << std::this_thread::get_id(); - span->SetAttribute("thread_id", thread_id.str()); + span->SetAttribute("thread.id", thread_id.str()); sdktrace::BatchSpanProcessor::OnEnd(std::move(span)); } }; @@ -152,9 +153,10 @@ class FlushLog { explicit FlushLog(nostd::shared_ptr provider) : provider_(std::move(provider)) {} ~FlushLog() { - if (provider_) { - provider_->ForceFlush(std::chrono::microseconds(1000000)); - } + // TODO: ForceFlush apparently sends data that OTLP connector can't handle + // if (provider_) { + // provider_->ForceFlush(std::chrono::microseconds(1000000)); + // } } nostd::shared_ptr provider_; }; @@ -182,6 +184,15 @@ opentelemetry::trace::Tracer* GetTracer() { return tracer.get(); } +#ifdef ARROW_WITH_OPENTELEMETRY +opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( + const util::tracing::Span& 
parent_span) { + opentelemetry::trace::StartSpanOptions options; + options.parent = parent_span.Get().span->GetContext(); + return options; +} +#endif + } // namespace tracing } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index f3ca5e5ce3d..d4947ac88fe 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -28,6 +28,7 @@ #pragma warning(disable : 4522) #endif #include +#include #ifdef _MSC_VER #pragma warning(pop) #endif @@ -35,6 +36,8 @@ #include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/tracing.h" #include "arrow/util/visibility.h" namespace arrow { @@ -97,6 +100,63 @@ AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, return fut; }; } + +class SpanImpl { + public: + opentelemetry::nostd::shared_ptr span; +}; + +opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( + const util::tracing::Span& parent_span); + +#define START_SPAN(target_span, ...) \ + auto opentelemetry_scope##__LINE__ = \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + target_span \ + .Set(::arrow::util::tracing::Span::Impl{ \ + ::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__)}) \ + .span) + +#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) \ + auto opentelemetry_scope##__LINE__ = \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + target_span \ + .Set(::arrow::util::tracing::Span::Impl{ \ + ::arrow::internal::tracing::GetTracer()->StartSpan( \ + __VA_ARGS__, \ + ::arrow::internal::tracing::SpanOptionsWithParent(parent_span))}) \ + .span) + +#define EVENT(target_span, ...) 
target_span.Get().span->AddEvent(__VA_ARGS__) + +#define MARK_SPAN(target_span, status) \ + ::arrow::internal::tracing::MarkSpan(status, target_span.Get().span.get()) + +#define END_SPAN(target_span) target_span.Get().span->End() + +#define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) \ + target_future = target_future.Then( \ + [target_capture]() { \ + MARK_SPAN(target_span, Status::OK()); \ + END_SPAN(target_span); \ + }, \ + [target_capture](const Status& st) { \ + MARK_SPAN(target_span, st); \ + END_SPAN(target_span); \ + return st; \ + }) + +#else + +class SpanImpl {}; + +#define START_SPAN(target_span, ...) +#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) +#define MARK_SPAN(target_span, status) +#define EVENT(target_span, ...) +#define END_SPAN(target_span) +#define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) + #endif } // namespace tracing