From f10943089d40dacd7f6a2c64803b0e9f93a41842 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 7 Jun 2022 15:39:53 -0700 Subject: [PATCH 1/7] fix: Don't use pimpl for Span --- cpp/src/arrow/CMakeLists.txt | 1 - cpp/src/arrow/compute/exec/exec_plan.cc | 2 +- cpp/src/arrow/util/tracing.cc | 45 ------------- cpp/src/arrow/util/tracing.h | 37 +--------- cpp/src/arrow/util/tracing_internal.cc | 2 +- cpp/src/arrow/util/tracing_internal.h | 89 ++++++++++++++++--------- 6 files changed, 62 insertions(+), 114 deletions(-) delete mode 100644 cpp/src/arrow/util/tracing.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index fd2f10db2ff..6b6976234e9 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -222,7 +222,6 @@ set(ARROW_SRCS util/tdigest.cc util/thread_pool.cc util/time.cc - util/tracing.cc util/trie.cc util/unreachable.cc util/uri.cc diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index bb197f8db84..516b1b272e2 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -87,7 +87,7 @@ struct ExecPlanImpl : public ExecPlan { auto pairs = metadata().get()->sorted_pairs(); std::for_each(std::begin(pairs), std::end(pairs), [this](std::pair const& pair) { - span_.Get().span->SetAttribute(pair.first, pair.second); + CAST_SPAN(span_).Get()->SetAttribute(pair.first, pair.second); }); } #endif diff --git a/cpp/src/arrow/util/tracing.cc b/cpp/src/arrow/util/tracing.cc deleted file mode 100644 index b8bddcd5052..00000000000 --- a/cpp/src/arrow/util/tracing.cc +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/util/tracing.h" -#include "arrow/util/make_unique.h" -#include "arrow/util/tracing_internal.h" - -namespace arrow { -namespace util { -namespace tracing { - -#ifdef ARROW_WITH_OPENTELEMETRY - -Span::Impl& Span::Set(const Impl& impl) { - inner_impl.reset(new Impl(impl)); - return *inner_impl; -} - -Span::Impl& Span::Set(Impl&& impl) { - inner_impl.reset(new Impl(std::move(impl))); - return *inner_impl; -} - -#endif - -// Default destructor when impl type is complete. -Span::~Span() = default; - -} // namespace tracing -} // namespace util -} // namespace arrow diff --git a/cpp/src/arrow/util/tracing.h b/cpp/src/arrow/util/tracing.h index 15f7fca1eee..7a3cff0ee7d 100644 --- a/cpp/src/arrow/util/tracing.h +++ b/cpp/src/arrow/util/tracing.h @@ -22,46 +22,13 @@ #include "arrow/util/logging.h" namespace arrow { - -namespace internal { -namespace tracing { - -// Forward declaration SpanImpl. -class SpanImpl; - -} // namespace tracing -} // namespace internal - namespace util { namespace tracing { class ARROW_EXPORT Span { public: - using Impl = arrow::internal::tracing::SpanImpl; - - Span() = default; // Default constructor. The inner_impl is a nullptr. - ~Span(); // Destructor. Default destructor defined in tracing.cc where impl is a - // complete type. - - Impl& Set(const Impl&); - Impl& Set(Impl&&); - - const Impl& Get() const { - ARROW_CHECK(inner_impl) - << "Attempted to dereference a null pointer. Use Span::Set before " - "dereferencing."; - return *inner_impl; - } - - Impl& Get() { - ARROW_CHECK(inner_impl) - << "Attempted to dereference a null pointer. Use Span::Set before " - "dereferencing."; - return *inner_impl; - } - - private: - std::unique_ptr inner_impl; + // Manually specify destructor otherwise MSVC will complain about unused var + ~Span() {} }; } // namespace tracing diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 904a1fd76a8..e61c54755e7 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -206,7 +206,7 @@ opentelemetry::trace::Tracer* GetTracer() { opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( const util::tracing::Span& parent_span) { opentelemetry::trace::StartSpanOptions options; - options.parent = parent_span.Get().span->GetContext(); + options.parent = ((const ::arrow::internal::tracing::OTSpan&)parent_span).Get()->GetContext(); return options; } #endif diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index d0d6062e6e2..22b2abbc8d5 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -106,48 +106,77 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); } -class SpanImpl { +class OTSpan : ::arrow::util::tracing::Span { + using InnerSpan = opentelemetry::nostd::shared_ptr; + public: - opentelemetry::nostd::shared_ptr span; + InnerSpan& Set(const InnerSpan& span) { + inner_span = span; + return inner_span; + } + + InnerSpan& Set(InnerSpan&& span) { + inner_span = std::move(span); + return inner_span; + } + + const InnerSpan& Get() const { + ARROW_CHECK(inner_span) + << "Attempted to dereference a null pointer. Use Span::Set before " + "dereferencing."; + return inner_span; + } + + InnerSpan& Get() { + ARROW_CHECK(inner_span) + << "Attempted to dereference a null pointer. Use Span::Set before " + "dereferencing."; + return inner_span; + } + + InnerSpan inner_span; }; opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( const util::tracing::Span& parent_span); -#define START_SPAN(target_span, ...) \ - auto opentelemetry_scope##__LINE__ = \ - ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ - target_span \ - .Set(::arrow::util::tracing::Span::Impl{ \ - ::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__)}) \ - .span) - -#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) \ - auto opentelemetry_scope##__LINE__ = \ - ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ - target_span \ - .Set(::arrow::util::tracing::Span::Impl{ \ - ::arrow::internal::tracing::GetTracer()->StartSpan( \ - __VA_ARGS__, \ - ::arrow::internal::tracing::SpanOptionsWithParent(parent_span))}) \ - .span) - -#define START_COMPUTE_SPAN(target_span, ...) \ - START_SPAN(target_span, __VA_ARGS__); \ - target_span.Get().span->SetAttribute( \ - "arrow.memory_pool_bytes", ::arrow::default_memory_pool()->bytes_allocated()) +#define CAST_SPAN(target_span) \ + (*(reinterpret_cast<::arrow::internal::tracing::OTSpan*>(&(target_span)))) + +#define START_SPAN(target_span, ...) \ + auto opentelemetry_scope##__LINE__ = \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + CAST_SPAN(target_span) \ + .Set(::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__))) + +#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) \ + auto opentelemetry_scope##__LINE__ = \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + CAST_SPAN(target_span) \ + .Set(::arrow::internal::tracing::GetTracer()->StartSpan( \ + __VA_ARGS__, \ + ::arrow::internal::tracing::SpanOptionsWithParent(parent_span)))) + +#define START_COMPUTE_SPAN(target_span, ...) \ + START_SPAN(target_span, __VA_ARGS__); \ + CAST_SPAN(target_span) \ + .Get() \ + ->SetAttribute("arrow.memory_pool_bytes", \ + ::arrow::default_memory_pool()->bytes_allocated()) #define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...) \ START_SPAN_WITH_PARENT(target_span, parent_span, __VA_ARGS__); \ - target_span.Get().span->SetAttribute( \ - "arrow.memory_pool_bytes", ::arrow::default_memory_pool()->bytes_allocated()) + CAST_SPAN(target_span) \ + .Get() \ + ->SetAttribute("arrow.memory_pool_bytes", \ + ::arrow::default_memory_pool()->bytes_allocated()) -#define EVENT(target_span, ...) target_span.Get().span->AddEvent(__VA_ARGS__) +#define EVENT(target_span, ...) CAST_SPAN(target_span).Get()->AddEvent(__VA_ARGS__) #define MARK_SPAN(target_span, status) \ - ::arrow::internal::tracing::MarkSpan(status, target_span.Get().span.get()) + ::arrow::internal::tracing::MarkSpan(status, CAST_SPAN(target_span).Get().get()) -#define END_SPAN(target_span) target_span.Get().span->End() +#define END_SPAN(target_span) CAST_SPAN(target_span).Get()->End() #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) \ target_future = target_future.Then( \ @@ -183,8 +212,6 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( #else // !ARROW_WITH_OPENTELEMETRY -class SpanImpl {}; - #define START_SPAN(target_span, ...) #define START_SPAN_WITH_PARENT(target_span, parent_span, ...) #define START_COMPUTE_SPAN(target_span, ...) From b89f9f9514a1722b91dee0ed1889954c859f01ea Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 7 Jun 2022 15:40:17 -0700 Subject: [PATCH 2/7] fix: use correct name for Windows thrift debug libs --- cpp/cmake_modules/FindThrift.cmake | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cpp/cmake_modules/FindThrift.cmake b/cpp/cmake_modules/FindThrift.cmake index 5d195844e20..07028971d9f 100644 --- a/cpp/cmake_modules/FindThrift.cmake +++ b/cpp/cmake_modules/FindThrift.cmake @@ -47,9 +47,17 @@ endfunction(EXTRACT_THRIFT_VERSION) if(MSVC_TOOLCHAIN AND NOT DEFINED THRIFT_MSVC_LIB_SUFFIX) if(NOT ARROW_THRIFT_USE_SHARED) if(ARROW_USE_STATIC_CRT) - set(THRIFT_MSVC_LIB_SUFFIX "mt") + if("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") + set(THRIFT_MSVC_LIB_SUFFIX "mtd") + else() + set(THRIFT_MSVC_LIB_SUFFIX "mt") + endif() else() - set(THRIFT_MSVC_LIB_SUFFIX "md") + if("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") + set(THRIFT_MSVC_LIB_SUFFIX "mdd") + else() + set(THRIFT_MSVC_LIB_SUFFIX "md") + endif() endif() endif() endif() From 403954ac7c6d5344e8f89d76c4a10fa0e1d9924d Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 8 Jun 2022 12:40:13 -0700 Subject: [PATCH 3/7] fix: Add missing import in test --- cpp/src/arrow/compute/light_array_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index 3f6d4780352..dcc7841a091 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -18,6 +18,7 @@ #include "arrow/compute/light_array.h" #include +#include #include "arrow/compute/exec/test_util.h" #include "arrow/testing/generator.h" From b92e2f4c86c62f036bdf3afa8d24d156d2bb54fe Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 8 Jun 2022 15:01:17 -0700 Subject: [PATCH 4/7] Attempt to get casting working, but failing --- cpp/src/arrow/compute/exec/hash_join.cc | 1 + cpp/src/arrow/compute/exec/hash_join.h | 2 +- cpp/src/arrow/compute/exec/hash_join_node.cc | 1 + cpp/src/arrow/compute/function.cc | 10 ++- cpp/src/arrow/util/tracing.h | 2 +- cpp/src/arrow/util/tracing_internal.cc | 14 ++++ cpp/src/arrow/util/tracing_internal.h | 70 +++++++++++--------- 7 files changed, 65 insertions(+), 35 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 63d9522c443..391c8b2cd4b 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -29,6 +29,7 @@ #include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/task_util.h" #include "arrow/compute/kernels/row_encoder.h" +#include "arrow/util/tracing_internal.h" namespace arrow { namespace compute { diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h index 9739cbc6436..63b6e51f1d3 100644 --- a/cpp/src/arrow/compute/exec/hash_join.h +++ b/cpp/src/arrow/compute/exec/hash_join.h @@ -28,7 +28,7 @@ #include "arrow/result.h" #include "arrow/status.h" #include "arrow/type.h" -#include "arrow/util/tracing_internal.h" +#include "arrow/util/tracing.h" namespace arrow { namespace compute { diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index e47d6095542..c8e6c6ffb2f 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -28,6 +28,7 @@ #include "arrow/util/future.h" #include "arrow/util/make_unique.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" namespace arrow { diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 2b3d4e6feb9..80411d01dd1 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -214,7 +214,15 @@ Result Function::Execute(const std::vector& args, return Execute(args, options, &default_ctx); } - util::tracing::Span span; + +// #ifdef ARROW_WITH_OPENTELEMETRY +// auto tracer = arrow::internal::tracing::GetTracer(); +// auto span = tracer->StartSpan("arrow::compute::Function::Execute"); +// #else +// util::tracing::Span span; +// #endif + util::tracing::Span span = arrow::internal::tracing::NewSpan("arrow::compute::Function::Execute"); + START_COMPUTE_SPAN(span, name(), {{"function.name", name()}, {"function.options", options ? options->ToString() : ""}, diff --git a/cpp/src/arrow/util/tracing.h b/cpp/src/arrow/util/tracing.h index 7a3cff0ee7d..e8f24cebd9f 100644 --- a/cpp/src/arrow/util/tracing.h +++ b/cpp/src/arrow/util/tracing.h @@ -28,7 +28,7 @@ namespace tracing { class ARROW_EXPORT Span { public: // Manually specify destructor otherwise MSVC will complain about unused var - ~Span() {} + virtual ~Span() {} }; } // namespace tracing diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index e61c54755e7..ac994c68fec 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -54,6 +54,8 @@ namespace arrow { namespace internal { namespace tracing { +using internal::checked_cast; + namespace nostd = opentelemetry::nostd; namespace otel = opentelemetry; @@ -202,6 +204,18 @@ opentelemetry::trace::Tracer* GetTracer() { return tracer.get(); } +::arrow::util::tracing::Span NewSpan(const std::string& name) noexcept { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = GetTracer(); + OTSpan span; + span.Set(tracer->StartSpan(name)); + return *(checked_cast<::arrow::util::tracing::Span*>(&span)); +#else + ::arrow::util::tracing::Span span; + return span; +#endif +} + #ifdef ARROW_WITH_OPENTELEMETRY opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( const util::tracing::Span& parent_span) { diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 22b2abbc8d5..ee1a4c253e6 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -44,10 +44,47 @@ namespace arrow { namespace internal { namespace tracing { +::arrow::util::tracing::Span NewSpan(const std::string& name) noexcept; + #ifdef ARROW_WITH_OPENTELEMETRY ARROW_EXPORT opentelemetry::trace::Tracer* GetTracer(); +class OTSpan : public ::arrow::util::tracing::Span { + using InnerSpan = opentelemetry::nostd::shared_ptr; + + public: + OTSpan() { + inner_span = nullptr; + } + + InnerSpan& Set(const InnerSpan& span) noexcept { + inner_span = span; + return inner_span; + } + + InnerSpan& Set(InnerSpan&& span) noexcept { + inner_span = std::move(span); + return inner_span; + } + + const InnerSpan& Get() const { + ARROW_CHECK(inner_span) + << "Attempted to dereference a null pointer. Use Span::Set before " + "dereferencing."; + return inner_span; + } + + InnerSpan& Get() { + ARROW_CHECK(inner_span) + << "Attempted to dereference a null pointer. Use Span::Set before " + "dereferencing."; + return inner_span; + } + + InnerSpan inner_span; +}; + inline void MarkSpan(const Status& s, opentelemetry::trace::Span* span) { if (!s.ok()) { span->SetStatus(opentelemetry::trace::StatusCode::kError, s.ToString()); @@ -106,42 +143,11 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); } -class OTSpan : ::arrow::util::tracing::Span { - using InnerSpan = opentelemetry::nostd::shared_ptr; - - public: - InnerSpan& Set(const InnerSpan& span) { - inner_span = span; - return inner_span; - } - - InnerSpan& Set(InnerSpan&& span) { - inner_span = std::move(span); - return inner_span; - } - - const InnerSpan& Get() const { - ARROW_CHECK(inner_span) - << "Attempted to dereference a null pointer. Use Span::Set before " - "dereferencing."; - return inner_span; - } - - InnerSpan& Get() { - ARROW_CHECK(inner_span) - << "Attempted to dereference a null pointer. Use Span::Set before " - "dereferencing."; - return inner_span; - } - - InnerSpan inner_span; -}; - opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( const util::tracing::Span& parent_span); #define CAST_SPAN(target_span) \ - (*(reinterpret_cast<::arrow::internal::tracing::OTSpan*>(&(target_span)))) + (*(::arrow::internal::checked_cast<::arrow::internal::tracing::OTSpan*>(&(target_span)))) #define START_SPAN(target_span, ...) \ auto opentelemetry_scope##__LINE__ = \ From 399e0d7aa7b936088795086f54dc1c6c30bacc08 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 8 Jun 2022 15:28:54 -0700 Subject: [PATCH 5/7] fix: Initialize OTSpan correctly --- cpp/src/arrow/compute/exec/aggregate_node.cc | 24 ++++++++++---------- cpp/src/arrow/compute/exec/exec_plan.cc | 4 +++- cpp/src/arrow/compute/exec/filter_node.cc | 4 ++-- cpp/src/arrow/compute/exec/hash_join_node.cc | 2 +- cpp/src/arrow/compute/exec/project_node.cc | 4 ++-- cpp/src/arrow/compute/exec/sink_node.cc | 8 +++---- cpp/src/arrow/compute/function.cc | 9 +------- cpp/src/arrow/dataset/file_base.cc | 4 ++-- cpp/src/arrow/util/tracing_internal.cc | 14 +----------- cpp/src/arrow/util/tracing_internal.h | 2 -- 10 files changed, 28 insertions(+), 47 deletions(-) diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index f6813ecb682..cc6d876ee2f 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -167,13 +167,13 @@ class ScalarAggregateNode : public ExecNode { const char* kind_name() const override { return "ScalarAggregateNode"; } Status DoConsume(const ExecBatch& batch, size_t thread_index) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, "Consume", {{"aggregate", ToStringExtra()}, {"node.label", label()}, {"batch.length", batch.length}}); for (size_t i = 0; i < kernels_.size(); ++i) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, aggs_[i].function, {{"function.name", aggs_[i].function}, {"function.options", @@ -190,7 +190,7 @@ class ScalarAggregateNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"aggregate", ToStringExtra()}, {"node.label", label()}, @@ -265,14 +265,14 @@ class ScalarAggregateNode : public ExecNode { private: Status Finish() { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, "Finish", {{"aggregate", ToStringExtra()}, {"node.label", label()}}); ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); for (size_t i = 0; i < kernels_.size(); ++i) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, aggs_[i].function, {{"function.name", aggs_[i].function}, {"function.options", @@ -396,7 +396,7 @@ class GroupByNode : public ExecNode { const char* kind_name() const override { return "GroupByNode"; } Status Consume(ExecBatch batch) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, "Consume", {{"group_by", ToStringExtra()}, {"node.label", label()}, @@ -422,7 +422,7 @@ class GroupByNode : public ExecNode { // Execute aggregate kernels for (size_t i = 0; i < agg_kernels_.size(); ++i) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, aggs_[i].function, {{"function.name", aggs_[i].function}, {"function.options", @@ -443,7 +443,7 @@ class GroupByNode : public ExecNode { } Status Merge() { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, "Merge", {{"group_by", ToStringExtra()}, {"node.label", label()}}); ThreadLocalState* state0 = &local_states_[0]; @@ -458,7 +458,7 @@ class GroupByNode : public ExecNode { state->grouper.reset(); for (size_t i = 0; i < agg_kernels_.size(); ++i) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN( span, aggs_[i].function, {{"function.name", aggs_[i].function}, @@ -479,7 +479,7 @@ class GroupByNode : public ExecNode { } Result Finalize() { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, "Finalize", {{"group_by", ToStringExtra()}, {"node.label", label()}}); @@ -492,7 +492,7 @@ class GroupByNode : public ExecNode { // Aggregate fields come before key fields to match the behavior of GroupBy function for (size_t i = 0; i < agg_kernels_.size(); ++i) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, aggs_[i].function, {{"function.name", aggs_[i].function}, {"function.options", @@ -554,7 +554,7 @@ class GroupByNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"group_by", ToStringExtra()}, {"node.label", label()}, diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 516b1b272e2..664a5e11cf0 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -46,7 +46,9 @@ namespace { struct ExecPlanImpl : public ExecPlan { explicit ExecPlanImpl(ExecContext* exec_context, std::shared_ptr metadata = NULLPTR) - : ExecPlan(exec_context), metadata_(std::move(metadata)) {} + : ExecPlan(exec_context), metadata_(std::move(metadata)) { + span_ = arrow::internal::tracing::OTSpan(); + } ~ExecPlanImpl() override { if (started_ && !finished_.is_finished()) { diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 0c849cb0435..4f04ce66fc5 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -69,7 +69,7 @@ class FilterNode : public MapNode { ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, SimplifyWithGuarantee(filter_, target.guarantee)); - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, "Filter", {{"filter.expression", ToStringExtra()}, {"filter.expression.simplified", simplified_filter.ToString()}, @@ -102,7 +102,7 @@ class FilterNode : public MapNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"filter", ToStringExtra()}, {"node.label", label()}, diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index c8e6c6ffb2f..8336ccee2ab 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -527,7 +527,7 @@ class HashJoinNode : public ExecNode { int side = (input == inputs_[0]) ? 0 : 1; EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"batch.length", batch.length}}); diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index b8fb64c5d54..bd5bc855744 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -79,7 +79,7 @@ class ProjectNode : public MapNode { Result DoProject(const ExecBatch& target) { std::vector values{exprs_.size()}; for (size_t i = 0; i < exprs_.size(); ++i) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, "Project", {{"project.descr", exprs_[i].descr().ToString()}, {"project.length", target.length}, @@ -97,7 +97,7 @@ class ProjectNode : public MapNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"project", ToStringExtra()}, {"node.label", label()}, diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index eae12bf7297..f93439e255d 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -187,7 +187,7 @@ class SinkNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", {{"node.label", label()}, {"batch.length", batch.length}}); @@ -334,7 +334,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", {{"node.label", label()}, {"batch.length", batch.length}}); @@ -516,7 +516,7 @@ struct OrderBySinkNode final : public SinkNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", {{"node.label", label()}, {"batch.length", batch.length}}); @@ -555,7 +555,7 @@ struct OrderBySinkNode final : public SinkNode { } void Finish() override { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}}); Status st = DoFinish(); if (ErrorIfNotOk(st)) { diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 80411d01dd1..abc4d8ce8ea 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -214,14 +214,7 @@ Result Function::Execute(const std::vector& args, return Execute(args, options, &default_ctx); } - -// #ifdef ARROW_WITH_OPENTELEMETRY -// auto tracer = arrow::internal::tracing::GetTracer(); -// auto span = tracer->StartSpan("arrow::compute::Function::Execute"); -// #else -// util::tracing::Span span; -// #endif - util::tracing::Span span = arrow::internal::tracing::NewSpan("arrow::compute::Function::Execute"); + arrow::internal::tracing::OTSpan span; START_COMPUTE_SPAN(span, name(), {{"function.name", name()}, diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 2e05706bbbc..a45d15cd6fe 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -483,7 +483,7 @@ class TeeNode : public compute::MapNode { [this](std::shared_ptr next_batch, const PartitionPathFormat& destination) { return task_group_.AddTask([this, next_batch, destination] { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; Future<> has_room = dataset_writer_->WriteRecordBatch( next_batch, destination.directory, destination.filename); if (!has_room.is_finished()) { @@ -499,7 +499,7 @@ class TeeNode : public compute::MapNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto func = [this](compute::ExecBatch batch) { - util::tracing::Span span; + arrow::internal::tracing::OTSpan span; START_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"tee", ToStringExtra()}, {"node.label", label()}, diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index ac994c68fec..57574d11b7a 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -204,23 +204,11 @@ opentelemetry::trace::Tracer* GetTracer() { return tracer.get(); } -::arrow::util::tracing::Span NewSpan(const std::string& name) noexcept { -#ifdef ARROW_WITH_OPENTELEMETRY - auto tracer = GetTracer(); - OTSpan span; - span.Set(tracer->StartSpan(name)); - return *(checked_cast<::arrow::util::tracing::Span*>(&span)); -#else - ::arrow::util::tracing::Span span; - return span; -#endif -} - #ifdef ARROW_WITH_OPENTELEMETRY opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( const util::tracing::Span& parent_span) { opentelemetry::trace::StartSpanOptions options; - options.parent = ((const ::arrow::internal::tracing::OTSpan&)parent_span).Get()->GetContext(); + options.parent = (dynamic_cast(parent_span)).Get()->GetContext(); return options; } #endif diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index ee1a4c253e6..b6153869f62 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -44,8 +44,6 @@ namespace arrow { namespace internal { namespace tracing { -::arrow::util::tracing::Span NewSpan(const std::string& name) noexcept; - #ifdef ARROW_WITH_OPENTELEMETRY ARROW_EXPORT opentelemetry::trace::Tracer* GetTracer(); From c2f8faf626f3c231e14119fc89fe1dedf523cca5 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 10 Jun 2022 17:37:44 -0700 Subject: [PATCH 6/7] feat: Implement with SpanDetail Co-authored-by: Weston Pace --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/compute/exec/aggregate_node.cc | 24 ++--- cpp/src/arrow/compute/exec/exec_plan.cc | 10 +- cpp/src/arrow/compute/exec/filter_node.cc | 4 +- cpp/src/arrow/compute/exec/hash_join_node.cc | 2 +- cpp/src/arrow/compute/exec/project_node.cc | 4 +- cpp/src/arrow/compute/exec/sink_node.cc | 8 +- cpp/src/arrow/compute/function.cc | 2 +- cpp/src/arrow/dataset/file_base.cc | 4 +- cpp/src/arrow/util/tracing.cc | 43 ++++++++ cpp/src/arrow/util/tracing.h | 11 +- cpp/src/arrow/util/tracing_internal.cc | 34 +++++-- cpp/src/arrow/util/tracing_internal.h | 102 ++++++++----------- 13 files changed, 152 insertions(+), 97 deletions(-) create mode 100644 cpp/src/arrow/util/tracing.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 6b6976234e9..fd2f10db2ff 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -222,6 +222,7 @@ set(ARROW_SRCS util/tdigest.cc util/thread_pool.cc util/time.cc + util/tracing.cc util/trie.cc util/unreachable.cc util/uri.cc diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index cc6d876ee2f..f6813ecb682 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -167,13 +167,13 @@ class ScalarAggregateNode : public ExecNode { const char* kind_name() const override { return "ScalarAggregateNode"; } Status DoConsume(const ExecBatch& batch, size_t thread_index) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, "Consume", {{"aggregate", ToStringExtra()}, {"node.label", label()}, {"batch.length", batch.length}}); for (size_t i = 0; i < kernels_.size(); ++i) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, aggs_[i].function, {{"function.name", aggs_[i].function}, {"function.options", @@ -190,7 +190,7 @@ class ScalarAggregateNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"aggregate", ToStringExtra()}, {"node.label", label()}, @@ -265,14 +265,14 @@ class ScalarAggregateNode : public ExecNode { private: Status Finish() { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, "Finish", {{"aggregate", ToStringExtra()}, {"node.label", label()}}); ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); for (size_t i = 0; i < kernels_.size(); ++i) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, aggs_[i].function, {{"function.name", aggs_[i].function}, {"function.options", @@ -396,7 +396,7 @@ class GroupByNode : public ExecNode { const char* kind_name() const override { return "GroupByNode"; } Status Consume(ExecBatch batch) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, "Consume", {{"group_by", ToStringExtra()}, {"node.label", label()}, @@ -422,7 +422,7 @@ class GroupByNode : public ExecNode { // Execute aggregate kernels for (size_t i = 0; i < agg_kernels_.size(); ++i) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, aggs_[i].function, {{"function.name", aggs_[i].function}, {"function.options", @@ -443,7 +443,7 @@ class GroupByNode : public ExecNode { } Status Merge() { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, "Merge", {{"group_by", ToStringExtra()}, {"node.label", label()}}); ThreadLocalState* state0 = &local_states_[0]; @@ -458,7 +458,7 @@ class GroupByNode : public ExecNode { state->grouper.reset(); for (size_t i = 0; i < agg_kernels_.size(); ++i) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN( span, aggs_[i].function, {{"function.name", aggs_[i].function}, @@ -479,7 +479,7 @@ class GroupByNode : public ExecNode { } Result Finalize() { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, "Finalize", {{"group_by", ToStringExtra()}, {"node.label", label()}}); @@ -492,7 +492,7 @@ class GroupByNode : public ExecNode { // Aggregate fields come before key fields to match the behavior of GroupBy function for (size_t i = 0; i < agg_kernels_.size(); ++i) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, aggs_[i].function, {{"function.name", aggs_[i].function}, {"function.options", @@ -554,7 +554,7 @@ class GroupByNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"group_by", ToStringExtra()}, {"node.label", label()}, diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 664a5e11cf0..3bd508ed9f4 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -46,9 +46,7 @@ namespace { struct ExecPlanImpl : public ExecPlan { explicit ExecPlanImpl(ExecContext* exec_context, std::shared_ptr metadata = NULLPTR) - : ExecPlan(exec_context), metadata_(std::move(metadata)) { - span_ = arrow::internal::tracing::OTSpan(); - } + : ExecPlan(exec_context), metadata_(std::move(metadata)) {} ~ExecPlanImpl() override { if (started_ && !finished_.is_finished()) { @@ -87,9 +85,11 @@ struct ExecPlanImpl : public ExecPlan { #ifdef ARROW_WITH_OPENTELEMETRY if (HasMetadata()) { auto pairs = metadata().get()->sorted_pairs(); + opentelemetry::nostd::shared_ptr span = + ::arrow::internal::tracing::UnwrapSpan(span_.details.get()); std::for_each(std::begin(pairs), std::end(pairs), - [this](std::pair const& pair) { - CAST_SPAN(span_).Get()->SetAttribute(pair.first, pair.second); + [span](std::pair const& pair) { + span->SetAttribute(pair.first, pair.second); }); } #endif diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 4f04ce66fc5..0c849cb0435 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -69,7 +69,7 @@ class FilterNode : public MapNode { ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, SimplifyWithGuarantee(filter_, target.guarantee)); - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, "Filter", {{"filter.expression", ToStringExtra()}, {"filter.expression.simplified", simplified_filter.ToString()}, @@ -102,7 +102,7 @@ class FilterNode : public MapNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"filter", ToStringExtra()}, {"node.label", label()}, diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 8336ccee2ab..c8e6c6ffb2f 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -527,7 +527,7 @@ class HashJoinNode : public ExecNode { int side = (input == inputs_[0]) ? 0 : 1; EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"batch.length", batch.length}}); diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index bd5bc855744..b8fb64c5d54 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -79,7 +79,7 @@ class ProjectNode : public MapNode { Result DoProject(const ExecBatch& target) { std::vector values{exprs_.size()}; for (size_t i = 0; i < exprs_.size(); ++i) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, "Project", {{"project.descr", exprs_[i].descr().ToString()}, {"project.length", target.length}, @@ -97,7 +97,7 @@ class ProjectNode : public MapNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"project", ToStringExtra()}, {"node.label", label()}, diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index f93439e255d..eae12bf7297 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -187,7 +187,7 @@ class SinkNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", {{"node.label", label()}, {"batch.length", batch.length}}); @@ -334,7 +334,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", {{"node.label", label()}, {"batch.length", batch.length}}); @@ -516,7 +516,7 @@ struct OrderBySinkNode final : public SinkNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", {{"node.label", label()}, {"batch.length", batch.length}}); @@ -555,7 +555,7 @@ struct OrderBySinkNode final : public SinkNode { } void Finish() override { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}}); Status st = DoFinish(); if (ErrorIfNotOk(st)) { diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index abc4d8ce8ea..5ebbef9650c 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -214,7 +214,7 @@ Result Function::Execute(const std::vector& args, return Execute(args, options, &default_ctx); } - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_COMPUTE_SPAN(span, name(), {{"function.name", name()}, diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index a45d15cd6fe..2e05706bbbc 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -483,7 +483,7 @@ class TeeNode : public compute::MapNode { [this](std::shared_ptr next_batch, const PartitionPathFormat& destination) { return task_group_.AddTask([this, next_batch, destination] { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; Future<> has_room = dataset_writer_->WriteRecordBatch( next_batch, destination.directory, destination.filename); if (!has_room.is_finished()) { @@ -499,7 +499,7 @@ class TeeNode : public compute::MapNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto func = [this](compute::ExecBatch batch) { - arrow::internal::tracing::OTSpan span; + util::tracing::Span span; START_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"tee", ToStringExtra()}, {"node.label", label()}, diff --git a/cpp/src/arrow/util/tracing.cc b/cpp/src/arrow/util/tracing.cc new file mode 100644 index 00000000000..9a534cf7303 --- /dev/null +++ b/cpp/src/arrow/util/tracing.cc @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/tracing.h" + +#include "arrow/util/config.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/tracing_internal.h" + +namespace arrow { + +using internal::make_unique; +namespace util { +namespace tracing { + +#ifdef ARROW_WITH_OPENTELEMETRY + +Span::Span() noexcept { details = make_unique<::arrow::internal::tracing::SpanImpl>(); } + +#else + +Span::Span() noexcept { /* details is left a nullptr */ +} + +#endif + +} // namespace tracing +} // namespace util +} // namespace arrow \ No newline at end of file diff --git a/cpp/src/arrow/util/tracing.h b/cpp/src/arrow/util/tracing.h index e8f24cebd9f..c6968219b6e 100644 --- a/cpp/src/arrow/util/tracing.h +++ b/cpp/src/arrow/util/tracing.h @@ -19,16 +19,21 @@ #include -#include "arrow/util/logging.h" +#include "arrow/util/visibility.h" namespace arrow { namespace util { namespace tracing { +class ARROW_EXPORT SpanDetails { + public: + virtual ~SpanDetails() {} +}; + class ARROW_EXPORT Span { public: - // Manually specify destructor otherwise MSVC will complain about unused var - virtual ~Span() {} + Span() noexcept; + std::unique_ptr details; }; } // namespace tracing diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 57574d11b7a..ce49bb76025 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -54,8 +54,6 @@ namespace arrow { namespace internal { namespace tracing { -using internal::checked_cast; - namespace nostd = opentelemetry::nostd; namespace otel = opentelemetry; @@ -204,15 +202,39 @@ opentelemetry::trace::Tracer* GetTracer() { return tracer.get(); } -#ifdef ARROW_WITH_OPENTELEMETRY +opentelemetry::nostd::shared_ptr& UnwrapSpan( + ::arrow::util::tracing::SpanDetails* span) { + SpanImpl* span_impl = checked_cast(span); + ARROW_CHECK(span_impl->ot_span) + << "Attempted to dereference a null pointer. Use Span::Set before " + "dereferencing."; + return span_impl->ot_span; +} + +const opentelemetry::nostd::shared_ptr& UnwrapSpan( + const ::arrow::util::tracing::SpanDetails* span) { + const SpanImpl* span_impl = checked_cast(span); + ARROW_CHECK(span_impl->ot_span) + << "Attempted to dereference a null pointer. Use Span::Set before " + "dereferencing."; + return span_impl->ot_span; +} + +opentelemetry::nostd::shared_ptr& RewrapSpan( + ::arrow::util::tracing::SpanDetails* span, + opentelemetry::nostd::shared_ptr ot_span) { + SpanImpl* span_impl = checked_cast(span); + span_impl->ot_span = std::move(ot_span); + return span_impl->ot_span; +} + opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( const util::tracing::Span& parent_span) { opentelemetry::trace::StartSpanOptions options; - options.parent = (dynamic_cast(parent_span)).Get()->GetContext(); + options.parent = UnwrapSpan(parent_span.details.get())->GetContext(); return options; } -#endif } // namespace tracing } // namespace internal -} // namespace arrow +} // namespace arrow \ No newline at end of file diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index b6153869f62..51306b7ff9c 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -48,41 +48,6 @@ namespace tracing { ARROW_EXPORT opentelemetry::trace::Tracer* GetTracer(); -class OTSpan : public ::arrow::util::tracing::Span { - using InnerSpan = opentelemetry::nostd::shared_ptr; - - public: - OTSpan() { - inner_span = nullptr; - } - - InnerSpan& Set(const InnerSpan& span) noexcept { - inner_span = span; - return inner_span; - } - - InnerSpan& Set(InnerSpan&& span) noexcept { - inner_span = std::move(span); - return inner_span; - } - - const InnerSpan& Get() const { - ARROW_CHECK(inner_span) - << "Attempted to dereference a null pointer. Use Span::Set before " - "dereferencing."; - return inner_span; - } - - InnerSpan& Get() { - ARROW_CHECK(inner_span) - << "Attempted to dereference a null pointer. Use Span::Set before " - "dereferencing."; - return inner_span; - } - - InnerSpan inner_span; -}; - inline void MarkSpan(const Status& s, opentelemetry::trace::Span* span) { if (!s.ok()) { span->SetStatus(opentelemetry::trace::StatusCode::kError, s.ToString()); @@ -141,46 +106,63 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); } +class SpanImpl : public ::arrow::util::tracing::SpanDetails { + public: + ~SpanImpl() override = default; + opentelemetry::nostd::shared_ptr ot_span; +}; + +opentelemetry::nostd::shared_ptr& UnwrapSpan( + ::arrow::util::tracing::SpanDetails* span); + +const opentelemetry::nostd::shared_ptr& UnwrapSpan( + const ::arrow::util::tracing::SpanDetails* span); + +opentelemetry::nostd::shared_ptr& RewrapSpan( + ::arrow::util::tracing::SpanDetails* span, + opentelemetry::nostd::shared_ptr ot_span); + opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( const util::tracing::Span& parent_span); -#define CAST_SPAN(target_span) \ - (*(::arrow::internal::checked_cast<::arrow::internal::tracing::OTSpan*>(&(target_span)))) - #define START_SPAN(target_span, ...) \ auto opentelemetry_scope##__LINE__ = \ ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ - CAST_SPAN(target_span) \ - .Set(::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__))) - -#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) \ - auto opentelemetry_scope##__LINE__ = \ - ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ - CAST_SPAN(target_span) \ - .Set(::arrow::internal::tracing::GetTracer()->StartSpan( \ - __VA_ARGS__, \ + ::arrow::internal::tracing::RewrapSpan( \ + target_span.details.get(), \ + ::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__))) + +#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) \ + auto opentelemetry_scope##__LINE__ = \ + ::arrow::internal::tracing::GetTracer()->WithActiveSpan( \ + ::arrow::internal::tracing::RewrapSpan( \ + target_span.details.get(), \ + \ + ::arrow::internal::tracing::GetTracer()->StartSpan( \ + __VA_ARGS__, \ ::arrow::internal::tracing::SpanOptionsWithParent(parent_span)))) -#define START_COMPUTE_SPAN(target_span, ...) \ - START_SPAN(target_span, __VA_ARGS__); \ - CAST_SPAN(target_span) \ - .Get() \ - ->SetAttribute("arrow.memory_pool_bytes", \ +#define START_COMPUTE_SPAN(target_span, ...) \ + START_SPAN(target_span, __VA_ARGS__); \ + ::arrow::internal::tracing::UnwrapSpan(target_span.details.get()) \ + ->SetAttribute("arrow.memory_pool_bytes", \ ::arrow::default_memory_pool()->bytes_allocated()) #define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...) \ START_SPAN_WITH_PARENT(target_span, parent_span, __VA_ARGS__); \ - CAST_SPAN(target_span) \ - .Get() \ + ::arrow::internal::tracing::UnwrapSpan(target_span.details.get()) \ ->SetAttribute("arrow.memory_pool_bytes", \ ::arrow::default_memory_pool()->bytes_allocated()) -#define EVENT(target_span, ...) CAST_SPAN(target_span).Get()->AddEvent(__VA_ARGS__) +#define EVENT(target_span, ...) \ + ::arrow::internal::tracing::UnwrapSpan(target_span.details.get())->AddEvent(__VA_ARGS__) -#define MARK_SPAN(target_span, status) \ - ::arrow::internal::tracing::MarkSpan(status, CAST_SPAN(target_span).Get().get()) +#define MARK_SPAN(target_span, status) \ + ::arrow::internal::tracing::MarkSpan( \ + status, ::arrow::internal::tracing::UnwrapSpan(target_span.details.get()).get()) -#define END_SPAN(target_span) CAST_SPAN(target_span).Get()->End() +#define END_SPAN(target_span) \ + ::arrow::internal::tracing::UnwrapSpan(target_span.details.get())->End() #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) \ target_future = target_future.Then( \ @@ -216,6 +198,8 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( #else // !ARROW_WITH_OPENTELEMETRY +class SpanImpl {}; + #define START_SPAN(target_span, ...) #define START_SPAN_WITH_PARENT(target_span, parent_span, ...) #define START_COMPUTE_SPAN(target_span, ...) @@ -232,4 +216,4 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( } // namespace tracing } // namespace internal -} // namespace arrow +} // namespace arrow \ No newline at end of file From 539529ea3e2dda546dcbe92d66775eb8535d00e9 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 10 Jun 2022 17:56:07 -0700 Subject: [PATCH 7/7] fix: formatting --- cpp/src/arrow/compute/exec/exec_plan.cc | 4 ++-- cpp/src/arrow/compute/function.cc | 2 +- cpp/src/arrow/util/tracing.cc | 2 +- cpp/src/arrow/util/tracing_internal.cc | 2 +- cpp/src/arrow/util/tracing_internal.h | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 3bd508ed9f4..a23d6d5bae6 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -85,8 +85,8 @@ struct ExecPlanImpl : public ExecPlan { #ifdef ARROW_WITH_OPENTELEMETRY if (HasMetadata()) { auto pairs = metadata().get()->sorted_pairs(); - opentelemetry::nostd::shared_ptr span = - ::arrow::internal::tracing::UnwrapSpan(span_.details.get()); + opentelemetry::nostd::shared_ptr span = + ::arrow::internal::tracing::UnwrapSpan(span_.details.get()); std::for_each(std::begin(pairs), std::end(pairs), [span](std::pair const& pair) { span->SetAttribute(pair.first, pair.second); diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 5ebbef9650c..fd80de4dbb3 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -215,7 +215,7 @@ Result Function::Execute(const std::vector& args, } util::tracing::Span span; - + START_COMPUTE_SPAN(span, name(), {{"function.name", name()}, {"function.options", options ? options->ToString() : ""}, diff --git a/cpp/src/arrow/util/tracing.cc b/cpp/src/arrow/util/tracing.cc index 9a534cf7303..8bf21f688c4 100644 --- a/cpp/src/arrow/util/tracing.cc +++ b/cpp/src/arrow/util/tracing.cc @@ -40,4 +40,4 @@ Span::Span() noexcept { /* details is left a nullptr */ } // namespace tracing } // namespace util -} // namespace arrow \ No newline at end of file +} // namespace arrow diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index ce49bb76025..668a2aaba8b 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -237,4 +237,4 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( } // namespace tracing } // namespace internal -} // namespace arrow \ No newline at end of file +} // namespace arrow diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 51306b7ff9c..2898fd245fb 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -216,4 +216,4 @@ class SpanImpl {}; } // namespace tracing } // namespace internal -} // namespace arrow \ No newline at end of file +} // namespace arrow