From 73b98ddabd87d95cdc71c576f546002633f7ffd1 Mon Sep 17 00:00:00 2001 From: "FABIO A. OLIVEIRA" Date: Wed, 5 Apr 2017 11:29:07 -0400 Subject: [PATCH 1/7] Reapplied changes reverted from PR 692. Use the Zipkin library. (#430) --- include/envoy/http/header_map.h | 7 +- source/CMakeLists.txt | 1 + source/common/http/conn_manager_impl.cc | 1 + source/common/http/headers.h | 5 + source/common/json/config_schemas.cc | 7 +- source/common/tracing/http_tracer_impl.cc | 194 ++++++++++++++++++++++ source/common/tracing/http_tracer_impl.h | 70 ++++++++ source/exe/CMakeLists.txt | 2 +- source/server/configuration_impl.cc | 12 ++ test/CMakeLists.txt | 1 + 10 files changed, 295 insertions(+), 5 deletions(-) diff --git a/include/envoy/http/header_map.h b/include/envoy/http/header_map.h index 42658f2c87ad5..2c7ff3312c6e8 100644 --- a/include/envoy/http/header_map.h +++ b/include/envoy/http/header_map.h @@ -220,7 +220,12 @@ class HeaderEntry { HEADER_FUNC(Status) \ HEADER_FUNC(TransferEncoding) \ HEADER_FUNC(Upgrade) \ - HEADER_FUNC(UserAgent) + HEADER_FUNC(UserAgent) \ + HEADER_FUNC(XB3TraceId) \ + HEADER_FUNC(XB3SpanId) \ + HEADER_FUNC(XB3ParentSpanId) \ + HEADER_FUNC(XB3Sampled) \ + HEADER_FUNC(XB3Flags) /** * The following functions are defined for each inline header above. E.g., for ContentLength we diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index 52e699aadb358..88c9fae4edb88 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -1,3 +1,4 @@ add_subdirectory(common) add_subdirectory(exe) add_subdirectory(server) +add_subdirectory(zipkin) diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 0e68d8f78c630..b978a8998755d 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -453,6 +453,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, connection_manager_.config_.tracingStats()); if (tracing_decision.is_tracing) { + stream_log_debug("XXX Fabio: About to call startSpan", *this); active_span_ = connection_manager_.tracer_.startSpan(*this, *request_headers_, request_info_); } } diff --git a/source/common/http/headers.h b/source/common/http/headers.h index 29564da73cbac..d7e13c8ac5201 100644 --- a/source/common/http/headers.h +++ b/source/common/http/headers.h @@ -54,6 +54,11 @@ class Headers { const LowerCaseString TransferEncoding{"transfer-encoding"}; const LowerCaseString Upgrade{"upgrade"}; const LowerCaseString UserAgent{"user-agent"}; + const LowerCaseString XB3TraceId{"x-b3-traceid"}; + const LowerCaseString XB3SpanId{"x-b3-spanid"}; + const LowerCaseString XB3ParentSpanId{"x-b3-parentspanid"}; + const LowerCaseString XB3Sampled{"x-b3-sampled"}; + const LowerCaseString XB3Flags{"x-b3-flags"}; struct { const std::string Close{"close"}; diff --git a/source/common/json/config_schemas.cc b/source/common/json/config_schemas.cc index 6788ea6c75c63..dfa21a307f17e 100644 --- a/source/common/json/config_schemas.cc +++ b/source/common/json/config_schemas.cc @@ -862,15 +862,16 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF( "properties" : { "type" : { "type" : "string", - "enum" : ["lightstep"] + "enum" : ["lightstep", "zipkin"] }, "config" : { "type" : "object", "properties" : { "collector_cluster" : {"type" : "string"}, - "access_token_file" : {"type" : "string"} + "access_token_file" : {"type" : "string"}, + "endpoint": {"type": "string"} }, - "required": ["collector_cluster", "access_token_file"], + "required": ["collector_cluster"], "additionalProperties" : false } }, diff --git a/source/common/tracing/http_tracer_impl.cc b/source/common/tracing/http_tracer_impl.cc index 247598ab36240..80ce3a311b3dc 100644 --- a/source/common/tracing/http_tracer_impl.cc +++ b/source/common/tracing/http_tracer_impl.cc @@ -4,6 +4,7 @@ #include "common/common/base64.h" #include "common/common/macros.h" #include "common/common/utility.h" +#include "common/common/enum_to_int.h" #include "common/grpc/common.h" #include "common/http/access_log/access_log_formatter.h" #include "common/http/codes.h" @@ -13,6 +14,9 @@ #include "common/http/utility.h" #include "common/runtime/uuid_util.h" +#include "zipkin/zipkin_core_constants.h" +#include + namespace Tracing { static std::string buildRequestLine(const Http::HeaderMap& request_headers, @@ -320,4 +324,194 @@ void LightStepRecorder::onSuccess(Http::MessagePtr&& msg) { } } +ZipkinSpan::ZipkinSpan(Zipkin::Span& span) : span_(span) {} + +void ZipkinSpan::finishSpan() { span_.finish(); } + +void ZipkinSpan::setTag(const std::string& name, const std::string& value) { + if (this->hasCSAnnotation()) { + span_.setTag(name, value); + } +} + +bool ZipkinSpan::hasCSAnnotation() { + auto annotations = span_.annotations(); + if (annotations.size() > 0) { + return annotations[0].value() == Zipkin::ZipkinCoreConstants::CLIENT_SEND; + } + return false; +} + +ZipkinDriver::TlsZipkinTracer::TlsZipkinTracer(Zipkin::Tracer tracer, ZipkinDriver& driver) + : tracer_(tracer), driver_(driver) {} + +ZipkinDriver::ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, + ThreadLocal::Instance& tls, Runtime::Loader& runtime, + const LocalInfo::LocalInfo& local_info) + : cm_(cluster_manager), tls_(tls), runtime_(runtime), local_info_(local_info), + tls_slot_(tls.allocateSlot()) { + + Upstream::ThreadLocalCluster* cluster = cm_.get(config.getString("collector_cluster")); + if (!cluster) { + throw EnvoyException(fmt::format("{} collector cluster is not defined on cluster manager level", + config.getString("collector_cluster"))); + } + cluster_ = cluster->info(); + + std::string endpoint = config.getString("endpoint"); + + tls_.set( + tls_slot_, + [this, endpoint](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr { + Zipkin::Tracer tracer(local_info_.clusterName(), local_info_.address()->asString()); + tracer.setReporter( + ZipkinReporter::NewInstance(std::ref(*this), std::ref(dispatcher), endpoint)); + return ThreadLocal::ThreadLocalObjectSharedPtr{ + new TlsZipkinTracer(std::move(tracer), *this)}; + }); +} + +SpanPtr ZipkinDriver::startSpan(Http::HeaderMap& request_headers, const std::string&, + SystemTime start_time) { + // TODO: start_time is not really used. + // Need to figure out whether or not it is really needed + // A new timestamp is currently generated for a new span + + Zipkin::Tracer& tracer = tls_.getTyped(tls_slot_).tracer_; + ZipkinSpanPtr active_span; + Zipkin::Span new_zipkin_span; + + if (request_headers.OtSpanContext()) { + std::cerr << std::endl + << "Found context" << std::endl; + + // Get the context from the x-b3-envoy header + // This header contains values of annotations previously set + // The context built from this header allows the tracer to properly set the span id and the + // parent id + // The ids carried in x-b3-envoy also appear in the appropriate B3 headers + Zipkin::SpanContext context; + context.populateFromString(request_headers.OtSpanContext()->value().c_str()); + + std::cerr << "Context: " << context.serializeToString() << std::endl; + + new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), + start_time.time_since_epoch().count(), context); + } else { + std::cerr << std::endl + << "No context found. Will create a root span" << std::endl; + + new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), + start_time.time_since_epoch().count()); + } + + // Set the trace-id and span-id headers properly, based on the newly-created span structure + request_headers.insertXB3TraceId().value(new_zipkin_span.traceIdAsHexString()); + request_headers.insertXB3SpanId().value(new_zipkin_span.idAsHexString()); + + // Set the parent-span header properly, based on the newly-created span structure + if (new_zipkin_span.isSet().parent_id) { + request_headers.insertXB3ParentSpanId().value(new_zipkin_span.parentIdAsHexString()); + } + + // Set sampled header + request_headers.insertXB3Sampled().value(std::string(("1"))); + + Zipkin::SpanContext new_span_context(new_zipkin_span); + std::cerr << "New span context: " << new_span_context.serializeToString() << std::endl; + + // Set the ot-span-context with the new context + request_headers.insertOtSpanContext().value(new_span_context.serializeToString()); + + std::cerr << "ZipkinDriver: span's tracer: " << new_zipkin_span.tracer() << std::endl; + + active_span.reset(new ZipkinSpan(new_zipkin_span)); + + return std::move(active_span); +} + +ZipkinReporter::ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& endpoint) + : driver_(driver), endpoint_(endpoint) { + flush_timer_ = dispatcher.createTimer([this]() -> void { + flushSpans(); + enableTimer(); + }); + + uint64_t min_flush_spans = + driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); + span_buffer_.allocateBuffer(min_flush_spans); + + enableTimer(); +} + +std::unique_ptr ZipkinReporter::NewInstance(ZipkinDriver& driver, + Event::Dispatcher& dispatcher, + const std::string& endpoint) { + return std::unique_ptr(new ZipkinReporter(driver, dispatcher, endpoint)); +} + +void ZipkinReporter::reportSpan(Zipkin::Span&& span) { + span_buffer_.addSpan(std::move(span)); + + uint64_t min_flush_spans = + driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); + + std::cerr << "reportSpan() has been called; min_flush: " << min_flush_spans << std::endl; + std::cerr << "reportSpan() span: " << span.toJson() << std::endl; + std::cerr << "reportSpan() pending spans: " << span_buffer_.pendingSpans() << std::endl; + + if (span_buffer_.pendingSpans() == min_flush_spans) { + flushSpans(); + } +} + +void ZipkinReporter::enableTimer() { + uint64_t flush_interval = + driver_.runtime().snapshot().getInteger("tracing.zipkin.flush_interval_ms", 5000U); + flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval)); +} + +void ZipkinReporter::flushSpans() { + if (span_buffer_.pendingSpans()) { + std::cerr << "flushSpans() will flush" << std::endl; + std::string request_body = span_buffer_.toStringifiedJsonArray(); + std::cerr << "HTTP request body" << request_body << std::endl; + + std::cerr << "Will post spans to Zipkin. Endpoint: " << endpoint_ << std::endl; + + Http::MessagePtr message(new Http::RequestMessageImpl()); + message->headers().insertMethod().value(Http::Headers::get().MethodValues.Post); + message->headers().insertPath().value(endpoint_); + message->headers().insertHost().value(driver_.cluster()->name()); + message->headers().insertContentType().value(std::string("application/json")); + + Buffer::InstancePtr body(new Buffer::OwnedImpl()); + body->add(request_body); + message->body() = std::move(body); + + uint64_t timeout = + driver_.runtime().snapshot().getInteger("tracing.zipkin.request_timeout", 5000U); + driver_.clusterManager() + .httpAsyncClientForCluster(driver_.cluster()->name()) + .send(std::move(message), *this, std::chrono::milliseconds(timeout)); + + span_buffer_.flush(); + } +} + +void ZipkinReporter::onFailure(Http::AsyncClient::FailureReason) { + std::cerr << "Error posting spans to Zipkin" << std::endl; +} + +void ZipkinReporter::onSuccess(Http::MessagePtr&& http_response) { + if (Http::Utility::getResponseStatus(http_response->headers()) != + enumToInt(Http::Code::Accepted)) { + std::cerr << "Unexpected HTTP response code: " + << Http::Utility::getResponseStatus(http_response->headers()) << std::endl; + } else { + std::cerr << "Successfully posted spans to Zipkin" << std::endl; + } +} + } // Tracing diff --git a/source/common/tracing/http_tracer_impl.h b/source/common/tracing/http_tracer_impl.h index 879b333a06b00..be85f36a0f4cf 100644 --- a/source/common/tracing/http_tracer_impl.h +++ b/source/common/tracing/http_tracer_impl.h @@ -12,6 +12,9 @@ #include "lightstep/carrier.h" #include "lightstep/tracer.h" +#include "zipkin/tracer.h" +#include "zipkin/span_buffer.h" + namespace Tracing { #define LIGHTSTEP_TRACER_STATS(COUNTER) \ @@ -172,4 +175,71 @@ class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbac Event::TimerPtr flush_timer_; }; +class ZipkinSpan : public Span { +public: + ZipkinSpan(Zipkin::Span& span); + + void finishSpan() override; + void setTag(const std::string& name, const std::string& value) override; + + bool hasCSAnnotation(); + +private: + Zipkin::Span span_; +}; + +typedef std::unique_ptr ZipkinSpanPtr; + +class ZipkinDriver : public Driver { +public: + ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, + ThreadLocal::Instance& tls, Runtime::Loader& runtime, + const LocalInfo::LocalInfo& localinfo); + + SpanPtr startSpan(Http::HeaderMap& request_headers, const std::string&, + SystemTime start_time) override; + + Upstream::ClusterManager& clusterManager() { return cm_; } + Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; } + Runtime::Loader& runtime() { return runtime_; } + +private: + struct TlsZipkinTracer : ThreadLocal::ThreadLocalObject { + TlsZipkinTracer(Zipkin::Tracer tracer, ZipkinDriver& driver); + + void shutdown() override {} + + Zipkin::Tracer tracer_; + ZipkinDriver& driver_; + }; + + Upstream::ClusterManager& cm_; + Upstream::ClusterInfoConstSharedPtr cluster_; + ThreadLocal::Instance& tls_; + Runtime::Loader& runtime_; + const LocalInfo::LocalInfo& local_info_; + uint32_t tls_slot_; +}; + +class ZipkinReporter : public Zipkin::Reporter, Http::AsyncClient::Callbacks { +public: + ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, const std::string& endpoint); + + void reportSpan(Zipkin::Span&& span) override; + + void onSuccess(Http::MessagePtr&&) override; + void onFailure(Http::AsyncClient::FailureReason) override; + + static std::unique_ptr + NewInstance(ZipkinDriver& driver, Event::Dispatcher& dispatcher, const std::string& endpoint); + +private: + void enableTimer(); + void flushSpans(); + + ZipkinDriver& driver_; + Event::TimerPtr flush_timer_; + Zipkin::SpanBuffer span_buffer_; + std::string endpoint_; +}; } // Tracing diff --git a/source/exe/CMakeLists.txt b/source/exe/CMakeLists.txt index beb63a286339f..907b5bbb4b955 100644 --- a/source/exe/CMakeLists.txt +++ b/source/exe/CMakeLists.txt @@ -1,4 +1,4 @@ -add_executable(envoy hot_restart.cc main.cc $ $ ${ENVOY_EXE_EXTRA_OBJECTS}) +add_executable(envoy hot_restart.cc main.cc $ $ $ ${ENVOY_EXE_EXTRA_OBJECTS}) if (ENVOY_TCMALLOC) target_link_libraries(envoy tcmalloc_and_profiler) diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index aa06454b1fc05..51c7b82315190 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -117,6 +117,18 @@ void MainImpl::initializeTracers(const Json::Object& tracing_configuration) { http_tracer_.reset( new Tracing::HttpTracerImpl(std::move(lightstep_driver), server_.localInfo())); + } else if (type == "zipkin") { + if (server_.localInfo().clusterName().empty()) { + throw EnvoyException("cluster name must be defined if Zipkin tracing is enabled. See " + "--service-cluster option."); + } + + Tracing::DriverPtr zipkin_driver( + new Tracing::ZipkinDriver(*driver->getObject("config"), *cluster_manager_, + server_.threadLocal(), server_.runtime(), server_.localInfo())); + + http_tracer_.reset( + new Tracing::HttpTracerImpl(std::move(zipkin_driver), server_.localInfo())); } else { throw EnvoyException(fmt::format("unsupported driver type: '{}'", type)); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 51f271855b2f0..b68d65bf6be14 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -26,6 +26,7 @@ include_directories(${ENVOY_LIBEVENT_INCLUDE_DIR}) add_executable(envoy-test $ $ + $ ${ENVOY_TEST_EXTRA_OBJECTS} common/access_log/access_log_manager_impl_test.cc common/api/api_impl_test.cc From af1b813105486270fe76ba5b03998ef9e5595929 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Fri, 7 Apr 2017 13:49:58 -0400 Subject: [PATCH 2/7] removing printfs and cleanup json schema --- source/common/http/conn_manager_impl.cc | 1 - source/common/json/config_schemas.cc | 37 +++++++++-- source/common/tracing/BUILD | 1 + source/common/tracing/http_tracer_impl.cc | 76 ++++++++--------------- source/common/tracing/http_tracer_impl.h | 10 +-- 5 files changed, 65 insertions(+), 60 deletions(-) diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index b978a8998755d..0e68d8f78c630 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -453,7 +453,6 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, connection_manager_.config_.tracingStats()); if (tracing_decision.is_tracing) { - stream_log_debug("XXX Fabio: About to call startSpan", *this); active_span_ = connection_manager_.tracer_.startSpan(*this, *request_headers_, request_info_); } } diff --git a/source/common/json/config_schemas.cc b/source/common/json/config_schemas.cc index dfa21a307f17e..8b0012e8bdb9a 100644 --- a/source/common/json/config_schemas.cc +++ b/source/common/json/config_schemas.cc @@ -857,21 +857,40 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF( { "$schema": "http://json-schema.org/schema#", "definitions" : { - "driver" : { + "lightstep_driver" : { "type" : "object", "properties" : { "type" : { "type" : "string", - "enum" : ["lightstep", "zipkin"] + "enum" : ["lightstep"] }, "config" : { "type" : "object", "properties" : { "collector_cluster" : {"type" : "string"}, - "access_token_file" : {"type" : "string"}, - "endpoint": {"type": "string"} + "access_token_file" : {"type" : "string"} }, - "required": ["collector_cluster"], + "required": ["collector_cluster", "access_token_file"], + "additionalProperties" : false + } + }, + "required" : ["type", "config"], + "additionalProperties" : false + }, + "zipkin_driver" : { + "type" : "object", + "properties" : { + "type" : { + "type" : "string", + "enum" : ["zipkin"] + }, + "config" : { + "type" : "object", + "properties" : { + "collector_cluster" : {"type" : "string"}, + "collector_endpoint": {"type": "string"} + }, + "required": ["collector_cluster", "collector_endpoint"], "additionalProperties" : false } }, @@ -925,7 +944,13 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF( "http": { "type" : "object", "properties" : { - "driver" : {"$ref" : "#/definitions/driver"} + "driver" : { + "type" : "object", + "oneOf" : [ + {"$ref" : "#/definitions/lightstep_driver"}, + {"$ref" : "#/definitions/zipkin_driver"} + ] + } }, "additionalProperties" : false } diff --git a/source/common/tracing/BUILD b/source/common/tracing/BUILD index fbe1499c2652b..327a18bc74d12 100644 --- a/source/common/tracing/BUILD +++ b/source/common/tracing/BUILD @@ -25,5 +25,6 @@ envoy_cc_library( "//source/common/http/access_log:access_log_formatter_lib", "//source/common/json:json_loader_lib", "//source/common/runtime:uuid_util_lib", + "//source/zipkin:zipkin_lib", ], ) diff --git a/source/common/tracing/http_tracer_impl.cc b/source/common/tracing/http_tracer_impl.cc index 80ce3a311b3dc..acecc1483c338 100644 --- a/source/common/tracing/http_tracer_impl.cc +++ b/source/common/tracing/http_tracer_impl.cc @@ -358,17 +358,17 @@ ZipkinDriver::ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& } cluster_ = cluster->info(); - std::string endpoint = config.getString("endpoint"); - - tls_.set( - tls_slot_, - [this, endpoint](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr { - Zipkin::Tracer tracer(local_info_.clusterName(), local_info_.address()->asString()); - tracer.setReporter( - ZipkinReporter::NewInstance(std::ref(*this), std::ref(dispatcher), endpoint)); - return ThreadLocal::ThreadLocalObjectSharedPtr{ - new TlsZipkinTracer(std::move(tracer), *this)}; - }); + std::string collector_endpoint = config.getString("collector_endpoint"); + + tls_.set(tls_slot_, [this, collector_endpoint](Event::Dispatcher& dispatcher) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + Zipkin::Tracer tracer(local_info_.clusterName(), + local_info_.address()->asString()); + tracer.setReporter(ZipkinReporter::NewInstance( + std::ref(*this), std::ref(dispatcher), collector_endpoint)); + return ThreadLocal::ThreadLocalObjectSharedPtr{ + new TlsZipkinTracer(std::move(tracer), *this)}; + }); } SpanPtr ZipkinDriver::startSpan(Http::HeaderMap& request_headers, const std::string&, @@ -382,25 +382,16 @@ SpanPtr ZipkinDriver::startSpan(Http::HeaderMap& request_headers, const std::str Zipkin::Span new_zipkin_span; if (request_headers.OtSpanContext()) { - std::cerr << std::endl - << "Found context" << std::endl; - - // Get the context from the x-b3-envoy header - // This header contains values of annotations previously set - // The context built from this header allows the tracer to properly set the span id and the - // parent id - // The ids carried in x-b3-envoy also appear in the appropriate B3 headers + // Get the open tracing span context. + // This header contains B3 annotations set by the downstream caller. + // The context built from this header allows the zipkin tracer to + // properly set the span id and the parent span id. Zipkin::SpanContext context; - context.populateFromString(request_headers.OtSpanContext()->value().c_str()); - - std::cerr << "Context: " << context.serializeToString() << std::endl; + context.populateFromString(request_headers.OtSpanContext()->value().c_str()); new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), start_time.time_since_epoch().count(), context); } else { - std::cerr << std::endl - << "No context found. Will create a root span" << std::endl; - new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), start_time.time_since_epoch().count()); } @@ -418,21 +409,17 @@ SpanPtr ZipkinDriver::startSpan(Http::HeaderMap& request_headers, const std::str request_headers.insertXB3Sampled().value(std::string(("1"))); Zipkin::SpanContext new_span_context(new_zipkin_span); - std::cerr << "New span context: " << new_span_context.serializeToString() << std::endl; // Set the ot-span-context with the new context request_headers.insertOtSpanContext().value(new_span_context.serializeToString()); - - std::cerr << "ZipkinDriver: span's tracer: " << new_zipkin_span.tracer() << std::endl; - active_span.reset(new ZipkinSpan(new_zipkin_span)); return std::move(active_span); } ZipkinReporter::ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, - const std::string& endpoint) - : driver_(driver), endpoint_(endpoint) { + const std::string& collector_endpoint) + : driver_(driver), collector_endpoint_(collector_endpoint) { flush_timer_ = dispatcher.createTimer([this]() -> void { flushSpans(); enableTimer(); @@ -445,10 +432,11 @@ ZipkinReporter::ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatch enableTimer(); } -std::unique_ptr ZipkinReporter::NewInstance(ZipkinDriver& driver, - Event::Dispatcher& dispatcher, - const std::string& endpoint) { - return std::unique_ptr(new ZipkinReporter(driver, dispatcher, endpoint)); +std::unique_ptr +ZipkinReporter::NewInstance(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& collector_endpoint) { + return std::unique_ptr( + new ZipkinReporter(driver, dispatcher, collector_endpoint)); } void ZipkinReporter::reportSpan(Zipkin::Span&& span) { @@ -457,10 +445,6 @@ void ZipkinReporter::reportSpan(Zipkin::Span&& span) { uint64_t min_flush_spans = driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); - std::cerr << "reportSpan() has been called; min_flush: " << min_flush_spans << std::endl; - std::cerr << "reportSpan() span: " << span.toJson() << std::endl; - std::cerr << "reportSpan() pending spans: " << span_buffer_.pendingSpans() << std::endl; - if (span_buffer_.pendingSpans() == min_flush_spans) { flushSpans(); } @@ -474,15 +458,10 @@ void ZipkinReporter::enableTimer() { void ZipkinReporter::flushSpans() { if (span_buffer_.pendingSpans()) { - std::cerr << "flushSpans() will flush" << std::endl; std::string request_body = span_buffer_.toStringifiedJsonArray(); - std::cerr << "HTTP request body" << request_body << std::endl; - - std::cerr << "Will post spans to Zipkin. Endpoint: " << endpoint_ << std::endl; - Http::MessagePtr message(new Http::RequestMessageImpl()); message->headers().insertMethod().value(Http::Headers::get().MethodValues.Post); - message->headers().insertPath().value(endpoint_); + message->headers().insertPath().value(collector_endpoint_); message->headers().insertHost().value(driver_.cluster()->name()); message->headers().insertContentType().value(std::string("application/json")); @@ -501,16 +480,15 @@ void ZipkinReporter::flushSpans() { } void ZipkinReporter::onFailure(Http::AsyncClient::FailureReason) { - std::cerr << "Error posting spans to Zipkin" << std::endl; + // TODO: stats } void ZipkinReporter::onSuccess(Http::MessagePtr&& http_response) { if (Http::Utility::getResponseStatus(http_response->headers()) != enumToInt(Http::Code::Accepted)) { - std::cerr << "Unexpected HTTP response code: " - << Http::Utility::getResponseStatus(http_response->headers()) << std::endl; + // TODO: stats } else { - std::cerr << "Successfully posted spans to Zipkin" << std::endl; + // TODO: stats } } diff --git a/source/common/tracing/http_tracer_impl.h b/source/common/tracing/http_tracer_impl.h index be85f36a0f4cf..ba81a10ebfa50 100644 --- a/source/common/tracing/http_tracer_impl.h +++ b/source/common/tracing/http_tracer_impl.h @@ -223,15 +223,17 @@ class ZipkinDriver : public Driver { class ZipkinReporter : public Zipkin::Reporter, Http::AsyncClient::Callbacks { public: - ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, const std::string& endpoint); + ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& collector_endpoint); void reportSpan(Zipkin::Span&& span) override; void onSuccess(Http::MessagePtr&&) override; void onFailure(Http::AsyncClient::FailureReason) override; - static std::unique_ptr - NewInstance(ZipkinDriver& driver, Event::Dispatcher& dispatcher, const std::string& endpoint); + static std::unique_ptr NewInstance(ZipkinDriver& driver, + Event::Dispatcher& dispatcher, + const std::string& collector_endpoint); private: void enableTimer(); @@ -240,6 +242,6 @@ class ZipkinReporter : public Zipkin::Reporter, Http::AsyncClient::Callbacks { ZipkinDriver& driver_; Event::TimerPtr flush_timer_; Zipkin::SpanBuffer span_buffer_; - std::string endpoint_; + std::string collector_endpoint_; }; } // Tracing From 474392af5c1705e2fadfcc486bb24e12c6999a64 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 10 Apr 2017 11:33:26 -0400 Subject: [PATCH 3/7] refactor zipkin tracer code into separate files --- source/common/CMakeLists.txt | 1 + source/common/tracing/BUILD | 12 +- source/common/tracing/http_tracer_impl.cc | 172 ------------------ source/common/tracing/http_tracer_impl.h | 72 -------- source/common/tracing/zipkin_tracer_impl.cc | 187 ++++++++++++++++++++ source/common/tracing/zipkin_tracer_impl.h | 97 ++++++++++ 6 files changed, 295 insertions(+), 246 deletions(-) create mode 100644 source/common/tracing/zipkin_tracer_impl.cc create mode 100644 source/common/tracing/zipkin_tracer_impl.h diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index ce9ea91c2e291..5850e6b96ded8 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -110,6 +110,7 @@ add_library( stats/thread_local_store.cc thread_local/thread_local_impl.cc tracing/http_tracer_impl.cc + tracing/zipkin_tracer_impl.cc upstream/cds_api_impl.cc upstream/cluster_manager_impl.cc upstream/health_checker_impl.cc diff --git a/source/common/tracing/BUILD b/source/common/tracing/BUILD index 327a18bc74d12..5ac2a2773c62b 100644 --- a/source/common/tracing/BUILD +++ b/source/common/tracing/BUILD @@ -4,8 +4,16 @@ load("//bazel:envoy_build_system.bzl", "envoy_cc_library") envoy_cc_library( name = "http_tracer_lib", - srcs = ["http_tracer_impl.cc"], - hdrs = ["http_tracer_impl.h"], + srcs = [ + "http_tracer_impl.cc", + "lightstep_tracer_impl.cc", + "zipkin_tracer_impl.cc", + ], + hdrs = [ + "http_tracer_impl.h", + "lightstep_tracer_impl.h", + "zipkin_tracer_impl.h", + ], external_deps = ["lightstep"], deps = [ "//include/envoy/local_info:local_info_interface", diff --git a/source/common/tracing/http_tracer_impl.cc b/source/common/tracing/http_tracer_impl.cc index acecc1483c338..247598ab36240 100644 --- a/source/common/tracing/http_tracer_impl.cc +++ b/source/common/tracing/http_tracer_impl.cc @@ -4,7 +4,6 @@ #include "common/common/base64.h" #include "common/common/macros.h" #include "common/common/utility.h" -#include "common/common/enum_to_int.h" #include "common/grpc/common.h" #include "common/http/access_log/access_log_formatter.h" #include "common/http/codes.h" @@ -14,9 +13,6 @@ #include "common/http/utility.h" #include "common/runtime/uuid_util.h" -#include "zipkin/zipkin_core_constants.h" -#include - namespace Tracing { static std::string buildRequestLine(const Http::HeaderMap& request_headers, @@ -324,172 +320,4 @@ void LightStepRecorder::onSuccess(Http::MessagePtr&& msg) { } } -ZipkinSpan::ZipkinSpan(Zipkin::Span& span) : span_(span) {} - -void ZipkinSpan::finishSpan() { span_.finish(); } - -void ZipkinSpan::setTag(const std::string& name, const std::string& value) { - if (this->hasCSAnnotation()) { - span_.setTag(name, value); - } -} - -bool ZipkinSpan::hasCSAnnotation() { - auto annotations = span_.annotations(); - if (annotations.size() > 0) { - return annotations[0].value() == Zipkin::ZipkinCoreConstants::CLIENT_SEND; - } - return false; -} - -ZipkinDriver::TlsZipkinTracer::TlsZipkinTracer(Zipkin::Tracer tracer, ZipkinDriver& driver) - : tracer_(tracer), driver_(driver) {} - -ZipkinDriver::ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, - ThreadLocal::Instance& tls, Runtime::Loader& runtime, - const LocalInfo::LocalInfo& local_info) - : cm_(cluster_manager), tls_(tls), runtime_(runtime), local_info_(local_info), - tls_slot_(tls.allocateSlot()) { - - Upstream::ThreadLocalCluster* cluster = cm_.get(config.getString("collector_cluster")); - if (!cluster) { - throw EnvoyException(fmt::format("{} collector cluster is not defined on cluster manager level", - config.getString("collector_cluster"))); - } - cluster_ = cluster->info(); - - std::string collector_endpoint = config.getString("collector_endpoint"); - - tls_.set(tls_slot_, [this, collector_endpoint](Event::Dispatcher& dispatcher) - -> ThreadLocal::ThreadLocalObjectSharedPtr { - Zipkin::Tracer tracer(local_info_.clusterName(), - local_info_.address()->asString()); - tracer.setReporter(ZipkinReporter::NewInstance( - std::ref(*this), std::ref(dispatcher), collector_endpoint)); - return ThreadLocal::ThreadLocalObjectSharedPtr{ - new TlsZipkinTracer(std::move(tracer), *this)}; - }); -} - -SpanPtr ZipkinDriver::startSpan(Http::HeaderMap& request_headers, const std::string&, - SystemTime start_time) { - // TODO: start_time is not really used. - // Need to figure out whether or not it is really needed - // A new timestamp is currently generated for a new span - - Zipkin::Tracer& tracer = tls_.getTyped(tls_slot_).tracer_; - ZipkinSpanPtr active_span; - Zipkin::Span new_zipkin_span; - - if (request_headers.OtSpanContext()) { - // Get the open tracing span context. - // This header contains B3 annotations set by the downstream caller. - // The context built from this header allows the zipkin tracer to - // properly set the span id and the parent span id. - Zipkin::SpanContext context; - - context.populateFromString(request_headers.OtSpanContext()->value().c_str()); - new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), - start_time.time_since_epoch().count(), context); - } else { - new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), - start_time.time_since_epoch().count()); - } - - // Set the trace-id and span-id headers properly, based on the newly-created span structure - request_headers.insertXB3TraceId().value(new_zipkin_span.traceIdAsHexString()); - request_headers.insertXB3SpanId().value(new_zipkin_span.idAsHexString()); - - // Set the parent-span header properly, based on the newly-created span structure - if (new_zipkin_span.isSet().parent_id) { - request_headers.insertXB3ParentSpanId().value(new_zipkin_span.parentIdAsHexString()); - } - - // Set sampled header - request_headers.insertXB3Sampled().value(std::string(("1"))); - - Zipkin::SpanContext new_span_context(new_zipkin_span); - - // Set the ot-span-context with the new context - request_headers.insertOtSpanContext().value(new_span_context.serializeToString()); - active_span.reset(new ZipkinSpan(new_zipkin_span)); - - return std::move(active_span); -} - -ZipkinReporter::ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, - const std::string& collector_endpoint) - : driver_(driver), collector_endpoint_(collector_endpoint) { - flush_timer_ = dispatcher.createTimer([this]() -> void { - flushSpans(); - enableTimer(); - }); - - uint64_t min_flush_spans = - driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); - span_buffer_.allocateBuffer(min_flush_spans); - - enableTimer(); -} - -std::unique_ptr -ZipkinReporter::NewInstance(ZipkinDriver& driver, Event::Dispatcher& dispatcher, - const std::string& collector_endpoint) { - return std::unique_ptr( - new ZipkinReporter(driver, dispatcher, collector_endpoint)); -} - -void ZipkinReporter::reportSpan(Zipkin::Span&& span) { - span_buffer_.addSpan(std::move(span)); - - uint64_t min_flush_spans = - driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); - - if (span_buffer_.pendingSpans() == min_flush_spans) { - flushSpans(); - } -} - -void ZipkinReporter::enableTimer() { - uint64_t flush_interval = - driver_.runtime().snapshot().getInteger("tracing.zipkin.flush_interval_ms", 5000U); - flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval)); -} - -void ZipkinReporter::flushSpans() { - if (span_buffer_.pendingSpans()) { - std::string request_body = span_buffer_.toStringifiedJsonArray(); - Http::MessagePtr message(new Http::RequestMessageImpl()); - message->headers().insertMethod().value(Http::Headers::get().MethodValues.Post); - message->headers().insertPath().value(collector_endpoint_); - message->headers().insertHost().value(driver_.cluster()->name()); - message->headers().insertContentType().value(std::string("application/json")); - - Buffer::InstancePtr body(new Buffer::OwnedImpl()); - body->add(request_body); - message->body() = std::move(body); - - uint64_t timeout = - driver_.runtime().snapshot().getInteger("tracing.zipkin.request_timeout", 5000U); - driver_.clusterManager() - .httpAsyncClientForCluster(driver_.cluster()->name()) - .send(std::move(message), *this, std::chrono::milliseconds(timeout)); - - span_buffer_.flush(); - } -} - -void ZipkinReporter::onFailure(Http::AsyncClient::FailureReason) { - // TODO: stats -} - -void ZipkinReporter::onSuccess(Http::MessagePtr&& http_response) { - if (Http::Utility::getResponseStatus(http_response->headers()) != - enumToInt(Http::Code::Accepted)) { - // TODO: stats - } else { - // TODO: stats - } -} - } // Tracing diff --git a/source/common/tracing/http_tracer_impl.h b/source/common/tracing/http_tracer_impl.h index ba81a10ebfa50..879b333a06b00 100644 --- a/source/common/tracing/http_tracer_impl.h +++ b/source/common/tracing/http_tracer_impl.h @@ -12,9 +12,6 @@ #include "lightstep/carrier.h" #include "lightstep/tracer.h" -#include "zipkin/tracer.h" -#include "zipkin/span_buffer.h" - namespace Tracing { #define LIGHTSTEP_TRACER_STATS(COUNTER) \ @@ -175,73 +172,4 @@ class LightStepRecorder : public lightstep::Recorder, Http::AsyncClient::Callbac Event::TimerPtr flush_timer_; }; -class ZipkinSpan : public Span { -public: - ZipkinSpan(Zipkin::Span& span); - - void finishSpan() override; - void setTag(const std::string& name, const std::string& value) override; - - bool hasCSAnnotation(); - -private: - Zipkin::Span span_; -}; - -typedef std::unique_ptr ZipkinSpanPtr; - -class ZipkinDriver : public Driver { -public: - ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, - ThreadLocal::Instance& tls, Runtime::Loader& runtime, - const LocalInfo::LocalInfo& localinfo); - - SpanPtr startSpan(Http::HeaderMap& request_headers, const std::string&, - SystemTime start_time) override; - - Upstream::ClusterManager& clusterManager() { return cm_; } - Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; } - Runtime::Loader& runtime() { return runtime_; } - -private: - struct TlsZipkinTracer : ThreadLocal::ThreadLocalObject { - TlsZipkinTracer(Zipkin::Tracer tracer, ZipkinDriver& driver); - - void shutdown() override {} - - Zipkin::Tracer tracer_; - ZipkinDriver& driver_; - }; - - Upstream::ClusterManager& cm_; - Upstream::ClusterInfoConstSharedPtr cluster_; - ThreadLocal::Instance& tls_; - Runtime::Loader& runtime_; - const LocalInfo::LocalInfo& local_info_; - uint32_t tls_slot_; -}; - -class ZipkinReporter : public Zipkin::Reporter, Http::AsyncClient::Callbacks { -public: - ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, - const std::string& collector_endpoint); - - void reportSpan(Zipkin::Span&& span) override; - - void onSuccess(Http::MessagePtr&&) override; - void onFailure(Http::AsyncClient::FailureReason) override; - - static std::unique_ptr NewInstance(ZipkinDriver& driver, - Event::Dispatcher& dispatcher, - const std::string& collector_endpoint); - -private: - void enableTimer(); - void flushSpans(); - - ZipkinDriver& driver_; - Event::TimerPtr flush_timer_; - Zipkin::SpanBuffer span_buffer_; - std::string collector_endpoint_; -}; } // Tracing diff --git a/source/common/tracing/zipkin_tracer_impl.cc b/source/common/tracing/zipkin_tracer_impl.cc new file mode 100644 index 0000000000000..bdac3559d5be4 --- /dev/null +++ b/source/common/tracing/zipkin_tracer_impl.cc @@ -0,0 +1,187 @@ +#include "common/tracing/http_tracer_impl.h" +#include "common/tracing/zipkin_tracer_impl.h" + +#include "common/common/enum_to_int.h" +#include "common/http/codes.h" +#include "common/http/headers.h" +#include "common/http/header_map_impl.h" +#include "common/http/message_impl.h" +#include "common/http/utility.h" + +#include "zipkin/zipkin_core_constants.h" + +namespace Tracing { + +ZipkinSpan::ZipkinSpan(Zipkin::Span& span) : span_(span) {} + +void ZipkinSpan::finishSpan() { span_.finish(); } + +void ZipkinSpan::setTag(const std::string& name, const std::string& value) { + if (this->hasCSAnnotation()) { + span_.setTag(name, value); + } +} + +bool ZipkinSpan::hasCSAnnotation() { + auto annotations = span_.annotations(); + if (annotations.size() > 0) { + return annotations[0].value() == Zipkin::ZipkinCoreConstants::CLIENT_SEND; + } + return false; +} + +ZipkinDriver::TlsZipkinTracer::TlsZipkinTracer(Zipkin::Tracer tracer, ZipkinDriver& driver) + : tracer_(tracer), driver_(driver) {} + +ZipkinDriver::ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, + Stats::Store& stats, ThreadLocal::Instance& tls, + Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info) + : cm_(cluster_manager), + tracer_stats_{ZIPKIN_TRACER_STATS(POOL_COUNTER_PREFIX(stats, "tracing.zipkin."))}, tls_(tls), + runtime_(runtime), local_info_(local_info), tls_slot_(tls.allocateSlot()) { + + Upstream::ThreadLocalCluster* cluster = cm_.get(config.getString("collector_cluster")); + if (!cluster) { + throw EnvoyException(fmt::format("{} collector cluster is not defined on cluster manager level", + config.getString("collector_cluster"))); + } + cluster_ = cluster->info(); + + std::string collector_endpoint = config.getString("collector_endpoint"); + + tls_.set(tls_slot_, [this, collector_endpoint](Event::Dispatcher& dispatcher) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + Zipkin::Tracer tracer(local_info_.clusterName(), + local_info_.address()->asString()); + tracer.setReporter(ZipkinReporter::NewInstance( + std::ref(*this), std::ref(dispatcher), collector_endpoint)); + return ThreadLocal::ThreadLocalObjectSharedPtr{ + new TlsZipkinTracer(std::move(tracer), *this)}; + }); +} + +SpanPtr ZipkinDriver::startSpan(Http::HeaderMap& request_headers, const std::string&, + SystemTime start_time) { + // TODO: start_time is not really used. + // Need to figure out whether or not it is really needed + // A new timestamp is currently generated for a new span + + Zipkin::Tracer& tracer = tls_.getTyped(tls_slot_).tracer_; + ZipkinSpanPtr active_span; + Zipkin::Span new_zipkin_span; + + if (request_headers.OtSpanContext()) { + // Get the open tracing span context. + // This header contains B3 annotations set by the downstream caller. + // The context built from this header allows the zipkin tracer to + // properly set the span id and the parent span id. + Zipkin::SpanContext context; + + context.populateFromString(request_headers.OtSpanContext()->value().c_str()); + new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), + start_time.time_since_epoch().count(), context); + } else { + new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), + start_time.time_since_epoch().count()); + } + + // Set the trace-id and span-id headers properly, based on the newly-created span structure + request_headers.insertXB3TraceId().value(new_zipkin_span.traceIdAsHexString()); + request_headers.insertXB3SpanId().value(new_zipkin_span.idAsHexString()); + + // Set the parent-span header properly, based on the newly-created span structure + if (new_zipkin_span.isSet().parent_id) { + request_headers.insertXB3ParentSpanId().value(new_zipkin_span.parentIdAsHexString()); + } + + // Set sampled header + request_headers.insertXB3Sampled().value(std::string(("1"))); + + Zipkin::SpanContext new_span_context(new_zipkin_span); + + // Set the ot-span-context with the new context + request_headers.insertOtSpanContext().value(new_span_context.serializeToString()); + active_span.reset(new ZipkinSpan(new_zipkin_span)); + + return std::move(active_span); +} + +ZipkinReporter::ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& collector_endpoint) + : driver_(driver), collector_endpoint_(collector_endpoint) { + flush_timer_ = dispatcher.createTimer([this]() -> void { + driver_.tracerStats().timer_flushed_.inc(); + flushSpans(); + enableTimer(); + }); + + uint64_t min_flush_spans = + driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); + span_buffer_.allocateBuffer(min_flush_spans); + + enableTimer(); +} + +std::unique_ptr +ZipkinReporter::NewInstance(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& collector_endpoint) { + return std::unique_ptr( + new ZipkinReporter(driver, dispatcher, collector_endpoint)); +} + +void ZipkinReporter::reportSpan(Zipkin::Span&& span) { + span_buffer_.addSpan(std::move(span)); + + uint64_t min_flush_spans = + driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); + + if (span_buffer_.pendingSpans() == min_flush_spans) { + flushSpans(); + } +} + +void ZipkinReporter::enableTimer() { + uint64_t flush_interval = + driver_.runtime().snapshot().getInteger("tracing.zipkin.flush_interval_ms", 5000U); + flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval)); +} + +void ZipkinReporter::flushSpans() { + if (span_buffer_.pendingSpans()) { + driver_.tracerStats().spans_sent_.add(span_buffer_.pendingSpans()); + + std::string request_body = span_buffer_.toStringifiedJsonArray(); + Http::MessagePtr message(new Http::RequestMessageImpl()); + message->headers().insertMethod().value(Http::Headers::get().MethodValues.Post); + message->headers().insertPath().value(collector_endpoint_); + message->headers().insertHost().value(driver_.cluster()->name()); + message->headers().insertContentType().value(std::string("application/json")); + + Buffer::InstancePtr body(new Buffer::OwnedImpl()); + body->add(request_body); + message->body() = std::move(body); + + uint64_t timeout = + driver_.runtime().snapshot().getInteger("tracing.zipkin.request_timeout", 5000U); + driver_.clusterManager() + .httpAsyncClientForCluster(driver_.cluster()->name()) + .send(std::move(message), *this, std::chrono::milliseconds(timeout)); + + span_buffer_.flush(); + } +} + +void ZipkinReporter::onFailure(Http::AsyncClient::FailureReason) { + driver_.tracerStats().reports_dropped_.inc(); +} + +void ZipkinReporter::onSuccess(Http::MessagePtr&& http_response) { + if (Http::Utility::getResponseStatus(http_response->headers()) != + enumToInt(Http::Code::Accepted)) { + driver_.tracerStats().reports_sent_.inc(); + } else { + driver_.tracerStats().reports_dropped_.inc(); + } +} + +} // Tracing diff --git a/source/common/tracing/zipkin_tracer_impl.h b/source/common/tracing/zipkin_tracer_impl.h new file mode 100644 index 0000000000000..a693327999d6e --- /dev/null +++ b/source/common/tracing/zipkin_tracer_impl.h @@ -0,0 +1,97 @@ +#pragma once + +#include "envoy/runtime/runtime.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/tracing/http_tracer.h" +#include "envoy/upstream/cluster_manager.h" + +#include "common/http/header_map_impl.h" +#include "common/json/json_loader.h" + +#include "zipkin/tracer.h" +#include "zipkin/span_buffer.h" + +namespace Tracing { + +#define ZIPKIN_TRACER_STATS(COUNTER) \ + COUNTER(spans_sent) \ + COUNTER(timer_flushed) \ + COUNTER(reports_sent) \ + COUNTER(reports_dropped) + +struct ZipkinTracerStats { + ZIPKIN_TRACER_STATS(GENERATE_COUNTER_STRUCT) +}; + +class ZipkinSpan : public Span { +public: + ZipkinSpan(Zipkin::Span& span); + + void finishSpan() override; + void setTag(const std::string& name, const std::string& value) override; + + bool hasCSAnnotation(); + +private: + Zipkin::Span span_; +}; + +typedef std::unique_ptr ZipkinSpanPtr; + +class ZipkinDriver : public Driver { +public: + ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, + Stats::Store& stats, ThreadLocal::Instance& tls, Runtime::Loader& runtime, + const LocalInfo::LocalInfo& localinfo); + + SpanPtr startSpan(Http::HeaderMap& request_headers, const std::string&, + SystemTime start_time) override; + + Upstream::ClusterManager& clusterManager() { return cm_; } + Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; } + Runtime::Loader& runtime() { return runtime_; } + ZipkinTracerStats& tracerStats() { return tracer_stats_; } + +private: + struct TlsZipkinTracer : ThreadLocal::ThreadLocalObject { + TlsZipkinTracer(Zipkin::Tracer tracer, ZipkinDriver& driver); + + void shutdown() override {} + + Zipkin::Tracer tracer_; + ZipkinDriver& driver_; + }; + + Upstream::ClusterManager& cm_; + Upstream::ClusterInfoConstSharedPtr cluster_; + ZipkinTracerStats tracer_stats_; + ThreadLocal::Instance& tls_; + Runtime::Loader& runtime_; + const LocalInfo::LocalInfo& local_info_; + uint32_t tls_slot_; +}; + +class ZipkinReporter : public Zipkin::Reporter, Http::AsyncClient::Callbacks { +public: + ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& collector_endpoint); + + void reportSpan(Zipkin::Span&& span) override; + + void onSuccess(Http::MessagePtr&&) override; + void onFailure(Http::AsyncClient::FailureReason) override; + + static std::unique_ptr NewInstance(ZipkinDriver& driver, + Event::Dispatcher& dispatcher, + const std::string& collector_endpoint); + +private: + void enableTimer(); + void flushSpans(); + + ZipkinDriver& driver_; + Event::TimerPtr flush_timer_; + Zipkin::SpanBuffer span_buffer_; + std::string collector_endpoint_; +}; +} // Tracing From 30d0ac16b4b00ca02ca6fa550fa014199290c07a Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 10 Apr 2017 12:29:38 -0400 Subject: [PATCH 4/7] build fixes --- source/common/tracing/BUILD | 8 +++----- source/server/configuration_impl.cc | 7 ++++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/source/common/tracing/BUILD b/source/common/tracing/BUILD index 5ac2a2773c62b..357142b55e6f6 100644 --- a/source/common/tracing/BUILD +++ b/source/common/tracing/BUILD @@ -5,14 +5,12 @@ load("//bazel:envoy_build_system.bzl", "envoy_cc_library") envoy_cc_library( name = "http_tracer_lib", srcs = [ - "http_tracer_impl.cc", - "lightstep_tracer_impl.cc", - "zipkin_tracer_impl.cc", + "http_tracer_impl.cc", + "zipkin_tracer_impl.cc", ], hdrs = [ "http_tracer_impl.h", - "lightstep_tracer_impl.h", - "zipkin_tracer_impl.h", + "zipkin_tracer_impl.h", ], external_deps = ["lightstep"], deps = [ diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index 51c7b82315190..5f4740c5446d1 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -10,6 +10,7 @@ #include "common/ratelimit/ratelimit_impl.h" #include "common/ssl/context_config_impl.h" #include "common/tracing/http_tracer_impl.h" +#include "common/tracing/zipkin_tracer_impl.h" #include "common/upstream/cluster_manager_impl.h" namespace Server { @@ -123,9 +124,9 @@ void MainImpl::initializeTracers(const Json::Object& tracing_configuration) { "--service-cluster option."); } - Tracing::DriverPtr zipkin_driver( - new Tracing::ZipkinDriver(*driver->getObject("config"), *cluster_manager_, - server_.threadLocal(), server_.runtime(), server_.localInfo())); + Tracing::DriverPtr zipkin_driver(new Tracing::ZipkinDriver( + *driver->getObject("config"), *cluster_manager_, server_.stats(), server_.threadLocal(), + server_.runtime(), server_.localInfo())); http_tracer_.reset( new Tracing::HttpTracerImpl(std::move(zipkin_driver), server_.localInfo())); From 19bb006f3c0e452d9058c13b7ec671b88d16b799 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 10 Apr 2017 17:03:47 -0400 Subject: [PATCH 5/7] disallow http2 collector cluster for zipkin --- source/common/tracing/zipkin_tracer_impl.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/common/tracing/zipkin_tracer_impl.cc b/source/common/tracing/zipkin_tracer_impl.cc index bdac3559d5be4..bf7678354ddd8 100644 --- a/source/common/tracing/zipkin_tracer_impl.cc +++ b/source/common/tracing/zipkin_tracer_impl.cc @@ -47,6 +47,12 @@ ZipkinDriver::ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& } cluster_ = cluster->info(); + if (cluster_->features() & Upstream::ClusterInfo::Features::HTTP2) { + throw EnvoyException( + fmt::format("Zipkin collector service (cluster {}) can be accessed over http1.1 only", + cluster_->name())); + } + std::string collector_endpoint = config.getString("collector_endpoint"); tls_.set(tls_slot_, [this, collector_endpoint](Event::Dispatcher& dispatcher) From a2d26cd39260eb4754174f0734fcf0f41cf9fb89 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 10 Apr 2017 18:02:51 -0400 Subject: [PATCH 6/7] fix buggy stats recording --- source/common/tracing/zipkin_tracer_impl.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/common/tracing/zipkin_tracer_impl.cc b/source/common/tracing/zipkin_tracer_impl.cc index bf7678354ddd8..1a97c6f0fb34d 100644 --- a/source/common/tracing/zipkin_tracer_impl.cc +++ b/source/common/tracing/zipkin_tracer_impl.cc @@ -76,6 +76,8 @@ SpanPtr ZipkinDriver::startSpan(Http::HeaderMap& request_headers, const std::str ZipkinSpanPtr active_span; Zipkin::Span new_zipkin_span; + // TODO (fabolive): What happens if Host header is null ? + if (request_headers.OtSpanContext()) { // Get the open tracing span context. // This header contains B3 annotations set by the downstream caller. @@ -184,9 +186,9 @@ void ZipkinReporter::onFailure(Http::AsyncClient::FailureReason) { void ZipkinReporter::onSuccess(Http::MessagePtr&& http_response) { if (Http::Utility::getResponseStatus(http_response->headers()) != enumToInt(Http::Code::Accepted)) { - driver_.tracerStats().reports_sent_.inc(); - } else { driver_.tracerStats().reports_dropped_.inc(); + } else { + driver_.tracerStats().reports_sent_.inc(); } } From 8946e7c59b334d6cda5d62919579d2fe13b274ce Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 10 Apr 2017 18:03:53 -0400 Subject: [PATCH 7/7] Zipkin tracer implementation - tests: first pass --- test/CMakeLists.txt | 1 + test/common/tracing/BUILD | 1 + test/common/tracing/http_tracer_impl_test.cc | 1 - .../common/tracing/zipkin_tracer_impl_test.cc | 273 ++++++++++++++++++ 4 files changed, 275 insertions(+), 1 deletion(-) create mode 100644 test/common/tracing/zipkin_tracer_impl_test.cc diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 2de5397b915b1..54d4c325b494b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -101,6 +101,7 @@ add_executable(envoy-test common/stats/thread_local_store_test.cc common/tracing/http_tracer_impl_test.cc common/tracing/lightstep_tracer_impl_test.cc + common/tracing/zipkin_tracer_impl_test.cc common/upstream/cds_api_impl_test.cc common/upstream/cluster_manager_impl_test.cc common/upstream/health_checker_impl_test.cc diff --git a/test/common/tracing/BUILD b/test/common/tracing/BUILD index dc3b2ad97093c..e798257eff611 100644 --- a/test/common/tracing/BUILD +++ b/test/common/tracing/BUILD @@ -7,6 +7,7 @@ envoy_cc_test( srcs = [ "http_tracer_impl_test.cc", "lightstep_tracer_impl_test.cc", + "zipkin_tracer_impl_test.cc", ], deps = [ "//source/common/common:base64_lib", diff --git a/test/common/tracing/http_tracer_impl_test.cc b/test/common/tracing/http_tracer_impl_test.cc index df18f9f524e62..cda5dc83350fa 100644 --- a/test/common/tracing/http_tracer_impl_test.cc +++ b/test/common/tracing/http_tracer_impl_test.cc @@ -5,7 +5,6 @@ #include "common/runtime/runtime_impl.h" #include "common/runtime/uuid_util.h" #include "common/tracing/http_tracer_impl.h" -#include "common/tracing/lightstep_tracer_impl.h" #include "test/mocks/http/mocks.h" #include "test/mocks/local_info/mocks.h" diff --git a/test/common/tracing/zipkin_tracer_impl_test.cc b/test/common/tracing/zipkin_tracer_impl_test.cc new file mode 100644 index 0000000000000..a37c38ada7a89 --- /dev/null +++ b/test/common/tracing/zipkin_tracer_impl_test.cc @@ -0,0 +1,273 @@ +#include "common/http/headers.h" +#include "common/http/header_map_impl.h" +#include "common/http/message_impl.h" +#include "common/runtime/runtime_impl.h" +#include "common/runtime/uuid_util.h" +#include "common/tracing/http_tracer_impl.h" +#include "common/tracing/zipkin_tracer_impl.h" + +#include "test/mocks/http/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/mocks/stats/mocks.h" +#include "test/mocks/thread_local/mocks.h" +#include "test/mocks/tracing/mocks.h" +#include "test/mocks/upstream/mocks.h" +#include "test/test_common/utility.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; +using testing::Test; + +namespace Tracing { + +class ZipkinDriverTest : public Test { +public: + void setup(Json::Object& config, bool init_timer) { + ON_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) + .WillByDefault(ReturnRef(cm_.async_client_)); + + if (init_timer) { + timer_ = new NiceMock(&tls_.dispatcher_); + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(5000))); + } + + driver_.reset(new ZipkinDriver(config, cm_, stats_, tls_, runtime_, local_info_)); + } + + void setupValidDriver() { + EXPECT_CALL(cm_, get("fake_cluster")).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()) + .WillByDefault(Return(0)); // No HTTP2 for zipkin upstreams + + std::string valid_config = R"EOF( + { + "collector_cluster": "fake_cluster", + "collector_endpoint": "/api/v1/spans" + } + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(valid_config); + + setup(*loader, true); + } + + const std::string operation_name_{"test"}; + Http::TestHeaderMapImpl request_headers_{ + {":authority", "api.lyft.com"}, {":path", "/"}, {":method", "GET"}, {"x-request-id", "foo"}}; + SystemTime start_time_; + Http::AccessLog::MockRequestInfo request_info_; + + std::unique_ptr driver_; + NiceMock* timer_; + Stats::IsolatedStoreImpl stats_; + NiceMock cm_; + NiceMock runtime_; + NiceMock tls_; + NiceMock local_info_; +}; + +TEST_F(ZipkinDriverTest, InitializeDriver) { + { + std::string invalid_config = R"EOF( + {"fake" : "fake"} + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(invalid_config); + + EXPECT_THROW(setup(*loader, false), EnvoyException); + } + + { + std::string empty_config = "{}"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(empty_config); + + EXPECT_THROW(setup(*loader, false), EnvoyException); + } + + { + // Valid config but not valid cluster. + EXPECT_CALL(cm_, get("fake_cluster")).WillOnce(Return(nullptr)); + + std::string valid_config = R"EOF( + { + "collector_cluster": "fake_cluster", + "collector_endpoint": "/api/v1/spans" + } + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(valid_config); + + EXPECT_THROW(setup(*loader, false), EnvoyException); + } + + { + // Valid config, but upstream cluster supports only http2. + EXPECT_CALL(cm_, get("fake_cluster")).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()) + .WillByDefault(Return(Upstream::ClusterInfo::Features::HTTP2)); + + std::string valid_config = R"EOF( + { + "collector_cluster": "fake_cluster", + "collector_endpoint": "/api/v1/spans" + } + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(valid_config); + + EXPECT_THROW(setup(*loader, false), EnvoyException); + } + + { + // valid config, without http2 cluster will work + EXPECT_CALL(cm_, get("fake_cluster")).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()).WillByDefault(Return(0)); + + std::string valid_config = R"EOF( + { + "collector_cluster": "fake_cluster", + "collector_endpoint": "/api/v1/spans" + } + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(valid_config); + + setup(*loader, true); + } +} + +TEST_F(ZipkinDriverTest, FlushSeveralSpans) { + setupValidDriver(); + + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback; + const Optional timeout(std::chrono::seconds(5)); + + EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)) + .WillOnce( + Invoke([&](Http::MessagePtr& message, Http::AsyncClient::Callbacks& callbacks, + const Optional&) -> Http::AsyncClient::Request* { + callback = &callbacks; + + EXPECT_STREQ("/api/v1/spans", message->headers().Path()->value().c_str()); + EXPECT_STREQ("fake_cluster", message->headers().Host()->value().c_str()); + EXPECT_STREQ("application/json", message->headers().ContentType()->value().c_str()); + + return &request; + })); + + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.min_flush_spans", 5)) + .Times(2) + .WillRepeatedly(Return(2)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.request_timeout", 5000U)) + .WillOnce(Return(5000U)); + + SpanPtr first_span = driver_->startSpan(request_headers_, operation_name_, start_time_); + first_span->finishSpan(); + + SpanPtr second_span = driver_->startSpan(request_headers_, operation_name_, start_time_); + second_span->finishSpan(); + + Http::MessagePtr msg(new Http::ResponseMessageImpl( + Http::HeaderMapPtr{new Http::TestHeaderMapImpl{{":status", "202"}}})); + + callback->onSuccess(std::move(msg)); + + EXPECT_EQ(2U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_dropped").value()); + + callback->onFailure(Http::AsyncClient::FailureReason::Reset); + + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_dropped").value()); +} + +TEST_F(ZipkinDriverTest, FlushOneSpanReportFailure) { + setupValidDriver(); + + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback; + const Optional timeout(std::chrono::seconds(5)); + + EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)) + .WillOnce( + Invoke([&](Http::MessagePtr& message, Http::AsyncClient::Callbacks& callbacks, + const Optional&) -> Http::AsyncClient::Request* { + callback = &callbacks; + + EXPECT_STREQ("/api/v1/spans", message->headers().Path()->value().c_str()); + EXPECT_STREQ("fake_cluster", message->headers().Host()->value().c_str()); + EXPECT_STREQ("application/json", message->headers().ContentType()->value().c_str()); + + return &request; + })); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.min_flush_spans", 5)) + .WillOnce(Return(1)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.request_timeout", 5000U)) + .WillOnce(Return(5000U)); + + SpanPtr span = driver_->startSpan(request_headers_, operation_name_, start_time_); + span->finishSpan(); + + Http::MessagePtr msg(new Http::ResponseMessageImpl( + Http::HeaderMapPtr{new Http::TestHeaderMapImpl{{":status", "404"}}})); + + // AsyncClient can fail with valid HTTP headers + callback->onSuccess(std::move(msg)); + + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_dropped").value()); +} + +TEST_F(ZipkinDriverTest, FlushSpansTimer) { + setupValidDriver(); + + const Optional timeout(std::chrono::seconds(5)); + EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)); + + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.min_flush_spans", 5)) + .WillOnce(Return(5)); + + SpanPtr span = driver_->startSpan(request_headers_, operation_name_, start_time_); + span->finishSpan(); + + // Timer should be re-enabled. + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(5000))); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.request_timeout", 5000U)) + .WillOnce(Return(5000U)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.flush_interval_ms", 5000U)) + .WillOnce(Return(5000U)); + + timer_->callback_(); + + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.timer_flushed").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.spans_sent").value()); +} + +TEST_F(ZipkinDriverTest, SerializeAndDeserializeContext) { + setupValidDriver(); + + // Supply bogus context, that will be simply ignored. + const std::string invalid_context = "notvalidcontext"; + request_headers_.insertOtSpanContext().value(invalid_context); + driver_->startSpan(request_headers_, operation_name_, start_time_); + + std::string injected_ctx = request_headers_.OtSpanContext()->value().c_str(); + EXPECT_FALSE(injected_ctx.empty()); + + // Supply empty context. + request_headers_.removeOtSpanContext(); + SpanPtr span = driver_->startSpan(request_headers_, operation_name_, start_time_); + + injected_ctx = request_headers_.OtSpanContext()->value().c_str(); + EXPECT_FALSE(injected_ctx.empty()); + + // Supply parent context, request_headers has properly populated x-ot-span-context. + SpanPtr span_with_parent = driver_->startSpan(request_headers_, operation_name_, start_time_); + injected_ctx = request_headers_.OtSpanContext()->value().c_str(); + EXPECT_FALSE(injected_ctx.empty()); + + // TODO(fabolive): need more tests for B3 annotations +} + +} // Tracing