diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 0262357d6c7..0e7b7b79a9f 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -766,9 +766,15 @@ if(ARROW_S3) endif() if(ARROW_WITH_OPENTELEMETRY) - list(APPEND ARROW_LINK_LIBS opentelemetry-cpp::trace + list(APPEND + ARROW_LINK_LIBS + opentelemetry-cpp::trace + opentelemetry-cpp::ostream_span_exporter opentelemetry-cpp::otlp_http_exporter) - list(APPEND ARROW_STATIC_LINK_LIBS opentelemetry-cpp::trace + list(APPEND + ARROW_STATIC_LINK_LIBS + opentelemetry-cpp::trace + opentelemetry-cpp::ostream_span_exporter opentelemetry-cpp::otlp_http_exporter) endif() diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index fe6422eaa35..d39f95061c7 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -25,13 +25,20 @@ #pragma warning(push) #pragma warning(disable : 4522) #endif +#include + +#include #include +#include #include #include #include #include -#include #include + +#include +#include +#include #ifdef _MSC_VER #pragma warning(pop) #endif @@ -53,6 +60,46 @@ namespace { namespace sdktrace = opentelemetry::sdk::trace; +// Custom JSON exporter. Leverages the OTLP HTTP exporter's utilities +// to log the same format that would be sent to OTLP. +class OtlpOStreamExporter final : public sdktrace::SpanExporter { + public: + explicit OtlpOStreamExporter(std::basic_ostream* out) : out_(out) { + protobuf_json_options_.add_whitespace = false; + } + + std::unique_ptr MakeRecordable() noexcept override { + // The header for the Recordable definition is not installed, work around that + return exporter_.MakeRecordable(); + } + otel::sdk::common::ExportResult Export( + const nostd::span>& spans) noexcept override { + opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request; + otel::exporter::otlp::OtlpRecordableUtils::PopulateRequest(spans, &request); + + for (const auto& spans : request.resource_spans()) { + std::string output; + auto status = google::protobuf::util::MessageToJsonString(spans, &output, + protobuf_json_options_); + if (ARROW_PREDICT_FALSE(!status.ok())) { + return otel::sdk::common::ExportResult::kFailure; + } + (*out_) << output << std::endl; + } + + return otel::sdk::common::ExportResult::kSuccess; + } + bool Shutdown(std::chrono::microseconds timeout = + std::chrono::microseconds(0)) noexcept override { + return exporter_.Shutdown(timeout); + } + + private: + std::basic_ostream* out_; + opentelemetry::exporter::otlp::OtlpHttpExporter exporter_; + google::protobuf::util::JsonPrintOptions protobuf_json_options_; +}; + class ThreadIdSpanProcessor : public sdktrace::BatchSpanProcessor { public: using sdktrace::BatchSpanProcessor::BatchSpanProcessor; @@ -68,15 +115,16 @@ std::unique_ptr InitializeExporter() { auto maybe_env_var = arrow::internal::GetEnvVar(kTracingBackendEnvVar); if (maybe_env_var.ok()) { auto env_var = maybe_env_var.ValueOrDie(); - if (env_var == "otlp_http") { -#ifdef ARROW_WITH_OPENTELEMETRY + if (env_var == "ostream") { + return arrow::internal::make_unique(); + } else if (env_var == "otlp_http") { namespace otlp = opentelemetry::exporter::otlp; otlp::OtlpHttpExporterOptions opts; return arrow::internal::make_unique(opts); -#else - ARROW_LOG(WARNING) << "Requested " << kTracingBackendEnvVar << "=" < < < < env_var - " but Arrow was not built with ARROW_WITH_OPENTELEMETRY"; -#endif + } else if (env_var == "arrow_otlp_stdout") { + return arrow::internal::make_unique(&std::cout); + } else if (env_var == "arrow_otlp_stderr") { + return arrow::internal::make_unique(&std::cerr); } else if (!env_var.empty()) { ARROW_LOG(WARNING) << "Requested unknown backend " << kTracingBackendEnvVar << "=" << env_var;