diff --git a/BUILD.bazel b/BUILD.bazel index 82663c60..f11d9cab 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1,6 +1,28 @@ cc_library( name = "dd_opentracing_cpp", - srcs = glob(["src/*.cpp", "src/*.h"]) + [ + srcs = [ + "src/clock.h", + "src/noopspan.cpp", + "src/noopspan.h", + "src/opentracing_external.cpp", + "src/propagation.cpp", + "src/propagation.h", + "src/encoder.cpp", + "src/encoder.h", + "src/sample.cpp", + "src/sample.h", + "src/span_buffer.cpp", + "src/span_buffer.h", + "src/span.cpp", + "src/span.h", + "src/tracer.cpp", + "src/tracer.h", + "src/transport.cpp", + "src/transport.h", + "src/version_check.cpp", + "src/version_check.h", + "src/writer.cpp", + "src/writer.h", ":version_number.h", ], hdrs = [ diff --git a/include/datadog/opentracing.h b/include/datadog/opentracing.h index cf008e02..145c2995 100644 --- a/include/datadog/opentracing.h +++ b/include/datadog/opentracing.h @@ -3,6 +3,9 @@ #include +#include +#include + namespace ot = opentracing; namespace datadog { @@ -32,7 +35,32 @@ struct TracerOptions { std::string operation_name_override = ""; }; +// TraceEncoder exposes the data required to encode and submit traces to the +// Datadog Agent. +class TraceEncoder { + public: + TraceEncoder() {} + virtual ~TraceEncoder() {} + + // Returns the Datadog Agent endpoint that traces should be sent to. + virtual const std::string path() = 0; + virtual std::size_t pendingTraces() = 0; + virtual void clearTraces() = 0; + // Returns the HTTP headers that are required for the collection of traces. + virtual const std::map headers() = 0; + // Returns the encoded payload from the collection of traces. + virtual const std::string payload() = 0; +}; + +// makeTracer returns an opentracing::Tracer that submits traces to the Datadog Agent. +// This should be used when control over the HTTP requests to the Datadog Agent is not required. std::shared_ptr makeTracer(const TracerOptions &options); +// makeTracerAndEncoder initializes an opentracing::Tracer and provides an encoder +// to use when submitting traces to the Datadog Agent. +// This should be used in applications that need to also control the HTTP requests to the Datadog +// Agent. +std::tuple, std::shared_ptr> makeTracerAndEncoder( + const TracerOptions &options); } // namespace opentracing } // namespace datadog diff --git a/src/agent_writer.cpp b/src/agent_writer.cpp new file mode 100644 index 00000000..42abb85b --- /dev/null +++ b/src/agent_writer.cpp @@ -0,0 +1,173 @@ +#include "agent_writer.h" +#include +#include "encoder.h" +#include "version_number.h" + +namespace datadog { +namespace opentracing { + +namespace { +const std::string agent_protocol = "http://"; +const size_t max_queued_traces = 7000; +// Retry sending traces to agent a couple of times. Any more than that and the agent won't accept +// them. +// write_period 1s + timeout 2s + (retry & timeout) 2.5s + (retry and timeout) 4.5s = 10s. +const std::vector default_retry_periods{ + std::chrono::milliseconds(500), std::chrono::milliseconds(2500)}; +// Agent communication timeout. +const long default_timeout_ms = 2000L; +} // namespace + +AgentWriter::AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period) + : AgentWriter(std::unique_ptr{new CurlHandle{}}, write_period, max_queued_traces, + default_retry_periods, host, port){}; + +AgentWriter::AgentWriter(std::unique_ptr handle, std::chrono::milliseconds write_period, + size_t max_queued_traces, + std::vector retry_periods, std::string host, + uint32_t port) + : write_period_(write_period), + max_queued_traces_(max_queued_traces), + retry_periods_(retry_periods) { + setUpHandle(handle, host, port); + startWriting(std::move(handle)); +} + +void AgentWriter::setUpHandle(std::unique_ptr &handle, std::string host, uint32_t port) { + // Some options are the same for all actions, set them here. + // Set the agent URI. + std::stringstream agent_uri; + agent_uri << agent_protocol << host << ":" << std::to_string(port) << trace_encoder_->path(); + auto rcode = handle->setopt(CURLOPT_URL, agent_uri.str().c_str()); + if (rcode != CURLE_OK) { + throw std::runtime_error(std::string("Unable to set agent URL: ") + curl_easy_strerror(rcode)); + } + rcode = handle->setopt(CURLOPT_TIMEOUT_MS, default_timeout_ms); + if (rcode != CURLE_OK) { + throw std::runtime_error(std::string("Unable to set agent timeout: ") + + curl_easy_strerror(rcode)); + } +} // namespace opentracing + +AgentWriter::~AgentWriter() { stop(); } + +void AgentWriter::stop() { + { + std::unique_lock lock(mutex_); + if (stop_writing_) { + return; // Already stopped. + } + stop_writing_ = true; + } + condition_.notify_all(); + worker_->join(); +} + +void AgentWriter::write(Trace trace) { + std::unique_lock lock(mutex_); + if (stop_writing_) { + return; + } + if (trace_encoder_->pendingTraces() >= max_queued_traces_) { + return; + } + trace_encoder_->addTrace(std::move(trace)); +}; + +void AgentWriter::startWriting(std::unique_ptr handle) { + // Start worker that sends Traces to agent. + // We can capture 'this' because destruction of this stops the thread and the lambda. + worker_ = std::make_unique( + [this](std::unique_ptr handle) { + size_t num_traces = 0; + std::map headers; + std::string payload; + while (true) { + // Encode traces when there are new ones. + { + // Wait to be told about new traces (or to stop). + std::unique_lock lock(mutex_); + condition_.wait_for(lock, write_period_, + [&]() -> bool { return flush_worker_ || stop_writing_; }); + if (stop_writing_) { + return; // Stop the thread. + } + num_traces = trace_encoder_->pendingTraces(); + if (num_traces == 0) { + continue; + } + headers = trace_encoder_->headers(); + payload = trace_encoder_->payload(); + trace_encoder_->clearTraces(); + } // lock on mutex_ ends. + // Send spans, not in critical period. + retryFiniteOnFail([&]() { return AgentWriter::postTraces(handle, headers, payload); }); + // Let thread calling 'flush' that we're done flushing. + { + std::unique_lock lock(mutex_); + flush_worker_ = false; + } + condition_.notify_all(); + } + }, + std::move(handle)); +} + +void AgentWriter::flush() try { + std::unique_lock lock(mutex_); + flush_worker_ = true; + condition_.notify_all(); + // Wait until flush is complete. + condition_.wait(lock, [&]() -> bool { return !flush_worker_ || stop_writing_; }); +} catch (const std::bad_alloc &) { +} + +void AgentWriter::retryFiniteOnFail(std::function f) const { + for (std::chrono::milliseconds backoff : retry_periods_) { + if (f()) { + return; + } + { + // Just check for stop_writing_ between attempts. No need to allow wake-from-sleep + // stop_writing signal during retry period since that should always be short. + std::unique_lock lock(mutex_); + if (stop_writing_) { + return; + } + } + std::this_thread::sleep_for(backoff); + } + f(); // Final try after final sleep. +} + +bool AgentWriter::postTraces(std::unique_ptr &handle, + std::map headers, std::string payload) try { + handle->setHeaders(headers); + + // We have to set the size manually, because msgpack uses null characters. + CURLcode rcode = handle->setopt(CURLOPT_POSTFIELDSIZE, payload.size()); + if (rcode != CURLE_OK) { + std::cerr << "Error setting agent request size: " << curl_easy_strerror(rcode) << std::endl; + return false; + } + + rcode = handle->setopt(CURLOPT_POSTFIELDS, payload.data()); + if (rcode != CURLE_OK) { + std::cerr << "Error setting agent request body: " << curl_easy_strerror(rcode) << std::endl; + return false; + } + + rcode = handle->perform(); + if (rcode != CURLE_OK) { + std::cerr << "Error sending traces to agent: " << curl_easy_strerror(rcode) << std::endl + << handle->getError() << std::endl; + return false; + } + return true; +} catch (const std::bad_alloc &) { + // Drop spans, but live to fight another day. + return true; // Don't attempt to retry. +} + +} // namespace opentracing +} // namespace datadog diff --git a/src/agent_writer.h b/src/agent_writer.h new file mode 100644 index 00000000..ec897bf5 --- /dev/null +++ b/src/agent_writer.h @@ -0,0 +1,79 @@ +#ifndef DD_OPENTRACING_AGENT_WRITER_H +#define DD_OPENTRACING_AGENT_WRITER_H + +#include +#include +#include +#include +#include +#include +#include "encoder.h" +#include "span.h" +#include "transport.h" + +namespace datadog { +namespace opentracing { + +// A Writer that sends Traces (collections of Spans) to a Datadog agent. +class AgentWriter : public Writer { + public: + // Creates an AgentWriter that uses curl to send Traces to a Datadog agent. May throw a + // runtime_exception. + AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period); + + AgentWriter(std::unique_ptr handle, std::chrono::milliseconds write_period, + size_t max_queued_traces, std::vector retry_periods, + std::string host, uint32_t port); + + // Does not flush on destruction, buffered traces may be lost. Stops all threads. + ~AgentWriter() override; + + void write(Trace trace) override; + + // Send all buffered Traces to the destination now. Will block until sending is complete. This + // isn't on the main Writer API because real code should not need to call this. + void flush(); + + // Permanently stops writing Traces. Calls to write() and flush() will do nothing. + void stop(); + + private: + // Initialises the curl handle. May throw a runtime_exception. + void setUpHandle(std::unique_ptr &handle, std::string host, uint32_t port); + + // Starts asynchronously writing traces. They will be written periodically (set by write_period_) + // or when flush() is called manually. + void startWriting(std::unique_ptr handle); + // Posts the given Traces to the Agent. Returns true if it succeeds, otherwise false. + static bool postTraces(std::unique_ptr &handle, + std::map headers, std::string payload); + // Retries the given function a finite number of times according to retry_periods_. Retries when + // f() returns false. + void retryFiniteOnFail(std::function f) const; + + // How often to send Traces. + const std::chrono::milliseconds write_period_; + const size_t max_queued_traces_; + // How long to wait before retrying each time. If empty, only try once. + const std::vector retry_periods_; + + // The thread on which traces are encoded and send to the agent. Receives traces on the + // traces_ queue as notified by condition_. Encodes traces to buffer_ and sends to the + // agent. + std::unique_ptr worker_ = nullptr; + // Locks access to the traces_ queue and the stop_writing_ and flush_worker_ signals. + mutable std::mutex mutex_; + // Notifies worker thread when there are new traces in the queue or it should stop. + std::condition_variable condition_; + // These two bools, stop_writing_ and flush_worker_, act as signals. They are the predicates on + // which the condition_ variable acts. + // If set to true, stops worker. Locked by mutex_; + bool stop_writing_ = false; + // If set to true, flushes worker (which sets it false again). Locked by mutex_; + bool flush_worker_ = false; +}; + +} // namespace opentracing +} // namespace datadog + +#endif // DD_OPENTRACING_AGENT_WRITER_H diff --git a/src/encoder.cpp b/src/encoder.cpp new file mode 100644 index 00000000..254cad31 --- /dev/null +++ b/src/encoder.cpp @@ -0,0 +1,39 @@ +#include "encoder.h" +#include "span.h" +#include "version_number.h" + +namespace datadog { +namespace opentracing { + +AgentHttpEncoder::AgentHttpEncoder() { + // Set up common headers and default encoder + common_headers_ = {{"Content-Type", "application/msgpack"}, + {"Datadog-Meta-Lang", "cpp"}, + {"Datadog-Meta-Tracer-Version", config::tracer_version}}; +} + +const std::string agent_api_path = "/v0.3/traces"; + +const std::string AgentHttpEncoder::path() { return agent_api_path; } + +void AgentHttpEncoder::clearTraces() { traces_.clear(); } + +std::size_t AgentHttpEncoder::pendingTraces() { return traces_.size(); } + +const std::map AgentHttpEncoder::headers() { + std::map headers(common_headers_); + headers["X-Datadog-Trace-Count"] = std::to_string(traces_.size()); + return headers; +} + +const std::string AgentHttpEncoder::payload() { + buffer_.clear(); + buffer_.str(std::string{}); + msgpack::pack(buffer_, traces_); + return buffer_.str(); +} + +void AgentHttpEncoder::addTrace(Trace trace) { traces_.push_back(std::move(trace)); } + +} // namespace opentracing +} // namespace datadog diff --git a/src/encoder.h b/src/encoder.h new file mode 100644 index 00000000..113029bc --- /dev/null +++ b/src/encoder.h @@ -0,0 +1,39 @@ +#ifndef DD_OPENTRACING_ENCODER_H +#define DD_OPENTRACING_ENCODER_H + +#include + +#include + +namespace datadog { +namespace opentracing { + +struct SpanData; +using Trace = std::unique_ptr>>; + +class AgentHttpEncoder : public TraceEncoder { + public: + AgentHttpEncoder(); + ~AgentHttpEncoder() override {} + + // Returns the path that is used to submit HTTP requests to the agent. + const std::string path() override; + std::size_t pendingTraces() override; + void clearTraces() override; + // Returns the HTTP headers that are required for the collection of traces. + const std::map headers() override; + // Returns the encoded payload from the collection of traces. + const std::string payload() override; + void addTrace(Trace trace); + + private: + // Holds the headers that are used for all HTTP requests. + std::map common_headers_; + std::deque traces_; + std::stringstream buffer_; +}; + +} // namespace opentracing +} // namespace datadog + +#endif // DD_OPENTRACING_ENCODER_H diff --git a/src/noopspan.h b/src/noopspan.h index 17782391..4b7f9236 100644 --- a/src/noopspan.h +++ b/src/noopspan.h @@ -20,7 +20,7 @@ class NoopSpan : public ot::Span { uint64_t parent_id, SpanContext context, const ot::StartSpanOptions &options); NoopSpan() = delete; NoopSpan(NoopSpan &&other); - ~NoopSpan() = default; + ~NoopSpan() override = default; void FinishWithOptions(const ot::FinishSpanOptions &finish_span_options) noexcept override; void SetOperationName(ot::string_view name) noexcept override; diff --git a/src/opentracing.cpp b/src/opentracing.cpp deleted file mode 100644 index aaf560fa..00000000 --- a/src/opentracing.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include -#include "tracer.h" - -namespace ot = opentracing; - -namespace datadog { -namespace opentracing { - -std::shared_ptr makeTracer(const TracerOptions &options) { - return std::shared_ptr{new Tracer{options}}; -} - -} // namespace opentracing -} // namespace datadog diff --git a/src/opentracing_agent.cpp b/src/opentracing_agent.cpp new file mode 100644 index 00000000..76766d37 --- /dev/null +++ b/src/opentracing_agent.cpp @@ -0,0 +1,23 @@ +// Implementation of the exposed makeTracer function. +// This is kept separately to isolate the AgentWriter and its cURL dependency. +// Users of the library that do not use this tracer are able to avoid the +// additional dependency and implementation details. + +#include +#include "agent_writer.h" +#include "tracer.h" + +namespace ot = opentracing; + +namespace datadog { +namespace opentracing { + +std::shared_ptr makeTracer(const TracerOptions &options) { + auto writer = std::shared_ptr{ + new AgentWriter(options.agent_host, options.agent_port, + std::chrono::milliseconds(llabs(options.write_period_ms)))}; + return std::shared_ptr{new Tracer{options, writer}}; +} + +} // namespace opentracing +} // namespace datadog diff --git a/src/opentracing_external.cpp b/src/opentracing_external.cpp new file mode 100644 index 00000000..59bbef62 --- /dev/null +++ b/src/opentracing_external.cpp @@ -0,0 +1,27 @@ +// Implementation of the exposed makeTracerAndEncoder function. +// This is intentionally kept separate from the makeTracer function, which has additional +// dependencies. It allows the library to be used with an external HTTP implementation for sending +// traces to the Datadog Agent. +// +// See BAZEL.build for the files required to build this library using makeTracerAndEncoder. + +#include +#include "tracer.h" +#include "writer.h" + +namespace ot = opentracing; + +namespace datadog { +namespace opentracing { + +std::tuple, std::shared_ptr> makeTracerAndEncoder( + const TracerOptions &options) { + auto xwriter = std::make_shared(); + auto encoder = xwriter->encoder(); + std::shared_ptr writer = xwriter; + return std::tuple, std::shared_ptr>{ + std::shared_ptr{new Tracer{options, writer}}, encoder}; +} + +} // namespace opentracing +} // namespace datadog diff --git a/src/propagation.cpp b/src/propagation.cpp index 37d63b07..44ea4933 100644 --- a/src/propagation.cpp +++ b/src/propagation.cpp @@ -87,7 +87,7 @@ std::string SpanContext::baggageItem(ot::string_view key) const { SpanContext SpanContext::withId(uint64_t id) const { std::lock_guard lock{mutex_}; auto baggage = baggage_; // (Shallow) copy baggage. - return std::move(SpanContext{id, trace_id_, std::move(baggage)}); + return SpanContext{id, trace_id_, std::move(baggage)}; } ot::expected SpanContext::serialize(const ot::TextMapWriter &writer) const { @@ -149,8 +149,8 @@ ot::expected> SpanContext::deserialize( // Partial context, this shouldn't happen. return ot::make_unexpected(ot::span_context_corrupted_error); } - return std::move( - std::unique_ptr{new SpanContext{parent_id, trace_id, std::move(baggage)}}); + return std::unique_ptr( + std::make_unique(parent_id, trace_id, std::move(baggage))); } catch (const std::bad_alloc &) { return ot::make_unexpected(std::make_error_code(std::errc::not_enough_memory)); } diff --git a/src/span.cpp b/src/span.cpp index 010d8579..d6fd1561 100644 --- a/src/span.cpp +++ b/src/span.cpp @@ -38,7 +38,7 @@ uint64_t SpanData::spanId() const { return span_id; } std::unique_ptr makeSpanData(std::string type, std::string service, ot::string_view resource, std::string name, - uint64_t trace_id, int64_t span_id, uint64_t parent_id, + uint64_t trace_id, uint64_t span_id, uint64_t parent_id, int64_t start) { return std::unique_ptr{ new SpanData(type, service, resource, name, trace_id, span_id, parent_id, start, 0, 0)}; diff --git a/src/span.h b/src/span.h index 25e86a65..ec513f6e 100644 --- a/src/span.h +++ b/src/span.h @@ -12,18 +12,15 @@ namespace opentracing { class Tracer; class SpanBuffer; -class SpanData; typedef std::function IdProvider; // See tracer.h -using Trace = std::unique_ptr>>; - // Contains data that describes a Span. struct SpanData { ~SpanData() = default; friend std::unique_ptr makeSpanData(std::string type, std::string service, ot::string_view resource, std::string name, - uint64_t trace_id, int64_t span_id, + uint64_t trace_id, uint64_t span_id, uint64_t parent_id, int64_t start); friend std::unique_ptr stubSpanData(); @@ -55,9 +52,11 @@ struct SpanData { uint64_t spanId() const; MSGPACK_DEFINE_MAP(name, service, resource, type, start, duration, meta, span_id, trace_id, - parent_id, error); + parent_id, error) }; +using Trace = std::unique_ptr>>; + // A Span, a component of a trace, a single instrumented event. class Span : public ot::Span { public: diff --git a/src/span_buffer.h b/src/span_buffer.h index 9ba83fd4..3f7557d8 100644 --- a/src/span_buffer.h +++ b/src/span_buffer.h @@ -10,9 +10,7 @@ namespace datadog { namespace opentracing { -class SpanData; class Writer; -using Trace = std::unique_ptr>>; struct PendingTrace { PendingTrace() diff --git a/src/tracer.cpp b/src/tracer.cpp index f7c229b5..11ac0b24 100644 --- a/src/tracer.cpp +++ b/src/tracer.cpp @@ -16,13 +16,6 @@ uint64_t getId() { return distribution(source); } -Tracer::Tracer(TracerOptions options) - : Tracer(options, - std::shared_ptr{new WritingSpanBuffer{std::make_shared( - options.agent_host, options.agent_port, - std::chrono::milliseconds(llabs(options.write_period_ms)))}}, - getRealTime, getId, ConstantRateSampler(options.sample_rate)) {} - Tracer::Tracer(TracerOptions options, std::shared_ptr buffer, TimeProvider get_time, IdProvider get_id, SampleProvider sampler) : opts_(options), @@ -31,6 +24,14 @@ Tracer::Tracer(TracerOptions options, std::shared_ptr buffer, TimePr get_id_(get_id), sampler_(sampler) {} +Tracer::Tracer(TracerOptions options, std::shared_ptr &writer) + : opts_(options), + get_time_(getRealTime), + get_id_(getId), + sampler_(ConstantRateSampler(options.sample_rate)) { + buffer_ = std::shared_ptr{new WritingSpanBuffer{writer}}; +} + std::unique_ptr Tracer::StartSpanWithOptions(ot::string_view operation_name, const ot::StartSpanOptions &options) const noexcept try { @@ -57,10 +58,10 @@ std::unique_ptr Tracer::StartSpanWithOptions(ot::string_view operation std::move(span_context), get_time_(), opts_.service, opts_.type, operation_name, operation_name, opts_.operation_name_override}}; sampler_.tag(span); - return std::move(span); + return span; } else { - return std::move(std::unique_ptr{new NoopSpan{ - shared_from_this(), span_id, trace_id, parent_id, std::move(span_context), options}}); + return std::unique_ptr{new NoopSpan{shared_from_this(), span_id, trace_id, parent_id, + std::move(span_context), options}}; } } catch (const std::bad_alloc &) { // At least don't crash. diff --git a/src/tracer.h b/src/tracer.h index 9184824b..45d58882 100644 --- a/src/tracer.h +++ b/src/tracer.h @@ -3,6 +3,7 @@ #include #include "clock.h" +#include "encoder.h" #include "sample.h" #include "span.h" #include "span_buffer.h" @@ -16,8 +17,6 @@ namespace ot = opentracing; namespace datadog { namespace opentracing { -class Writer; -class SpanData; class SpanBuffer; // The interface for providing IDs to spans and traces. @@ -27,13 +26,16 @@ uint64_t getId(); class Tracer : public ot::Tracer, public std::enable_shared_from_this { public: - // Creates a Tracer by copying the given options. - Tracer(TracerOptions options); - // Creates a Tracer by copying the given options and injecting the given dependencies. Tracer(TracerOptions options, std::shared_ptr buffer, TimeProvider get_time, IdProvider get_id, SampleProvider sample); + // Creates a Tracer by copying the given options and using the preconfigured writer. + // The writer is either an AgentWriter that sends trace data directly to the Datadog Agent, or + // an ExternalWriter that requires an external HTTP client to encode and submit to the Datadog + // Agent. + Tracer(TracerOptions options, std::shared_ptr &writer); + Tracer() = delete; // Starts a new span. diff --git a/src/tracer_factory.cpp b/src/tracer_factory.cpp index 8edc04e4..32a4d306 100644 --- a/src/tracer_factory.cpp +++ b/src/tracer_factory.cpp @@ -2,6 +2,7 @@ #include #include +#include "agent_writer.h" #include "tracer.h" using json = nlohmann::json; @@ -56,7 +57,11 @@ ot::expected> TracerFactory::MakeTracer( error_message = "configuration has an argument with an incorrect type"; return ot::make_unexpected(std::make_error_code(std::errc::invalid_argument)); } - return std::shared_ptr{new TracerImpl{options}}; + auto writer = std::shared_ptr{ + new AgentWriter(options.agent_host, options.agent_port, + std::chrono::milliseconds(llabs(options.write_period_ms)))}; + + return std::shared_ptr{new TracerImpl{options, writer}}; } catch (const std::bad_alloc &) { return ot::make_unexpected(std::make_error_code(std::errc::not_enough_memory)); } diff --git a/src/transport.h b/src/transport.h index f6c4f8c1..c36db197 100644 --- a/src/transport.h +++ b/src/transport.h @@ -11,8 +11,8 @@ namespace opentracing { // An interface to a CURL handle. This interface exists to make testing Recorder easier. class Handle { public: - Handle(){}; - virtual ~Handle(){}; + Handle() {} + virtual ~Handle() {} virtual CURLcode setopt(CURLoption key, const char* value) = 0; virtual CURLcode setopt(CURLoption key, long value) = 0; virtual void setHeaders(std::map headers) = 0; diff --git a/src/writer.cpp b/src/writer.cpp index 8e86143e..3cd40cb4 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -1,180 +1,14 @@ #include "writer.h" #include +#include "encoder.h" #include "version_number.h" namespace datadog { namespace opentracing { -namespace { -const std::string agent_api_path = "/v0.3/traces"; -const std::string agent_protocol = "http://"; -const size_t max_queued_traces = 7000; -// Retry sending traces to agent a couple of times. Any more than that and the agent won't accept -// them. -// write_period 1s + timeout 2s + (retry & timeout) 2.5s + (retry and timeout) 4.5s = 10s. -const std::vector default_retry_periods{ - std::chrono::milliseconds(500), std::chrono::milliseconds(2500)}; -// Agent communication timeout. -const long default_timeout_ms = 2000L; -} // namespace +Writer::Writer() : trace_encoder_(std::make_shared()) {} -AgentWriter::AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period) - : AgentWriter(std::unique_ptr{new CurlHandle{}}, config::tracer_version, write_period, - max_queued_traces, default_retry_periods, host, port){}; - -AgentWriter::AgentWriter(std::unique_ptr handle, std::string tracer_version, - std::chrono::milliseconds write_period, size_t max_queued_traces, - std::vector retry_periods, std::string host, - uint32_t port) - : tracer_version_(tracer_version), - write_period_(write_period), - max_queued_traces_(max_queued_traces), - retry_periods_(retry_periods) { - setUpHandle(handle, host, port); - startWriting(std::move(handle)); -} - -void AgentWriter::setUpHandle(std::unique_ptr &handle, std::string host, uint32_t port) { - // Some options are the same for all actions, set them here. - // Set the agent URI. - std::stringstream agent_uri; - agent_uri << agent_protocol << host << ":" << std::to_string(port) << agent_api_path; - auto rcode = handle->setopt(CURLOPT_URL, agent_uri.str().c_str()); - if (rcode != CURLE_OK) { - throw std::runtime_error(std::string("Unable to set agent URL: ") + curl_easy_strerror(rcode)); - } - rcode = handle->setopt(CURLOPT_TIMEOUT_MS, default_timeout_ms); - if (rcode != CURLE_OK) { - throw std::runtime_error(std::string("Unable to set agent timeout: ") + - curl_easy_strerror(rcode)); - } - // Set the common HTTP headers. - handle->setHeaders({{"Content-Type", "application/msgpack"}, - {"Datadog-Meta-Lang", "cpp"}, - {"Datadog-Meta-Tracer-Version", tracer_version_}}); -} // namespace opentracing - -AgentWriter::~AgentWriter() { stop(); } - -void AgentWriter::stop() { - { - std::unique_lock lock(mutex_); - if (stop_writing_) { - return; // Already stopped. - } - stop_writing_ = true; - } - condition_.notify_all(); - worker_->join(); -} - -void AgentWriter::write(Trace trace) { - std::unique_lock lock(mutex_); - if (stop_writing_) { - return; - } - if (traces_.size() >= max_queued_traces_) { - return; - } - traces_.push_back(std::move(trace)); -}; - -void AgentWriter::startWriting(std::unique_ptr handle) { - // Start worker that sends Traces to agent. - // We can capture 'this' because destruction of this stops the thread and the lambda. - worker_ = std::make_unique( - [this](std::unique_ptr handle) { - std::stringstream buffer; - size_t num_traces = 0; - while (true) { - // Encode traces when there are new ones. - { - // Wait to be told about new traces (or to stop). - std::unique_lock lock(mutex_); - condition_.wait_for(lock, write_period_, - [&]() -> bool { return flush_worker_ || stop_writing_; }); - if (stop_writing_) { - return; // Stop the thread. - } - num_traces = traces_.size(); - if (num_traces == 0) { - continue; - } - // Clear the buffer but keep the allocated memory. - buffer.clear(); - buffer.str(std::string{}); - msgpack::pack(buffer, traces_); - traces_.clear(); - } // lock on mutex_ ends. - // Send spans, not in critical period. - retryFiniteOnFail([&]() { return AgentWriter::postTraces(handle, buffer, num_traces); }); - // Let thread calling 'flush' that we're done flushing. - { - std::unique_lock lock(mutex_); - flush_worker_ = false; - } - condition_.notify_all(); - } - }, - std::move(handle)); -} - -void AgentWriter::flush() try { - std::unique_lock lock(mutex_); - flush_worker_ = true; - condition_.notify_all(); - // Wait until flush is complete. - condition_.wait(lock, [&]() -> bool { return !flush_worker_ || stop_writing_; }); -} catch (const std::bad_alloc &) { -} - -void AgentWriter::retryFiniteOnFail(std::function f) const { - for (std::chrono::milliseconds backoff : retry_periods_) { - if (f()) { - return; - } - { - // Just check for stop_writing_ between attempts. No need to allow wake-from-sleep - // stop_writing signal during retry period since that should always be short. - std::unique_lock lock(mutex_); - if (stop_writing_) { - return; - } - } - std::this_thread::sleep_for(backoff); - } - f(); // Final try after final sleep. -} - -bool AgentWriter::postTraces(std::unique_ptr &handle, std::stringstream &buffer, - size_t num_traces) try { - handle->setHeaders({{"X-Datadog-Trace-Count", std::to_string(num_traces)}}); - - // We have to set the size manually, because msgpack uses null characters. - std::string post_fields = buffer.str(); - CURLcode rcode = handle->setopt(CURLOPT_POSTFIELDSIZE, post_fields.size()); - if (rcode != CURLE_OK) { - std::cerr << "Error setting agent request size: " << curl_easy_strerror(rcode) << std::endl; - return false; - } - - rcode = handle->setopt(CURLOPT_POSTFIELDS, post_fields.data()); - if (rcode != CURLE_OK) { - std::cerr << "Error setting agent request body: " << curl_easy_strerror(rcode) << std::endl; - return false; - } - - rcode = handle->perform(); - if (rcode != CURLE_OK) { - std::cerr << "Error sending traces to agent: " << curl_easy_strerror(rcode) << std::endl - << handle->getError() << std::endl; - return false; - } - return true; -} catch (const std::bad_alloc &) { - // Drop spans, but live to fight another day. - return true; // Don't attempt to retry. -} +void ExternalWriter::write(Trace trace) { trace_encoder_->addTrace(std::move(trace)); } } // namespace opentracing } // namespace datadog diff --git a/src/writer.h b/src/writer.h index f793f1fb..09307840 100644 --- a/src/writer.h +++ b/src/writer.h @@ -7,87 +7,38 @@ #include #include #include +#include "encoder.h" #include "span.h" #include "transport.h" namespace datadog { namespace opentracing { -class SpanData; -using Trace = std::unique_ptr>>; - -// Takes Traces and writes them (eg. sends them to Datadog). +// A Writer is used to submit completed traces to the Datadog agent. class Writer { public: - Writer() {} + Writer(); virtual ~Writer() {} // Writes the given Trace. virtual void write(Trace trace) = 0; + + protected: + std::shared_ptr trace_encoder_; }; -// A Writer that sends Traces (collections of Spans) to a Datadog agent. -class AgentWriter : public Writer { +// A writer that collects trace data but uses an external mechanism to transmit the data +// to the Datadog Agent. +class ExternalWriter : public Writer { public: - // Creates an AgentWriter that uses curl to send Traces to a Datadog agent. May throw a - // runtime_exception. - AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period); - - AgentWriter(std::unique_ptr handle, std::string tracer_version, - std::chrono::milliseconds write_period, size_t max_queued_traces, - std::vector retry_periods, std::string host, - uint32_t port); - - // Does not flush on destruction, buffered traces may be lost. Stops all threads. - ~AgentWriter() override; + ExternalWriter() {} + ~ExternalWriter() override {} + // Implements Writer methods. void write(Trace trace) override; - // Send all buffered Traces to the destination now. Will block until sending is complete. This - // isn't on the main Writer API because real code should not need to call this. - void flush(); - - // Permanently stops writing Traces. Calls to write() and flush() will do nothing. - void stop(); - - private: - // Initialises the curl handle. May throw a runtime_exception. - void setUpHandle(std::unique_ptr &handle, std::string host, uint32_t port); - - // Starts asynchronously writing traces. They will be written periodically (set by write_period_) - // or when flush() is called manually. - void startWriting(std::unique_ptr handle); - // Posts the given Traces to the Agent. Returns true if it succeeds, otherwise false. - static bool postTraces(std::unique_ptr &handle, std::stringstream &buffer, - size_t num_traces); - // Retries the given function a finite number of times according to retry_periods_. Retries when - // f() returns false. - void retryFiniteOnFail(std::function f) const; - - const std::string tracer_version_; - // How often to send Traces. - const std::chrono::milliseconds write_period_; - const size_t max_queued_traces_; - // How long to wait before retrying each time. If empty, only try once. - const std::vector retry_periods_; - - // The thread on which traces are encoded and send to the agent. Receives traces on the - // traces_ queue as notified by condition_. Encodes traces to buffer_ and sends to the - // agent. - std::unique_ptr worker_ = nullptr; - // Locks access to the traces_ queue and the stop_writing_ and flush_worker_ signals. - mutable std::mutex mutex_; - // Notifies worker thread when there are new traces in the queue or it should stop. - std::condition_variable condition_; - // These two bools, stop_writing_ and flush_worker_, act as signals. They are the predicates on - // which the condition_ variable acts. - // If set to true, stops worker. Locked by mutex_; - bool stop_writing_ = false; - // If set to true, flushes worker (which sets it false again). Locked by mutex_; - bool flush_worker_ = false; - // Multiple producer (potentially), single consumer. Locked by mutex_. - std::deque traces_; + std::shared_ptr encoder() { return trace_encoder_; } }; } // namespace opentracing diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7c3db75c..357d4c4f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -14,4 +14,4 @@ _datadog_test(span_test span_test.cpp) _datadog_test(tracer_factory_test tracer_factory_test.cpp) _datadog_test(tracer_test tracer_test.cpp) _datadog_test(sample_test sample_test.cpp) -_datadog_test(writer_test writer_test.cpp) +_datadog_test(agent_writer_test agent_writer_test.cpp) diff --git a/test/writer_test.cpp b/test/agent_writer_test.cpp similarity index 87% rename from test/writer_test.cpp rename to test/agent_writer_test.cpp index 82519c0f..e7614282 100644 --- a/test/writer_test.cpp +++ b/test/agent_writer_test.cpp @@ -1,6 +1,7 @@ -#include "../src/writer.h" -#include "../src/writer.cpp" // Otherwise the compiler won't generate AgentWriter for us. +#include "../src/agent_writer.h" +#include "../src/agent_writer.cpp" // Otherwise the compiler won't generate AgentWriter for us. #include "mocks.h" +#include "version_number.h" #include @@ -26,7 +27,6 @@ TEST_CASE("writer") { size_t max_queued_traces = 25; std::vector disable_retry; AgentWriter writer{std::move(handle_ptr), - "v0.1.0", only_send_traces_when_we_flush, max_queued_traces, disable_retry, @@ -37,10 +37,6 @@ TEST_CASE("writer") { REQUIRE(handle->options == std::unordered_map{ {CURLOPT_URL, "http://hostname:6319/v0.3/traces"}, {CURLOPT_TIMEOUT_MS, "2000"}}); - REQUIRE(handle->headers == - std::map{{"Content-Type", "application/msgpack"}, - {"Datadog-Meta-Lang", "cpp"}, - {"Datadog-Meta-Tracer-Version", "v0.1.0"}}); } SECTION("traces can be sent") { @@ -69,11 +65,11 @@ TEST_CASE("writer") { {CURLOPT_URL, "http://hostname:6319/v0.3/traces"}, {CURLOPT_TIMEOUT_MS, "2000"}, {CURLOPT_POSTFIELDSIZE, "126"}}); - REQUIRE(handle->headers == - std::map{{"Content-Type", "application/msgpack"}, - {"Datadog-Meta-Lang", "cpp"}, - {"Datadog-Meta-Tracer-Version", "v0.1.0"}, - {"X-Datadog-Trace-Count", "1"}}); + REQUIRE(handle->headers == std::map{ + {"Content-Type", "application/msgpack"}, + {"Datadog-Meta-Lang", "cpp"}, + {"Datadog-Meta-Tracer-Version", config::tracer_version}, + {"X-Datadog-Trace-Count", "1"}}); } SECTION("queue does not grow indefinitely") { @@ -89,7 +85,7 @@ TEST_CASE("writer") { SECTION("bad handle causes constructor to fail") { std::unique_ptr handle_ptr{new MockHandle{}}; handle_ptr->rcode = CURLE_OPERATION_TIMEDOUT; - REQUIRE_THROWS(AgentWriter{std::move(handle_ptr), "v0.1.0", only_send_traces_when_we_flush, + REQUIRE_THROWS(AgentWriter{std::move(handle_ptr), only_send_traces_when_we_flush, max_queued_traces, disable_retry, "hostname", 6319}); } @@ -184,13 +180,8 @@ TEST_CASE("writer") { std::unique_ptr handle_ptr{new MockHandle{}}; MockHandle* handle = handle_ptr.get(); auto write_interval = std::chrono::seconds(2); - AgentWriter writer{std::move(handle_ptr), - "v0.1.0", - write_interval, - max_queued_traces, - disable_retry, - "hostname", - 6319}; + AgentWriter writer{std::move(handle_ptr), write_interval, max_queued_traces, + disable_retry, "hostname", 6319}; // Send 7 traces at 1 trace per second. Since the write period is 2s, there should be 4 // different writes. We don't count the number of writes because that could flake, but we do // check that all 7 traces are written, implicitly testing that multiple writes happen. @@ -223,7 +214,6 @@ TEST_CASE("writer") { std::vector retry_periods{std::chrono::milliseconds(500), std::chrono::milliseconds(2500)}; AgentWriter writer{std::move(handle_ptr), - "v0.1.0", only_send_traces_when_we_flush, max_queued_traces, retry_periods, @@ -262,11 +252,11 @@ TEST_CASE("writer") { writer.write(make_trace( {TestSpanData{"web", "service", "resource", "service.name", 3, 1, 1, 69, 420, 0}})); writer.flush(); - REQUIRE(handle->headers == - std::map{{"Content-Type", "application/msgpack"}, - {"Datadog-Meta-Lang", "cpp"}, - {"Datadog-Meta-Tracer-Version", "v0.1.0"}, - {"X-Datadog-Trace-Count", "3"}}); + REQUIRE(handle->headers == std::map{ + {"Content-Type", "application/msgpack"}, + {"Datadog-Meta-Lang", "cpp"}, + {"Datadog-Meta-Tracer-Version", config::tracer_version}, + {"X-Datadog-Trace-Count", "3"}}); } } } diff --git a/test/opentracing_test.cpp b/test/opentracing_test.cpp index 06071122..b275f469 100644 --- a/test/opentracing_test.cpp +++ b/test/opentracing_test.cpp @@ -1,4 +1,5 @@ #include +#include "mocks.h" #define CATCH_CONFIG_MAIN #include @@ -9,4 +10,11 @@ TEST_CASE("tracer") { auto tracer = makeTracer(TracerOptions{}); REQUIRE(tracer); } + SECTION("can be created with external Writer implementation") { + auto tp = makeTracerAndEncoder(TracerOptions{}); + auto tracer = std::get<0>(tp); + auto encoder = std::get<1>(tp); + REQUIRE(tracer); + REQUIRE(encoder); + } } diff --git a/test/tracer_factory_test.cpp b/test/tracer_factory_test.cpp index 5780e93c..33b3d148 100644 --- a/test/tracer_factory_test.cpp +++ b/test/tracer_factory_test.cpp @@ -12,7 +12,7 @@ using namespace datadog::opentracing; struct MockTracer : public ot::Tracer { TracerOptions opts; - MockTracer(TracerOptions opts_) : opts(opts_) {} + MockTracer(TracerOptions opts_, std::shared_ptr &writer) : opts(opts_) {} std::unique_ptr StartSpanWithOptions(ot::string_view operation_name, const ot::StartSpanOptions &options) const