Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cpp/cmake_modules/FindThrift.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,17 @@ endfunction(EXTRACT_THRIFT_VERSION)
if(MSVC_TOOLCHAIN AND NOT DEFINED THRIFT_MSVC_LIB_SUFFIX)
if(NOT ARROW_THRIFT_USE_SHARED)
if(ARROW_USE_STATIC_CRT)
set(THRIFT_MSVC_LIB_SUFFIX "mt")
if("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
set(THRIFT_MSVC_LIB_SUFFIX "mtd")
else()
set(THRIFT_MSVC_LIB_SUFFIX "mt")
endif()
else()
set(THRIFT_MSVC_LIB_SUFFIX "md")
if("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
set(THRIFT_MSVC_LIB_SUFFIX "mdd")
else()
set(THRIFT_MSVC_LIB_SUFFIX "md")
endif()
endif()
endif()
endif()
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ struct ExecPlanImpl : public ExecPlan {
#ifdef ARROW_WITH_OPENTELEMETRY
if (HasMetadata()) {
auto pairs = metadata().get()->sorted_pairs();
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
::arrow::internal::tracing::UnwrapSpan(span_.details.get());
std::for_each(std::begin(pairs), std::end(pairs),
[this](std::pair<std::string, std::string> const& pair) {
span_.Get().span->SetAttribute(pair.first, pair.second);
[span](std::pair<std::string, std::string> const& pair) {
span->SetAttribute(pair.first, pair.second);
});
}
#endif
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/exec/hash_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/kernels/row_encoder.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {
namespace compute {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/hash_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/tracing.h"

namespace arrow {
namespace compute {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/exec/hash_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/util/future.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ Result<Datum> Function::Execute(const std::vector<Datum>& args,
}

util::tracing::Span span;

START_COMPUTE_SPAN(span, name(),
{{"function.name", name()},
{"function.options", options ? options->ToString() : "<NULLPTR>"},
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/light_array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/compute/light_array.h"

#include <gtest/gtest.h>
#include <numeric>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this change. I had noticed it myself but hadn't gotten around to making a PR.


#include "arrow/compute/exec/test_util.h"
#include "arrow/testing/generator.h"
Expand Down
18 changes: 8 additions & 10 deletions cpp/src/arrow/util/tracing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,28 @@
// under the License.

#include "arrow/util/tracing.h"

#include "arrow/util/config.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

using internal::make_unique;
namespace util {
namespace tracing {

#ifdef ARROW_WITH_OPENTELEMETRY

Span::Impl& Span::Set(const Impl& impl) {
inner_impl.reset(new Impl(impl));
return *inner_impl;
}
Span::Span() noexcept { details = make_unique<::arrow::internal::tracing::SpanImpl>(); }

Span::Impl& Span::Set(Impl&& impl) {
inner_impl.reset(new Impl(std::move(impl)));
return *inner_impl;
#else

Span::Span() noexcept { /* details is left a nullptr */
}

#endif

// Default destructor when impl type is complete.
Span::~Span() = default;

} // namespace tracing
} // namespace util
} // namespace arrow
44 changes: 8 additions & 36 deletions cpp/src/arrow/util/tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,21 @@

#include <memory>

#include "arrow/util/logging.h"
#include "arrow/util/visibility.h"

namespace arrow {

namespace internal {
namespace tracing {

// Forward declaration SpanImpl.
class SpanImpl;

} // namespace tracing
} // namespace internal

namespace util {
namespace tracing {

class ARROW_EXPORT Span {
class ARROW_EXPORT SpanDetails {
public:
using Impl = arrow::internal::tracing::SpanImpl;

Span() = default; // Default constructor. The inner_impl is a nullptr.
~Span(); // Destructor. Default destructor defined in tracing.cc where impl is a
// complete type.

Impl& Set(const Impl&);
Impl& Set(Impl&&);

const Impl& Get() const {
ARROW_CHECK(inner_impl)
<< "Attempted to dereference a null pointer. Use Span::Set before "
"dereferencing.";
return *inner_impl;
}

Impl& Get() {
ARROW_CHECK(inner_impl)
<< "Attempted to dereference a null pointer. Use Span::Set before "
"dereferencing.";
return *inner_impl;
}
virtual ~SpanDetails() {}
};

private:
std::unique_ptr<Impl> inner_impl;
class ARROW_EXPORT Span {
public:
Span() noexcept;
std::unique_ptr<SpanDetails> details;
};

} // namespace tracing
Expand Down
30 changes: 27 additions & 3 deletions cpp/src/arrow/util/tracing_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,38 @@ opentelemetry::trace::Tracer* GetTracer() {
return tracer.get();
}

#ifdef ARROW_WITH_OPENTELEMETRY
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>& UnwrapSpan(
::arrow::util::tracing::SpanDetails* span) {
SpanImpl* span_impl = checked_cast<SpanImpl*>(span);
ARROW_CHECK(span_impl->ot_span)
<< "Attempted to dereference a null pointer. Use Span::Set before "
"dereferencing.";
return span_impl->ot_span;
}

const opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>& UnwrapSpan(
const ::arrow::util::tracing::SpanDetails* span) {
const SpanImpl* span_impl = checked_cast<const SpanImpl*>(span);
ARROW_CHECK(span_impl->ot_span)
<< "Attempted to dereference a null pointer. Use Span::Set before "
"dereferencing.";
return span_impl->ot_span;
}

opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>& RewrapSpan(
::arrow::util::tracing::SpanDetails* span,
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> ot_span) {
SpanImpl* span_impl = checked_cast<SpanImpl*>(span);
span_impl->ot_span = std::move(ot_span);
return span_impl->ot_span;
}

opentelemetry::trace::StartSpanOptions SpanOptionsWithParent(
const util::tracing::Span& parent_span) {
opentelemetry::trace::StartSpanOptions options;
options.parent = parent_span.Get().span->GetContext();
options.parent = UnwrapSpan(parent_span.details.get())->GetContext();
return options;
}
#endif

} // namespace tracing
} // namespace internal
Expand Down
75 changes: 45 additions & 30 deletions cpp/src/arrow/util/tracing_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,48 +106,63 @@ AsyncGenerator<T> PropagateSpanThroughAsyncGenerator(AsyncGenerator<T> wrapped)
return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span));
}

class SpanImpl {
class SpanImpl : public ::arrow::util::tracing::SpanDetails {
public:
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span;
~SpanImpl() override = default;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> ot_span;
};

opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>& UnwrapSpan(
::arrow::util::tracing::SpanDetails* span);

const opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>& UnwrapSpan(
const ::arrow::util::tracing::SpanDetails* span);

opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>& RewrapSpan(
::arrow::util::tracing::SpanDetails* span,
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> ot_span);

opentelemetry::trace::StartSpanOptions SpanOptionsWithParent(
const util::tracing::Span& parent_span);

#define START_SPAN(target_span, ...) \
auto opentelemetry_scope##__LINE__ = \
::arrow::internal::tracing::GetTracer()->WithActiveSpan( \
target_span \
.Set(::arrow::util::tracing::Span::Impl{ \
::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__)}) \
.span)

#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) \
auto opentelemetry_scope##__LINE__ = \
::arrow::internal::tracing::GetTracer()->WithActiveSpan( \
target_span \
.Set(::arrow::util::tracing::Span::Impl{ \
::arrow::internal::tracing::GetTracer()->StartSpan( \
__VA_ARGS__, \
::arrow::internal::tracing::SpanOptionsWithParent(parent_span))}) \
.span)

#define START_COMPUTE_SPAN(target_span, ...) \
START_SPAN(target_span, __VA_ARGS__); \
target_span.Get().span->SetAttribute( \
"arrow.memory_pool_bytes", ::arrow::default_memory_pool()->bytes_allocated())
#define START_SPAN(target_span, ...) \
auto opentelemetry_scope##__LINE__ = \
::arrow::internal::tracing::GetTracer()->WithActiveSpan( \
::arrow::internal::tracing::RewrapSpan( \
target_span.details.get(), \
::arrow::internal::tracing::GetTracer()->StartSpan(__VA_ARGS__)))

#define START_SPAN_WITH_PARENT(target_span, parent_span, ...) \
auto opentelemetry_scope##__LINE__ = \
::arrow::internal::tracing::GetTracer()->WithActiveSpan( \
::arrow::internal::tracing::RewrapSpan( \
target_span.details.get(), \
\
::arrow::internal::tracing::GetTracer()->StartSpan( \
__VA_ARGS__, \
::arrow::internal::tracing::SpanOptionsWithParent(parent_span))))

#define START_COMPUTE_SPAN(target_span, ...) \
START_SPAN(target_span, __VA_ARGS__); \
::arrow::internal::tracing::UnwrapSpan(target_span.details.get()) \
->SetAttribute("arrow.memory_pool_bytes", \
::arrow::default_memory_pool()->bytes_allocated())

#define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...) \
START_SPAN_WITH_PARENT(target_span, parent_span, __VA_ARGS__); \
target_span.Get().span->SetAttribute( \
"arrow.memory_pool_bytes", ::arrow::default_memory_pool()->bytes_allocated())
::arrow::internal::tracing::UnwrapSpan(target_span.details.get()) \
->SetAttribute("arrow.memory_pool_bytes", \
::arrow::default_memory_pool()->bytes_allocated())

#define EVENT(target_span, ...) target_span.Get().span->AddEvent(__VA_ARGS__)
#define EVENT(target_span, ...) \
::arrow::internal::tracing::UnwrapSpan(target_span.details.get())->AddEvent(__VA_ARGS__)

#define MARK_SPAN(target_span, status) \
::arrow::internal::tracing::MarkSpan(status, target_span.Get().span.get())
#define MARK_SPAN(target_span, status) \
::arrow::internal::tracing::MarkSpan( \
status, ::arrow::internal::tracing::UnwrapSpan(target_span.details.get()).get())

#define END_SPAN(target_span) target_span.Get().span->End()
#define END_SPAN(target_span) \
::arrow::internal::tracing::UnwrapSpan(target_span.details.get())->End()

#define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) \
target_future = target_future.Then( \
Expand Down