Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,15 @@ if(ARROW_S3)
endif()

if(ARROW_WITH_OPENTELEMETRY)
list(APPEND ARROW_LINK_LIBS opentelemetry-cpp::trace
list(APPEND
ARROW_LINK_LIBS
opentelemetry-cpp::trace
opentelemetry-cpp::ostream_span_exporter
opentelemetry-cpp::otlp_http_exporter)
list(APPEND ARROW_STATIC_LINK_LIBS opentelemetry-cpp::trace
list(APPEND
ARROW_STATIC_LINK_LIBS
opentelemetry-cpp::trace
opentelemetry-cpp::ostream_span_exporter
opentelemetry-cpp::otlp_http_exporter)
endif()

Expand Down
62 changes: 55 additions & 7 deletions cpp/src/arrow/util/tracing_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@
#pragma warning(push)
#pragma warning(disable : 4522)
#endif
#include <google/protobuf/util/json_util.h>

#include <opentelemetry/exporters/ostream/span_exporter.h>
#include <opentelemetry/exporters/otlp/otlp_http_exporter.h>
#include <opentelemetry/exporters/otlp/otlp_recordable_utils.h>
#include <opentelemetry/sdk/trace/batch_span_processor.h>
#include <opentelemetry/sdk/trace/recordable.h>
#include <opentelemetry/sdk/trace/span_data.h>
#include <opentelemetry/sdk/trace/tracer_provider.h>
#include <opentelemetry/trace/noop.h>
#include <opentelemetry/trace/provider.h>

#include <opentelemetry/exporters/otlp/protobuf_include_prefix.h>
#include <opentelemetry/exporters/otlp/protobuf_include_suffix.h>
#include <opentelemetry/proto/collector/trace/v1/trace_service.pb.h>
#ifdef _MSC_VER
#pragma warning(pop)
#endif
Expand All @@ -53,6 +60,46 @@ namespace {

namespace sdktrace = opentelemetry::sdk::trace;

// Custom JSON exporter. Leverages the OTLP HTTP exporter's utilities
// to log the same format that would be sent to OTLP.
class OtlpOStreamExporter final : public sdktrace::SpanExporter {
public:
explicit OtlpOStreamExporter(std::basic_ostream<char>* out) : out_(out) {
protobuf_json_options_.add_whitespace = false;
}

std::unique_ptr<sdktrace::Recordable> MakeRecordable() noexcept override {
// The header for the Recordable definition is not installed, work around that
return exporter_.MakeRecordable();
}
otel::sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdktrace::Recordable>>& spans) noexcept override {
opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request;
otel::exporter::otlp::OtlpRecordableUtils::PopulateRequest(spans, &request);

for (const auto& spans : request.resource_spans()) {
std::string output;
auto status = google::protobuf::util::MessageToJsonString(spans, &output,
protobuf_json_options_);
if (ARROW_PREDICT_FALSE(!status.ok())) {
return otel::sdk::common::ExportResult::kFailure;
}
(*out_) << output << std::endl;
}

return otel::sdk::common::ExportResult::kSuccess;
}
bool Shutdown(std::chrono::microseconds timeout =
std::chrono::microseconds(0)) noexcept override {
return exporter_.Shutdown(timeout);
}

private:
std::basic_ostream<char>* out_;
opentelemetry::exporter::otlp::OtlpHttpExporter exporter_;
google::protobuf::util::JsonPrintOptions protobuf_json_options_;
};

class ThreadIdSpanProcessor : public sdktrace::BatchSpanProcessor {
public:
using sdktrace::BatchSpanProcessor::BatchSpanProcessor;
Expand All @@ -68,15 +115,16 @@ std::unique_ptr<sdktrace::SpanExporter> InitializeExporter() {
auto maybe_env_var = arrow::internal::GetEnvVar(kTracingBackendEnvVar);
if (maybe_env_var.ok()) {
auto env_var = maybe_env_var.ValueOrDie();
if (env_var == "otlp_http") {
#ifdef ARROW_WITH_OPENTELEMETRY
if (env_var == "ostream") {
return arrow::internal::make_unique<otel::exporter::trace::OStreamSpanExporter>();
} else if (env_var == "otlp_http") {
namespace otlp = opentelemetry::exporter::otlp;
otlp::OtlpHttpExporterOptions opts;
return arrow::internal::make_unique<otlp::OtlpHttpExporter>(opts);
#else
ARROW_LOG(WARNING) << "Requested " << kTracingBackendEnvVar << "=" < < < < env_var
" but Arrow was not built with ARROW_WITH_OPENTELEMETRY";
#endif
} else if (env_var == "arrow_otlp_stdout") {
return arrow::internal::make_unique<OtlpOStreamExporter>(&std::cout);
} else if (env_var == "arrow_otlp_stderr") {
return arrow::internal::make_unique<OtlpOStreamExporter>(&std::cerr);
} else if (!env_var.empty()) {
ARROW_LOG(WARNING) << "Requested unknown backend " << kTracingBackendEnvVar << "="
<< env_var;
Expand Down