diff --git a/include/envoy/http/header_map.h b/include/envoy/http/header_map.h index 42658f2c87ad5..2c7ff3312c6e8 100644 --- a/include/envoy/http/header_map.h +++ b/include/envoy/http/header_map.h @@ -220,7 +220,12 @@ class HeaderEntry { HEADER_FUNC(Status) \ HEADER_FUNC(TransferEncoding) \ HEADER_FUNC(Upgrade) \ - HEADER_FUNC(UserAgent) + HEADER_FUNC(UserAgent) \ + HEADER_FUNC(XB3TraceId) \ + HEADER_FUNC(XB3SpanId) \ + HEADER_FUNC(XB3ParentSpanId) \ + HEADER_FUNC(XB3Sampled) \ + HEADER_FUNC(XB3Flags) /** * The following functions are defined for each inline header above. E.g., for ContentLength we diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index 52e699aadb358..88c9fae4edb88 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -1,3 +1,4 @@ add_subdirectory(common) add_subdirectory(exe) add_subdirectory(server) +add_subdirectory(zipkin) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index d61d45750d926..14bd4df0f36ef 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -111,6 +111,7 @@ add_library( thread_local/thread_local_impl.cc tracing/http_tracer_impl.cc tracing/lightstep_tracer_impl.cc + tracing/zipkin_tracer_impl.cc upstream/cds_api_impl.cc upstream/cluster_manager_impl.cc upstream/health_checker_impl.cc diff --git a/source/common/http/headers.h b/source/common/http/headers.h index 29564da73cbac..d7e13c8ac5201 100644 --- a/source/common/http/headers.h +++ b/source/common/http/headers.h @@ -54,6 +54,11 @@ class Headers { const LowerCaseString TransferEncoding{"transfer-encoding"}; const LowerCaseString Upgrade{"upgrade"}; const LowerCaseString UserAgent{"user-agent"}; + const LowerCaseString XB3TraceId{"x-b3-traceid"}; + const LowerCaseString XB3SpanId{"x-b3-spanid"}; + const LowerCaseString XB3ParentSpanId{"x-b3-parentspanid"}; + const LowerCaseString XB3Sampled{"x-b3-sampled"}; + const LowerCaseString XB3Flags{"x-b3-flags"}; struct { const std::string Close{"close"}; diff --git a/source/common/json/config_schemas.cc b/source/common/json/config_schemas.cc index 975d7b7043e8f..1eaf1166ab306 100644 --- a/source/common/json/config_schemas.cc +++ b/source/common/json/config_schemas.cc @@ -921,7 +921,7 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF( { "$schema": "http://json-schema.org/schema#", "definitions" : { - "driver" : { + "lightstep_driver" : { "type" : "object", "properties" : { "type" : { @@ -941,6 +941,26 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF( "required" : ["type", "config"], "additionalProperties" : false }, + "zipkin_driver" : { + "type" : "object", + "properties" : { + "type" : { + "type" : "string", + "enum" : ["zipkin"] + }, + "config" : { + "type" : "object", + "properties" : { + "collector_cluster" : {"type" : "string"}, + "collector_endpoint": {"type": "string"} + }, + "required": ["collector_cluster", "collector_endpoint"], + "additionalProperties" : false + } + }, + "required" : ["type", "config"], + "additionalProperties" : false + }, "rate_limit_service" : { "type" : "object", "properties" : { @@ -988,7 +1008,13 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF( "http": { "type" : "object", "properties" : { - "driver" : {"$ref" : "#/definitions/driver"} + "driver" : { + "type" : "object", + "oneOf" : [ + {"$ref" : "#/definitions/lightstep_driver"}, + {"$ref" : "#/definitions/zipkin_driver"} + ] + } }, "additionalProperties" : false } diff --git a/source/common/tracing/BUILD b/source/common/tracing/BUILD index 6a2b10896008c..737ee434a6dfd 100644 --- a/source/common/tracing/BUILD +++ b/source/common/tracing/BUILD @@ -7,10 +7,12 @@ envoy_cc_library( srcs = [ "http_tracer_impl.cc", "lightstep_tracer_impl.cc", + "zipkin_tracer_impl.cc", ], hdrs = [ "http_tracer_impl.h", "lightstep_tracer_impl.h", + "zipkin_tracer_impl.h", ], external_deps = ["lightstep"], deps = [ @@ -31,5 +33,6 @@ envoy_cc_library( "//source/common/http/access_log:access_log_formatter_lib", "//source/common/json:json_loader_lib", "//source/common/runtime:uuid_util_lib", + "//source/zipkin:zipkin_lib", ], ) diff --git a/source/common/tracing/zipkin_tracer_impl.cc b/source/common/tracing/zipkin_tracer_impl.cc new file mode 100644 index 0000000000000..1a97c6f0fb34d --- /dev/null +++ b/source/common/tracing/zipkin_tracer_impl.cc @@ -0,0 +1,195 @@ +#include "common/tracing/http_tracer_impl.h" +#include "common/tracing/zipkin_tracer_impl.h" + +#include "common/common/enum_to_int.h" +#include "common/http/codes.h" +#include "common/http/headers.h" +#include "common/http/header_map_impl.h" +#include "common/http/message_impl.h" +#include "common/http/utility.h" + +#include "zipkin/zipkin_core_constants.h" + +namespace Tracing { + +ZipkinSpan::ZipkinSpan(Zipkin::Span& span) : span_(span) {} + +void ZipkinSpan::finishSpan() { span_.finish(); } + +void ZipkinSpan::setTag(const std::string& name, const std::string& value) { + if (this->hasCSAnnotation()) { + span_.setTag(name, value); + } +} + +bool ZipkinSpan::hasCSAnnotation() { + auto annotations = span_.annotations(); + if (annotations.size() > 0) { + return annotations[0].value() == Zipkin::ZipkinCoreConstants::CLIENT_SEND; + } + return false; +} + +ZipkinDriver::TlsZipkinTracer::TlsZipkinTracer(Zipkin::Tracer tracer, ZipkinDriver& driver) + : tracer_(tracer), driver_(driver) {} + +ZipkinDriver::ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, + Stats::Store& stats, ThreadLocal::Instance& tls, + Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info) + : cm_(cluster_manager), + tracer_stats_{ZIPKIN_TRACER_STATS(POOL_COUNTER_PREFIX(stats, "tracing.zipkin."))}, tls_(tls), + runtime_(runtime), local_info_(local_info), tls_slot_(tls.allocateSlot()) { + + Upstream::ThreadLocalCluster* cluster = cm_.get(config.getString("collector_cluster")); + if (!cluster) { + throw EnvoyException(fmt::format("{} collector cluster is not defined on cluster manager level", + config.getString("collector_cluster"))); + } + cluster_ = cluster->info(); + + if (cluster_->features() & Upstream::ClusterInfo::Features::HTTP2) { + throw EnvoyException( + fmt::format("Zipkin collector service (cluster {}) can be accessed over http1.1 only", + cluster_->name())); + } + + std::string collector_endpoint = config.getString("collector_endpoint"); + + tls_.set(tls_slot_, [this, collector_endpoint](Event::Dispatcher& dispatcher) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + Zipkin::Tracer tracer(local_info_.clusterName(), + local_info_.address()->asString()); + tracer.setReporter(ZipkinReporter::NewInstance( + std::ref(*this), std::ref(dispatcher), collector_endpoint)); + return ThreadLocal::ThreadLocalObjectSharedPtr{ + new TlsZipkinTracer(std::move(tracer), *this)}; + }); +} + +SpanPtr ZipkinDriver::startSpan(Http::HeaderMap& request_headers, const std::string&, + SystemTime start_time) { + // TODO: start_time is not really used. + // Need to figure out whether or not it is really needed + // A new timestamp is currently generated for a new span + + Zipkin::Tracer& tracer = tls_.getTyped(tls_slot_).tracer_; + ZipkinSpanPtr active_span; + Zipkin::Span new_zipkin_span; + + // TODO (fabolive): What happens if Host header is null ? + + if (request_headers.OtSpanContext()) { + // Get the open tracing span context. + // This header contains B3 annotations set by the downstream caller. + // The context built from this header allows the zipkin tracer to + // properly set the span id and the parent span id. + Zipkin::SpanContext context; + + context.populateFromString(request_headers.OtSpanContext()->value().c_str()); + new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), + start_time.time_since_epoch().count(), context); + } else { + new_zipkin_span = tracer.startSpan(request_headers.Host()->value().c_str(), + start_time.time_since_epoch().count()); + } + + // Set the trace-id and span-id headers properly, based on the newly-created span structure + request_headers.insertXB3TraceId().value(new_zipkin_span.traceIdAsHexString()); + request_headers.insertXB3SpanId().value(new_zipkin_span.idAsHexString()); + + // Set the parent-span header properly, based on the newly-created span structure + if (new_zipkin_span.isSet().parent_id) { + request_headers.insertXB3ParentSpanId().value(new_zipkin_span.parentIdAsHexString()); + } + + // Set sampled header + request_headers.insertXB3Sampled().value(std::string(("1"))); + + Zipkin::SpanContext new_span_context(new_zipkin_span); + + // Set the ot-span-context with the new context + request_headers.insertOtSpanContext().value(new_span_context.serializeToString()); + active_span.reset(new ZipkinSpan(new_zipkin_span)); + + return std::move(active_span); +} + +ZipkinReporter::ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& collector_endpoint) + : driver_(driver), collector_endpoint_(collector_endpoint) { + flush_timer_ = dispatcher.createTimer([this]() -> void { + driver_.tracerStats().timer_flushed_.inc(); + flushSpans(); + enableTimer(); + }); + + uint64_t min_flush_spans = + driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); + span_buffer_.allocateBuffer(min_flush_spans); + + enableTimer(); +} + +std::unique_ptr +ZipkinReporter::NewInstance(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& collector_endpoint) { + return std::unique_ptr( + new ZipkinReporter(driver, dispatcher, collector_endpoint)); +} + +void ZipkinReporter::reportSpan(Zipkin::Span&& span) { + span_buffer_.addSpan(std::move(span)); + + uint64_t min_flush_spans = + driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U); + + if (span_buffer_.pendingSpans() == min_flush_spans) { + flushSpans(); + } +} + +void ZipkinReporter::enableTimer() { + uint64_t flush_interval = + driver_.runtime().snapshot().getInteger("tracing.zipkin.flush_interval_ms", 5000U); + flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval)); +} + +void ZipkinReporter::flushSpans() { + if (span_buffer_.pendingSpans()) { + driver_.tracerStats().spans_sent_.add(span_buffer_.pendingSpans()); + + std::string request_body = span_buffer_.toStringifiedJsonArray(); + Http::MessagePtr message(new Http::RequestMessageImpl()); + message->headers().insertMethod().value(Http::Headers::get().MethodValues.Post); + message->headers().insertPath().value(collector_endpoint_); + message->headers().insertHost().value(driver_.cluster()->name()); + message->headers().insertContentType().value(std::string("application/json")); + + Buffer::InstancePtr body(new Buffer::OwnedImpl()); + body->add(request_body); + message->body() = std::move(body); + + uint64_t timeout = + driver_.runtime().snapshot().getInteger("tracing.zipkin.request_timeout", 5000U); + driver_.clusterManager() + .httpAsyncClientForCluster(driver_.cluster()->name()) + .send(std::move(message), *this, std::chrono::milliseconds(timeout)); + + span_buffer_.flush(); + } +} + +void ZipkinReporter::onFailure(Http::AsyncClient::FailureReason) { + driver_.tracerStats().reports_dropped_.inc(); +} + +void ZipkinReporter::onSuccess(Http::MessagePtr&& http_response) { + if (Http::Utility::getResponseStatus(http_response->headers()) != + enumToInt(Http::Code::Accepted)) { + driver_.tracerStats().reports_dropped_.inc(); + } else { + driver_.tracerStats().reports_sent_.inc(); + } +} + +} // Tracing diff --git a/source/common/tracing/zipkin_tracer_impl.h b/source/common/tracing/zipkin_tracer_impl.h new file mode 100644 index 0000000000000..a693327999d6e --- /dev/null +++ b/source/common/tracing/zipkin_tracer_impl.h @@ -0,0 +1,97 @@ +#pragma once + +#include "envoy/runtime/runtime.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/tracing/http_tracer.h" +#include "envoy/upstream/cluster_manager.h" + +#include "common/http/header_map_impl.h" +#include "common/json/json_loader.h" + +#include "zipkin/tracer.h" +#include "zipkin/span_buffer.h" + +namespace Tracing { + +#define ZIPKIN_TRACER_STATS(COUNTER) \ + COUNTER(spans_sent) \ + COUNTER(timer_flushed) \ + COUNTER(reports_sent) \ + COUNTER(reports_dropped) + +struct ZipkinTracerStats { + ZIPKIN_TRACER_STATS(GENERATE_COUNTER_STRUCT) +}; + +class ZipkinSpan : public Span { +public: + ZipkinSpan(Zipkin::Span& span); + + void finishSpan() override; + void setTag(const std::string& name, const std::string& value) override; + + bool hasCSAnnotation(); + +private: + Zipkin::Span span_; +}; + +typedef std::unique_ptr ZipkinSpanPtr; + +class ZipkinDriver : public Driver { +public: + ZipkinDriver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, + Stats::Store& stats, ThreadLocal::Instance& tls, Runtime::Loader& runtime, + const LocalInfo::LocalInfo& localinfo); + + SpanPtr startSpan(Http::HeaderMap& request_headers, const std::string&, + SystemTime start_time) override; + + Upstream::ClusterManager& clusterManager() { return cm_; } + Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; } + Runtime::Loader& runtime() { return runtime_; } + ZipkinTracerStats& tracerStats() { return tracer_stats_; } + +private: + struct TlsZipkinTracer : ThreadLocal::ThreadLocalObject { + TlsZipkinTracer(Zipkin::Tracer tracer, ZipkinDriver& driver); + + void shutdown() override {} + + Zipkin::Tracer tracer_; + ZipkinDriver& driver_; + }; + + Upstream::ClusterManager& cm_; + Upstream::ClusterInfoConstSharedPtr cluster_; + ZipkinTracerStats tracer_stats_; + ThreadLocal::Instance& tls_; + Runtime::Loader& runtime_; + const LocalInfo::LocalInfo& local_info_; + uint32_t tls_slot_; +}; + +class ZipkinReporter : public Zipkin::Reporter, Http::AsyncClient::Callbacks { +public: + ZipkinReporter(ZipkinDriver& driver, Event::Dispatcher& dispatcher, + const std::string& collector_endpoint); + + void reportSpan(Zipkin::Span&& span) override; + + void onSuccess(Http::MessagePtr&&) override; + void onFailure(Http::AsyncClient::FailureReason) override; + + static std::unique_ptr NewInstance(ZipkinDriver& driver, + Event::Dispatcher& dispatcher, + const std::string& collector_endpoint); + +private: + void enableTimer(); + void flushSpans(); + + ZipkinDriver& driver_; + Event::TimerPtr flush_timer_; + Zipkin::SpanBuffer span_buffer_; + std::string collector_endpoint_; +}; +} // Tracing diff --git a/source/exe/CMakeLists.txt b/source/exe/CMakeLists.txt index beb63a286339f..907b5bbb4b955 100644 --- a/source/exe/CMakeLists.txt +++ b/source/exe/CMakeLists.txt @@ -1,4 +1,4 @@ -add_executable(envoy hot_restart.cc main.cc $ $ ${ENVOY_EXE_EXTRA_OBJECTS}) +add_executable(envoy hot_restart.cc main.cc $ $ $ ${ENVOY_EXE_EXTRA_OBJECTS}) if (ENVOY_TCMALLOC) target_link_libraries(envoy tcmalloc_and_profiler) diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index 3c6768f6eb195..0be537908214c 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -11,6 +11,7 @@ #include "common/ssl/context_config_impl.h" #include "common/tracing/http_tracer_impl.h" #include "common/tracing/lightstep_tracer_impl.h" +#include "common/tracing/zipkin_tracer_impl.h" #include "common/upstream/cluster_manager_impl.h" namespace Server { @@ -118,6 +119,18 @@ void MainImpl::initializeTracers(const Json::Object& tracing_configuration) { http_tracer_.reset( new Tracing::HttpTracerImpl(std::move(lightstep_driver), server_.localInfo())); + } else if (type == "zipkin") { + if (server_.localInfo().clusterName().empty()) { + throw EnvoyException("cluster name must be defined if Zipkin tracing is enabled. See " + "--service-cluster option."); + } + + Tracing::DriverPtr zipkin_driver(new Tracing::ZipkinDriver( + *driver->getObject("config"), *cluster_manager_, server_.stats(), server_.threadLocal(), + server_.runtime(), server_.localInfo())); + + http_tracer_.reset( + new Tracing::HttpTracerImpl(std::move(zipkin_driver), server_.localInfo())); } else { throw EnvoyException(fmt::format("unsupported driver type: '{}'", type)); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 4c46349c2a3fc..54d4c325b494b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -26,6 +26,7 @@ include_directories(${ENVOY_LIBEVENT_INCLUDE_DIR}) add_executable(envoy-test $ $ + $ ${ENVOY_TEST_EXTRA_OBJECTS} common/access_log/access_log_manager_impl_test.cc common/api/api_impl_test.cc @@ -100,6 +101,7 @@ add_executable(envoy-test common/stats/thread_local_store_test.cc common/tracing/http_tracer_impl_test.cc common/tracing/lightstep_tracer_impl_test.cc + common/tracing/zipkin_tracer_impl_test.cc common/upstream/cds_api_impl_test.cc common/upstream/cluster_manager_impl_test.cc common/upstream/health_checker_impl_test.cc diff --git a/test/common/tracing/BUILD b/test/common/tracing/BUILD index dc3b2ad97093c..e798257eff611 100644 --- a/test/common/tracing/BUILD +++ b/test/common/tracing/BUILD @@ -7,6 +7,7 @@ envoy_cc_test( srcs = [ "http_tracer_impl_test.cc", "lightstep_tracer_impl_test.cc", + "zipkin_tracer_impl_test.cc", ], deps = [ "//source/common/common:base64_lib", diff --git a/test/common/tracing/http_tracer_impl_test.cc b/test/common/tracing/http_tracer_impl_test.cc index df18f9f524e62..cda5dc83350fa 100644 --- a/test/common/tracing/http_tracer_impl_test.cc +++ b/test/common/tracing/http_tracer_impl_test.cc @@ -5,7 +5,6 @@ #include "common/runtime/runtime_impl.h" #include "common/runtime/uuid_util.h" #include "common/tracing/http_tracer_impl.h" -#include "common/tracing/lightstep_tracer_impl.h" #include "test/mocks/http/mocks.h" #include "test/mocks/local_info/mocks.h" diff --git a/test/common/tracing/zipkin_tracer_impl_test.cc b/test/common/tracing/zipkin_tracer_impl_test.cc new file mode 100644 index 0000000000000..a37c38ada7a89 --- /dev/null +++ b/test/common/tracing/zipkin_tracer_impl_test.cc @@ -0,0 +1,273 @@ +#include "common/http/headers.h" +#include "common/http/header_map_impl.h" +#include "common/http/message_impl.h" +#include "common/runtime/runtime_impl.h" +#include "common/runtime/uuid_util.h" +#include "common/tracing/http_tracer_impl.h" +#include "common/tracing/zipkin_tracer_impl.h" + +#include "test/mocks/http/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/mocks/stats/mocks.h" +#include "test/mocks/thread_local/mocks.h" +#include "test/mocks/tracing/mocks.h" +#include "test/mocks/upstream/mocks.h" +#include "test/test_common/utility.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; +using testing::Test; + +namespace Tracing { + +class ZipkinDriverTest : public Test { +public: + void setup(Json::Object& config, bool init_timer) { + ON_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) + .WillByDefault(ReturnRef(cm_.async_client_)); + + if (init_timer) { + timer_ = new NiceMock(&tls_.dispatcher_); + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(5000))); + } + + driver_.reset(new ZipkinDriver(config, cm_, stats_, tls_, runtime_, local_info_)); + } + + void setupValidDriver() { + EXPECT_CALL(cm_, get("fake_cluster")).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()) + .WillByDefault(Return(0)); // No HTTP2 for zipkin upstreams + + std::string valid_config = R"EOF( + { + "collector_cluster": "fake_cluster", + "collector_endpoint": "/api/v1/spans" + } + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(valid_config); + + setup(*loader, true); + } + + const std::string operation_name_{"test"}; + Http::TestHeaderMapImpl request_headers_{ + {":authority", "api.lyft.com"}, {":path", "/"}, {":method", "GET"}, {"x-request-id", "foo"}}; + SystemTime start_time_; + Http::AccessLog::MockRequestInfo request_info_; + + std::unique_ptr driver_; + NiceMock* timer_; + Stats::IsolatedStoreImpl stats_; + NiceMock cm_; + NiceMock runtime_; + NiceMock tls_; + NiceMock local_info_; +}; + +TEST_F(ZipkinDriverTest, InitializeDriver) { + { + std::string invalid_config = R"EOF( + {"fake" : "fake"} + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(invalid_config); + + EXPECT_THROW(setup(*loader, false), EnvoyException); + } + + { + std::string empty_config = "{}"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(empty_config); + + EXPECT_THROW(setup(*loader, false), EnvoyException); + } + + { + // Valid config but not valid cluster. + EXPECT_CALL(cm_, get("fake_cluster")).WillOnce(Return(nullptr)); + + std::string valid_config = R"EOF( + { + "collector_cluster": "fake_cluster", + "collector_endpoint": "/api/v1/spans" + } + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(valid_config); + + EXPECT_THROW(setup(*loader, false), EnvoyException); + } + + { + // Valid config, but upstream cluster supports only http2. + EXPECT_CALL(cm_, get("fake_cluster")).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()) + .WillByDefault(Return(Upstream::ClusterInfo::Features::HTTP2)); + + std::string valid_config = R"EOF( + { + "collector_cluster": "fake_cluster", + "collector_endpoint": "/api/v1/spans" + } + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(valid_config); + + EXPECT_THROW(setup(*loader, false), EnvoyException); + } + + { + // valid config, without http2 cluster will work + EXPECT_CALL(cm_, get("fake_cluster")).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()).WillByDefault(Return(0)); + + std::string valid_config = R"EOF( + { + "collector_cluster": "fake_cluster", + "collector_endpoint": "/api/v1/spans" + } + )EOF"; + Json::ObjectPtr loader = Json::Factory::LoadFromString(valid_config); + + setup(*loader, true); + } +} + +TEST_F(ZipkinDriverTest, FlushSeveralSpans) { + setupValidDriver(); + + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback; + const Optional timeout(std::chrono::seconds(5)); + + EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)) + .WillOnce( + Invoke([&](Http::MessagePtr& message, Http::AsyncClient::Callbacks& callbacks, + const Optional&) -> Http::AsyncClient::Request* { + callback = &callbacks; + + EXPECT_STREQ("/api/v1/spans", message->headers().Path()->value().c_str()); + EXPECT_STREQ("fake_cluster", message->headers().Host()->value().c_str()); + EXPECT_STREQ("application/json", message->headers().ContentType()->value().c_str()); + + return &request; + })); + + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.min_flush_spans", 5)) + .Times(2) + .WillRepeatedly(Return(2)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.request_timeout", 5000U)) + .WillOnce(Return(5000U)); + + SpanPtr first_span = driver_->startSpan(request_headers_, operation_name_, start_time_); + first_span->finishSpan(); + + SpanPtr second_span = driver_->startSpan(request_headers_, operation_name_, start_time_); + second_span->finishSpan(); + + Http::MessagePtr msg(new Http::ResponseMessageImpl( + Http::HeaderMapPtr{new Http::TestHeaderMapImpl{{":status", "202"}}})); + + callback->onSuccess(std::move(msg)); + + EXPECT_EQ(2U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_dropped").value()); + + callback->onFailure(Http::AsyncClient::FailureReason::Reset); + + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_dropped").value()); +} + +TEST_F(ZipkinDriverTest, FlushOneSpanReportFailure) { + setupValidDriver(); + + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback; + const Optional timeout(std::chrono::seconds(5)); + + EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)) + .WillOnce( + Invoke([&](Http::MessagePtr& message, Http::AsyncClient::Callbacks& callbacks, + const Optional&) -> Http::AsyncClient::Request* { + callback = &callbacks; + + EXPECT_STREQ("/api/v1/spans", message->headers().Path()->value().c_str()); + EXPECT_STREQ("fake_cluster", message->headers().Host()->value().c_str()); + EXPECT_STREQ("application/json", message->headers().ContentType()->value().c_str()); + + return &request; + })); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.min_flush_spans", 5)) + .WillOnce(Return(1)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.request_timeout", 5000U)) + .WillOnce(Return(5000U)); + + SpanPtr span = driver_->startSpan(request_headers_, operation_name_, start_time_); + span->finishSpan(); + + Http::MessagePtr msg(new Http::ResponseMessageImpl( + Http::HeaderMapPtr{new Http::TestHeaderMapImpl{{":status", "404"}}})); + + // AsyncClient can fail with valid HTTP headers + callback->onSuccess(std::move(msg)); + + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_dropped").value()); +} + +TEST_F(ZipkinDriverTest, FlushSpansTimer) { + setupValidDriver(); + + const Optional timeout(std::chrono::seconds(5)); + EXPECT_CALL(cm_.async_client_, send_(_, _, timeout)); + + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.min_flush_spans", 5)) + .WillOnce(Return(5)); + + SpanPtr span = driver_->startSpan(request_headers_, operation_name_, start_time_); + span->finishSpan(); + + // Timer should be re-enabled. + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(5000))); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.request_timeout", 5000U)) + .WillOnce(Return(5000U)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.flush_interval_ms", 5000U)) + .WillOnce(Return(5000U)); + + timer_->callback_(); + + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.timer_flushed").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.spans_sent").value()); +} + +TEST_F(ZipkinDriverTest, SerializeAndDeserializeContext) { + setupValidDriver(); + + // Supply bogus context, that will be simply ignored. + const std::string invalid_context = "notvalidcontext"; + request_headers_.insertOtSpanContext().value(invalid_context); + driver_->startSpan(request_headers_, operation_name_, start_time_); + + std::string injected_ctx = request_headers_.OtSpanContext()->value().c_str(); + EXPECT_FALSE(injected_ctx.empty()); + + // Supply empty context. + request_headers_.removeOtSpanContext(); + SpanPtr span = driver_->startSpan(request_headers_, operation_name_, start_time_); + + injected_ctx = request_headers_.OtSpanContext()->value().c_str(); + EXPECT_FALSE(injected_ctx.empty()); + + // Supply parent context, request_headers has properly populated x-ot-span-context. + SpanPtr span_with_parent = driver_->startSpan(request_headers_, operation_name_, start_time_); + injected_ctx = request_headers_.OtSpanContext()->value().c_str(); + EXPECT_FALSE(injected_ctx.empty()); + + // TODO(fabolive): need more tests for B3 annotations +} + +} // Tracing