Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
cc_library(
name = "dd_opentracing_cpp",
srcs = glob(["src/*.cpp", "src/*.h"]) + [
srcs = [
"src/clock.h",
"src/noopspan.cpp",
"src/noopspan.h",
"src/opentracing_external.cpp",
"src/propagation.cpp",
"src/propagation.h",
"src/encoder.cpp",
"src/encoder.h",
"src/sample.cpp",
"src/sample.h",
"src/span_buffer.cpp",
"src/span_buffer.h",
"src/span.cpp",
"src/span.h",
"src/tracer.cpp",
"src/tracer.h",
"src/transport.cpp",
"src/transport.h",
"src/version_check.cpp",
"src/version_check.h",
"src/writer.cpp",
"src/writer.h",
":version_number.h",
],
hdrs = [
Expand Down
28 changes: 28 additions & 0 deletions include/datadog/opentracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#include <opentracing/tracer.h>

#include <deque>
#include <map>

namespace ot = opentracing;

namespace datadog {
Expand Down Expand Up @@ -32,7 +35,32 @@ struct TracerOptions {
std::string operation_name_override = "";
};

// TraceEncoder exposes the data required to encode and submit traces to the
// Datadog Agent.
class TraceEncoder {
public:
TraceEncoder() {}
virtual ~TraceEncoder() {}

// Returns the Datadog Agent endpoint that traces should be sent to.
virtual const std::string path() = 0;
virtual std::size_t pendingTraces() = 0;
virtual void clearTraces() = 0;
// Returns the HTTP headers that are required for the collection of traces.
virtual const std::map<std::string, std::string> headers() = 0;
// Returns the encoded payload from the collection of traces.
virtual const std::string payload() = 0;
};

// makeTracer returns an opentracing::Tracer that submits traces to the Datadog Agent.
// This should be used when control over the HTTP requests to the Datadog Agent is not required.
std::shared_ptr<ot::Tracer> makeTracer(const TracerOptions &options);
// makeTracerAndEncoder initializes an opentracing::Tracer and provides an encoder
// to use when submitting traces to the Datadog Agent.
// This should be used in applications that need to also control the HTTP requests to the Datadog
// Agent.
std::tuple<std::shared_ptr<ot::Tracer>, std::shared_ptr<TraceEncoder>> makeTracerAndEncoder(
const TracerOptions &options);

} // namespace opentracing
} // namespace datadog
Expand Down
173 changes: 173 additions & 0 deletions src/agent_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#include "agent_writer.h"
#include <iostream>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we please undo this change, and redo it with git mv. Otherwise the history isn't kept and I can't see changes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to fix this, but github isn't showing things clearly.
The history correctly shows up in blame, but not in the file's commit logs or in the PR diff

#include "encoder.h"
#include "version_number.h"

namespace datadog {
namespace opentracing {

namespace {
const std::string agent_protocol = "http://";
const size_t max_queued_traces = 7000;
// Retry sending traces to agent a couple of times. Any more than that and the agent won't accept
// them.
// write_period 1s + timeout 2s + (retry & timeout) 2.5s + (retry and timeout) 4.5s = 10s.
const std::vector<std::chrono::milliseconds> default_retry_periods{
std::chrono::milliseconds(500), std::chrono::milliseconds(2500)};
// Agent communication timeout.
const long default_timeout_ms = 2000L;
} // namespace

AgentWriter::AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period)
: AgentWriter(std::unique_ptr<Handle>{new CurlHandle{}}, write_period, max_queued_traces,
default_retry_periods, host, port){};

AgentWriter::AgentWriter(std::unique_ptr<Handle> handle, std::chrono::milliseconds write_period,
size_t max_queued_traces,
std::vector<std::chrono::milliseconds> retry_periods, std::string host,
uint32_t port)
: write_period_(write_period),
max_queued_traces_(max_queued_traces),
retry_periods_(retry_periods) {
setUpHandle(handle, host, port);
startWriting(std::move(handle));
}

void AgentWriter::setUpHandle(std::unique_ptr<Handle> &handle, std::string host, uint32_t port) {
// Some options are the same for all actions, set them here.
// Set the agent URI.
std::stringstream agent_uri;
agent_uri << agent_protocol << host << ":" << std::to_string(port) << trace_encoder_->path();
auto rcode = handle->setopt(CURLOPT_URL, agent_uri.str().c_str());
if (rcode != CURLE_OK) {
throw std::runtime_error(std::string("Unable to set agent URL: ") + curl_easy_strerror(rcode));
}
rcode = handle->setopt(CURLOPT_TIMEOUT_MS, default_timeout_ms);
if (rcode != CURLE_OK) {
throw std::runtime_error(std::string("Unable to set agent timeout: ") +
curl_easy_strerror(rcode));
}
} // namespace opentracing

AgentWriter::~AgentWriter() { stop(); }

void AgentWriter::stop() {
{
std::unique_lock<std::mutex> lock(mutex_);
if (stop_writing_) {
return; // Already stopped.
}
stop_writing_ = true;
}
condition_.notify_all();
worker_->join();
}

void AgentWriter::write(Trace trace) {
std::unique_lock<std::mutex> lock(mutex_);
if (stop_writing_) {
return;
}
if (trace_encoder_->pendingTraces() >= max_queued_traces_) {
return;
}
trace_encoder_->addTrace(std::move(trace));
};

void AgentWriter::startWriting(std::unique_ptr<Handle> handle) {
// Start worker that sends Traces to agent.
// We can capture 'this' because destruction of this stops the thread and the lambda.
worker_ = std::make_unique<std::thread>(
[this](std::unique_ptr<Handle> handle) {
size_t num_traces = 0;
std::map<std::string, std::string> headers;
std::string payload;
while (true) {
// Encode traces when there are new ones.
{
// Wait to be told about new traces (or to stop).
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait_for(lock, write_period_,
[&]() -> bool { return flush_worker_ || stop_writing_; });
if (stop_writing_) {
return; // Stop the thread.
}
num_traces = trace_encoder_->pendingTraces();
if (num_traces == 0) {
continue;
}
headers = trace_encoder_->headers();
payload = trace_encoder_->payload();
trace_encoder_->clearTraces();
} // lock on mutex_ ends.
// Send spans, not in critical period.
retryFiniteOnFail([&]() { return AgentWriter::postTraces(handle, headers, payload); });
// Let thread calling 'flush' that we're done flushing.
{
std::unique_lock<std::mutex> lock(mutex_);
flush_worker_ = false;
}
condition_.notify_all();
}
},
std::move(handle));
}

void AgentWriter::flush() try {
std::unique_lock<std::mutex> lock(mutex_);
flush_worker_ = true;
condition_.notify_all();
// Wait until flush is complete.
condition_.wait(lock, [&]() -> bool { return !flush_worker_ || stop_writing_; });
} catch (const std::bad_alloc &) {
}

void AgentWriter::retryFiniteOnFail(std::function<bool()> f) const {
for (std::chrono::milliseconds backoff : retry_periods_) {
if (f()) {
return;
}
{
// Just check for stop_writing_ between attempts. No need to allow wake-from-sleep
// stop_writing signal during retry period since that should always be short.
std::unique_lock<std::mutex> lock(mutex_);
if (stop_writing_) {
return;
}
}
std::this_thread::sleep_for(backoff);
}
f(); // Final try after final sleep.
}

bool AgentWriter::postTraces(std::unique_ptr<Handle> &handle,
std::map<std::string, std::string> headers, std::string payload) try {
handle->setHeaders(headers);

// We have to set the size manually, because msgpack uses null characters.
CURLcode rcode = handle->setopt(CURLOPT_POSTFIELDSIZE, payload.size());
if (rcode != CURLE_OK) {
std::cerr << "Error setting agent request size: " << curl_easy_strerror(rcode) << std::endl;
return false;
}

rcode = handle->setopt(CURLOPT_POSTFIELDS, payload.data());
if (rcode != CURLE_OK) {
std::cerr << "Error setting agent request body: " << curl_easy_strerror(rcode) << std::endl;
return false;
}

rcode = handle->perform();
if (rcode != CURLE_OK) {
std::cerr << "Error sending traces to agent: " << curl_easy_strerror(rcode) << std::endl
<< handle->getError() << std::endl;
return false;
}
return true;
} catch (const std::bad_alloc &) {
// Drop spans, but live to fight another day.
return true; // Don't attempt to retry.
}

} // namespace opentracing
} // namespace datadog
79 changes: 79 additions & 0 deletions src/agent_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#ifndef DD_OPENTRACING_AGENT_WRITER_H
#define DD_OPENTRACING_AGENT_WRITER_H

#include <curl/curl.h>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <sstream>
#include <thread>
#include "encoder.h"
#include "span.h"
#include "transport.h"

namespace datadog {
namespace opentracing {

// A Writer that sends Traces (collections of Spans) to a Datadog agent.
class AgentWriter : public Writer {
public:
// Creates an AgentWriter that uses curl to send Traces to a Datadog agent. May throw a
// runtime_exception.
AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period);

AgentWriter(std::unique_ptr<Handle> handle, std::chrono::milliseconds write_period,
size_t max_queued_traces, std::vector<std::chrono::milliseconds> retry_periods,
std::string host, uint32_t port);

// Does not flush on destruction, buffered traces may be lost. Stops all threads.
~AgentWriter() override;

void write(Trace trace) override;

// Send all buffered Traces to the destination now. Will block until sending is complete. This
// isn't on the main Writer API because real code should not need to call this.
void flush();

// Permanently stops writing Traces. Calls to write() and flush() will do nothing.
void stop();

private:
// Initialises the curl handle. May throw a runtime_exception.
void setUpHandle(std::unique_ptr<Handle> &handle, std::string host, uint32_t port);

// Starts asynchronously writing traces. They will be written periodically (set by write_period_)
// or when flush() is called manually.
void startWriting(std::unique_ptr<Handle> handle);
// Posts the given Traces to the Agent. Returns true if it succeeds, otherwise false.
static bool postTraces(std::unique_ptr<Handle> &handle,
std::map<std::string, std::string> headers, std::string payload);
// Retries the given function a finite number of times according to retry_periods_. Retries when
// f() returns false.
void retryFiniteOnFail(std::function<bool()> f) const;

// How often to send Traces.
const std::chrono::milliseconds write_period_;
const size_t max_queued_traces_;
// How long to wait before retrying each time. If empty, only try once.
const std::vector<std::chrono::milliseconds> retry_periods_;

// The thread on which traces are encoded and send to the agent. Receives traces on the
// traces_ queue as notified by condition_. Encodes traces to buffer_ and sends to the
// agent.
std::unique_ptr<std::thread> worker_ = nullptr;
// Locks access to the traces_ queue and the stop_writing_ and flush_worker_ signals.
mutable std::mutex mutex_;
// Notifies worker thread when there are new traces in the queue or it should stop.
std::condition_variable condition_;
// These two bools, stop_writing_ and flush_worker_, act as signals. They are the predicates on
// which the condition_ variable acts.
// If set to true, stops worker. Locked by mutex_;
bool stop_writing_ = false;
// If set to true, flushes worker (which sets it false again). Locked by mutex_;
bool flush_worker_ = false;
};

} // namespace opentracing
} // namespace datadog

#endif // DD_OPENTRACING_AGENT_WRITER_H
39 changes: 39 additions & 0 deletions src/encoder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "encoder.h"
#include "span.h"
#include "version_number.h"

namespace datadog {
namespace opentracing {

AgentHttpEncoder::AgentHttpEncoder() {
// Set up common headers and default encoder
common_headers_ = {{"Content-Type", "application/msgpack"},
{"Datadog-Meta-Lang", "cpp"},
{"Datadog-Meta-Tracer-Version", config::tracer_version}};
}

const std::string agent_api_path = "/v0.3/traces";

const std::string AgentHttpEncoder::path() { return agent_api_path; }

void AgentHttpEncoder::clearTraces() { traces_.clear(); }

std::size_t AgentHttpEncoder::pendingTraces() { return traces_.size(); }

const std::map<std::string, std::string> AgentHttpEncoder::headers() {
std::map<std::string, std::string> headers(common_headers_);
headers["X-Datadog-Trace-Count"] = std::to_string(traces_.size());
return headers;
}

const std::string AgentHttpEncoder::payload() {
buffer_.clear();
buffer_.str(std::string{});
msgpack::pack(buffer_, traces_);
return buffer_.str();
}

void AgentHttpEncoder::addTrace(Trace trace) { traces_.push_back(std::move(trace)); }

} // namespace opentracing
} // namespace datadog
Loading