Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ add_library(
stats/thread_local_store.cc
thread_local/thread_local_impl.cc
tracing/http_tracer_impl.cc
tracing/lightstep_tracer_impl.cc
upstream/cds_api_impl.cc
upstream/cluster_manager_impl.cc
upstream/health_checker_impl.cc
Expand Down
10 changes: 8 additions & 2 deletions source/common/tracing/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ load("//bazel:envoy_build_system.bzl", "envoy_cc_library")

envoy_cc_library(
name = "http_tracer_lib",
srcs = ["http_tracer_impl.cc"],
hdrs = ["http_tracer_impl.h"],
srcs = [
"http_tracer_impl.cc",
"lightstep_tracer_impl.cc",
],
hdrs = [
"http_tracer_impl.h",
"lightstep_tracer_impl.h",
],
external_deps = ["lightstep"],
deps = [
"//include/envoy/local_info:local_info_interface",
Expand Down
157 changes: 0 additions & 157 deletions source/common/tracing/http_tracer_impl.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
#include "common/tracing/http_tracer_impl.h"

#include "common/common/assert.h"
#include "common/common/base64.h"
#include "common/common/macros.h"
#include "common/common/utility.h"
#include "common/grpc/common.h"
#include "common/http/access_log/access_log_formatter.h"
#include "common/http/codes.h"
#include "common/http/headers.h"
#include "common/http/header_map_impl.h"
#include "common/http/message_impl.h"
#include "common/http/utility.h"
#include "common/runtime/uuid_util.h"

Expand Down Expand Up @@ -175,158 +172,4 @@ SpanPtr HttpTracerImpl::startSpan(const Config& config, Http::HeaderMap& request
return active_span;
}

LightStepSpan::LightStepSpan(lightstep::Span& span) : span_(span) {}

void LightStepSpan::finishSpan() { span_.Finish(); }

void LightStepSpan::setTag(const std::string& name, const std::string& value) {
span_.SetTag(name, value);
}

LightStepRecorder::LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepDriver& driver,
Event::Dispatcher& dispatcher)
: builder_(tracer), driver_(driver) {
flush_timer_ = dispatcher.createTimer([this]() -> void {
driver_.tracerStats().timer_flushed_.inc();
flushSpans();
enableTimer();
});

enableTimer();
}

void LightStepRecorder::RecordSpan(lightstep::collector::Span&& span) {
builder_.addSpan(std::move(span));

uint64_t min_flush_spans =
driver_.runtime().snapshot().getInteger("tracing.lightstep.min_flush_spans", 5U);
if (builder_.pendingSpans() == min_flush_spans) {
flushSpans();
}
}

bool LightStepRecorder::FlushWithTimeout(lightstep::Duration) {
// Note: We don't expect this to be called, since the Tracer
// reference is private to its LightStepSink.
return true;
}

std::unique_ptr<lightstep::Recorder>
LightStepRecorder::NewInstance(LightStepDriver& driver, Event::Dispatcher& dispatcher,
const lightstep::TracerImpl& tracer) {
return std::unique_ptr<lightstep::Recorder>(new LightStepRecorder(tracer, driver, dispatcher));
}

void LightStepRecorder::enableTimer() {
uint64_t flush_interval =
driver_.runtime().snapshot().getInteger("tracing.lightstep.flush_interval_ms", 1000U);
flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
}

void LightStepRecorder::flushSpans() {
if (builder_.pendingSpans() != 0) {
driver_.tracerStats().spans_sent_.add(builder_.pendingSpans());
lightstep::collector::ReportRequest request;
std::swap(request, builder_.pending());

Http::MessagePtr message = Grpc::Common::prepareHeaders(driver_.cluster()->name(),
lightstep::CollectorServiceFullName(),
lightstep::CollectorMethodName());

message->body() = Grpc::Common::serializeBody(std::move(request));

uint64_t timeout =
driver_.runtime().snapshot().getInteger("tracing.lightstep.request_timeout", 5000U);
driver_.clusterManager()
.httpAsyncClientForCluster(driver_.cluster()->name())
.send(std::move(message), *this, std::chrono::milliseconds(timeout));
}
}

LightStepDriver::TlsLightStepTracer::TlsLightStepTracer(lightstep::Tracer tracer,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, can you also move LS related tests (driver/etc) into separate test files?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. LMK if anything else needs to be refactored.

LightStepDriver& driver)
: tracer_(tracer), driver_(driver) {}

LightStepDriver::LightStepDriver(const Json::Object& config,
Upstream::ClusterManager& cluster_manager, Stats::Store& stats,
ThreadLocal::Instance& tls, Runtime::Loader& runtime,
std::unique_ptr<lightstep::TracerOptions> options)
: cm_(cluster_manager),
tracer_stats_{LIGHTSTEP_TRACER_STATS(POOL_COUNTER_PREFIX(stats, "tracing.lightstep."))},
tls_(tls), runtime_(runtime), options_(std::move(options)), tls_slot_(tls.allocateSlot()) {
Upstream::ThreadLocalCluster* cluster = cm_.get(config.getString("collector_cluster"));
if (!cluster) {
throw EnvoyException(fmt::format("{} collector cluster is not defined on cluster manager level",
config.getString("collector_cluster")));
}
cluster_ = cluster->info();

if (!(cluster_->features() & Upstream::ClusterInfo::Features::HTTP2)) {
throw EnvoyException(
fmt::format("{} collector cluster must support http2 for gRPC calls", cluster_->name()));
}

tls_.set(tls_slot_,
[this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
lightstep::Tracer tracer(lightstep::NewUserDefinedTransportLightStepTracer(
*options_, std::bind(&LightStepRecorder::NewInstance, std::ref(*this),
std::ref(dispatcher), std::placeholders::_1)));

return ThreadLocal::ThreadLocalObjectSharedPtr{
new TlsLightStepTracer(std::move(tracer), *this)};
});
}

SpanPtr LightStepDriver::startSpan(Http::HeaderMap& request_headers,
const std::string& operation_name, SystemTime start_time) {
lightstep::Tracer& tracer = tls_.getTyped<TlsLightStepTracer>(tls_slot_).tracer_;
LightStepSpanPtr active_span;

if (request_headers.OtSpanContext()) {
// Extract downstream context from HTTP carrier.
// This code is safe even if decode returns empty string or data is malformed.
std::string parent_context = Base64::decode(request_headers.OtSpanContext()->value().c_str());
lightstep::BinaryCarrier ctx;
ctx.ParseFromString(parent_context);

lightstep::SpanContext parent_span_ctx = tracer.Extract(
lightstep::CarrierFormat::LightStepBinaryCarrier, lightstep::ProtoReader(ctx));
lightstep::Span ls_span =
tracer.StartSpan(operation_name, {lightstep::ChildOf(parent_span_ctx),
lightstep::StartTimestamp(start_time)});
active_span.reset(new LightStepSpan(ls_span));
} else {
lightstep::Span ls_span =
tracer.StartSpan(operation_name, {lightstep::StartTimestamp(start_time)});
active_span.reset(new LightStepSpan(ls_span));
}

// Inject newly created span context into HTTP carrier.
lightstep::BinaryCarrier ctx;
tracer.Inject(active_span->context(), lightstep::CarrierFormat::LightStepBinaryCarrier,
lightstep::ProtoWriter(&ctx));
const std::string current_span_context = ctx.SerializeAsString();
request_headers.insertOtSpanContext().value(
Base64::encode(current_span_context.c_str(), current_span_context.length()));

return std::move(active_span);
}

void LightStepRecorder::onFailure(Http::AsyncClient::FailureReason) {
Grpc::Common::chargeStat(*driver_.cluster(), lightstep::CollectorServiceFullName(),
lightstep::CollectorMethodName(), false);
}

void LightStepRecorder::onSuccess(Http::MessagePtr&& msg) {
try {
Grpc::Common::validateResponse(*msg);

Grpc::Common::chargeStat(*driver_.cluster(), lightstep::CollectorServiceFullName(),
lightstep::CollectorMethodName(), true);
} catch (const Grpc::Exception& ex) {
Grpc::Common::chargeStat(*driver_.cluster(), lightstep::CollectorServiceFullName(),
lightstep::CollectorMethodName(), false);
}
}

} // Tracing
93 changes: 0 additions & 93 deletions source/common/tracing/http_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,8 @@
#include "common/http/header_map_impl.h"
#include "common/json/json_loader.h"

#include "lightstep/carrier.h"
#include "lightstep/tracer.h"

namespace Tracing {

#define LIGHTSTEP_TRACER_STATS(COUNTER) \
COUNTER(spans_sent) \
COUNTER(timer_flushed)

struct LightstepTracerStats {
LIGHTSTEP_TRACER_STATS(GENERATE_COUNTER_STRUCT)
};

enum class Reason {
NotTraceableRequestId,
HealthCheck,
Expand Down Expand Up @@ -91,86 +80,4 @@ class HttpTracerImpl : public HttpTracer {
const LocalInfo::LocalInfo& local_info_;
};

class LightStepSpan : public Span {
public:
LightStepSpan(lightstep::Span& span);

// Tracing::Span
void finishSpan() override;
void setTag(const std::string& name, const std::string& value) override;

lightstep::SpanContext context() { return span_.context(); }

private:
lightstep::Span span_;
};

typedef std::unique_ptr<LightStepSpan> LightStepSpanPtr;

/**
* LightStep (http://lightstep.com/) provides tracing capabilities, aggregation, visualization of
* application trace data.
*
* LightStepSink is for flushing data to LightStep collectors.
*/
class LightStepDriver : public Driver {
public:
LightStepDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager,
Stats::Store& stats, ThreadLocal::Instance& tls, Runtime::Loader& runtime,
std::unique_ptr<lightstep::TracerOptions> options);

// Tracer::TracingDriver
SpanPtr startSpan(Http::HeaderMap& request_headers, const std::string& operation_name,
SystemTime start_time) override;

Upstream::ClusterManager& clusterManager() { return cm_; }
Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; }
Runtime::Loader& runtime() { return runtime_; }
LightstepTracerStats& tracerStats() { return tracer_stats_; }

private:
struct TlsLightStepTracer : ThreadLocal::ThreadLocalObject {
TlsLightStepTracer(lightstep::Tracer tracer, LightStepDriver& driver);

void shutdown() override {}

lightstep::Tracer tracer_;
LightStepDriver& driver_;
};

Upstream::ClusterManager& cm_;
Upstream::ClusterInfoConstSharedPtr cluster_;
LightstepTracerStats tracer_stats_;
ThreadLocal::Instance& tls_;
Runtime::Loader& runtime_;
std::unique_ptr<lightstep::TracerOptions> options_;
uint32_t tls_slot_;
};

class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbacks {
public:
LightStepRecorder(const lightstep::TracerImpl& tracer, LightStepDriver& driver,
Event::Dispatcher& dispatcher);

// lightstep::Recorder
void RecordSpan(lightstep::collector::Span&& span) override;
bool FlushWithTimeout(lightstep::Duration) override;

// Http::AsyncClient::Callbacks
void onSuccess(Http::MessagePtr&&) override;
void onFailure(Http::AsyncClient::FailureReason) override;

static std::unique_ptr<lightstep::Recorder> NewInstance(LightStepDriver& driver,
Event::Dispatcher& dispatcher,
const lightstep::TracerImpl& tracer);

private:
void enableTimer();
void flushSpans();

lightstep::ReportBuilder builder_;
LightStepDriver& driver_;
Event::TimerPtr flush_timer_;
};

} // Tracing
Loading