diff --git a/CMakeLists.txt b/CMakeLists.txt index 3bb07327..7d08a7ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -195,6 +195,7 @@ include(LightStepTracerCommon) include(LightStepTracerConfiguration) set(LIGHTSTEP_SRCS src/common/utility.cpp + src/common/buffer_chain.cpp src/common/fragment_input_stream.cpp src/common/fragment_array_input_stream.cpp src/common/protobuf.cpp @@ -204,13 +205,20 @@ set(LIGHTSTEP_SRCS src/common/utility.cpp src/common/random.cpp src/common/random_traverser.cpp src/common/serialization.cpp - src/common/serialization_chain.cpp + src/common/chunked_http_framing.cpp + src/common/report_request_framing.cpp + src/common/composable_fragment_input_stream.cpp + src/common/chained_stream.cpp src/common/timestamp.cpp src/recorder/report_builder.cpp src/recorder/auto_recorder.cpp src/recorder/fork_aware_recorder.cpp + src/recorder/legacy_manual_recorder.cpp src/recorder/manual_recorder.cpp src/recorder/transporter.cpp + src/recorder/serialization/report_request.cpp + src/recorder/serialization/report_request_header.cpp + src/recorder/serialization/embedded_metrics_message.cpp src/tracer/binary_carrier.cpp src/tracer/json_options.cpp src/tracer/propagation/b3_propagator.cpp @@ -266,10 +274,9 @@ if (WITH_LIBEVENT) src/recorder/stream_recorder/satellite_connection.cpp src/recorder/stream_recorder/satellite_streamer.cpp src/recorder/stream_recorder/span_stream.cpp - src/recorder/stream_recorder/stream_recorder_metrics.cpp + src/recorder/metrics_tracker.cpp src/recorder/stream_recorder/connection_stream.cpp src/recorder/stream_recorder/host_header.cpp - src/recorder/stream_recorder/embedded_metrics_message.cpp src/recorder/stream_recorder/status_line_parser.cpp src/recorder/stream_recorder/utility.cpp src/network/event.cpp diff --git a/benchmark/BUILD b/benchmark/BUILD index 73ddd02a..fd16dae1 100644 --- a/benchmark/BUILD +++ b/benchmark/BUILD @@ -14,3 +14,12 @@ lightstep_google_benchmark( ], ) +lightstep_google_benchmark( + name = "manual_tracer_benchmark", + srcs = [ + "manual_tracer_benchmark.cpp", + ], + deps = [ + "//:tracer_lib", + ], +) diff --git a/benchmark/manual_tracer_benchmark.cpp b/benchmark/manual_tracer_benchmark.cpp new file mode 100644 index 00000000..c3614000 --- /dev/null +++ b/benchmark/manual_tracer_benchmark.cpp @@ -0,0 +1,156 @@ +#include +#include + +#include "lightstep/tracer.h" + +#include "benchmark/benchmark.h" + +int MaxBufferedSpans = 500; + +//-------------------------------------------------------------------------------------------------- +// LegacyNullTransport +//-------------------------------------------------------------------------------------------------- +namespace { +class LegacyNullTransporter final : public lightstep::LegacyAsyncTransporter { + public: + // LegacyAsyncTransporter + void Send(const google::protobuf::Message& request, + google::protobuf::Message& /*response*/, + Callback& callback) override { + using google::protobuf::uint8; + auto size = request.ByteSizeLong(); + std::unique_ptr buffer{new uint8[size]}; + request.SerializeWithCachedSizesToArray(buffer.get()); + callback.OnSuccess(); + } +}; +} // namespace + +//-------------------------------------------------------------------------------------------------- +// NullTransporter +//-------------------------------------------------------------------------------------------------- +namespace { +class NullTransporter final : public lightstep::AsyncTransporter { + public: + // AsyncTransporter + void Send(std::unique_ptr&& message, + Callback& callback) noexcept override { + auto size = message->num_bytes(); + std::unique_ptr buffer{new char[size]}; + message->CopyOut(buffer.get(), size); + callback.OnSuccess(*message); + } +}; +} // namespace + +//-------------------------------------------------------------------------------------------------- +// MakeLegacyManualTracer +//-------------------------------------------------------------------------------------------------- +static std::shared_ptr MakeLegacyManualTracer() { + lightstep::LightStepTracerOptions tracer_options; + tracer_options.transporter.reset(new LegacyNullTransporter{}); + tracer_options.use_thread = false; + tracer_options.component_name = "abc"; + tracer_options.access_token = "123"; + tracer_options.max_buffered_spans = static_cast(MaxBufferedSpans); + + return lightstep::MakeLightStepTracer(std::move(tracer_options)); +} + +//-------------------------------------------------------------------------------------------------- +// MakeManualTracer +//-------------------------------------------------------------------------------------------------- +static std::shared_ptr MakeManualTracer() { + lightstep::LightStepTracerOptions tracer_options; + tracer_options.transporter.reset(new NullTransporter{}); + tracer_options.use_thread = false; + tracer_options.component_name = "abc"; + tracer_options.access_token = "123"; + tracer_options.max_buffered_spans = static_cast(MaxBufferedSpans); + + return lightstep::MakeLightStepTracer(std::move(tracer_options)); +} + +//-------------------------------------------------------------------------------------------------- +// MakeTracer +//-------------------------------------------------------------------------------------------------- +static std::shared_ptr MakeTracer( + opentracing::string_view tracer_type = "legacy_manual") { + if (tracer_type == "legacy_manual") { + return MakeLegacyManualTracer(); + } + if (tracer_type == "manual") { + return MakeManualTracer(); + } + std::cerr << "Unknown tracer type: " << tracer_type << "\n"; + std::terminate(); +} + +//-------------------------------------------------------------------------------------------------- +// BM_SmallSpanReport +//-------------------------------------------------------------------------------------------------- +static void BM_SmallSpanReport(benchmark::State& state, + const char* tracer_type) { + auto tracer = MakeTracer(tracer_type); + assert(tracer != nullptr); + for (auto _ : state) { + for (int i = 0; i < MaxBufferedSpans; ++i) { + auto span = tracer->StartSpan("abc123"); + } + tracer->Flush(); + } +} +BENCHMARK_CAPTURE(BM_SmallSpanReport, legacy_manual, "legacy_manual"); +BENCHMARK_CAPTURE(BM_SmallSpanReport, manual, "manual"); + +//-------------------------------------------------------------------------------------------------- +// BM_TaggedSpanReport +//-------------------------------------------------------------------------------------------------- +static void BM_TaggedSpanReport(benchmark::State& state, + const char* tracer_type) { + auto tracer = MakeTracer(tracer_type); + assert(tracer != nullptr); + for (auto _ : state) { + for (int i = 0; i < MaxBufferedSpans; ++i) { + auto span = tracer->StartSpan("abc123"); + char key[5]; + key[0] = 'a'; + key[1] = 'b'; + key[2] = 'c'; + key[3] = '0'; + key[4] = '\0'; + for (int i = 0; i < 10; ++i) { + span->SetTag(opentracing::string_view{key, 4}, "123"); + ++key[3]; + } + } + tracer->Flush(); + } +} +BENCHMARK_CAPTURE(BM_TaggedSpanReport, legacy_manual, "legacy_manual"); +BENCHMARK_CAPTURE(BM_TaggedSpanReport, manual, "manual"); + +//-------------------------------------------------------------------------------------------------- +// BM_LoggedSpanReport +//-------------------------------------------------------------------------------------------------- +static void BM_LoggedSpanReport(benchmark::State& state, + const char* tracer_type) { + auto tracer = MakeTracer(tracer_type); + assert(tracer != nullptr); + for (auto _ : state) { + for (int i = 0; i < MaxBufferedSpans; ++i) { + auto span = tracer->StartSpan("abc123"); + for (int i = 0; i < 10; ++i) { + span->Log({{"abc", 123}}); + } + } + tracer->Flush(); + } +} +BENCHMARK_CAPTURE(BM_LoggedSpanReport, legacy_manual, "legacy_manual"); +BENCHMARK_CAPTURE(BM_LoggedSpanReport, manual, "manual"); + +//-------------------------------------------------------------------------------------------------- +// BENCHMARK_MAIN +//-------------------------------------------------------------------------------------------------- +BENCHMARK_MAIN(); diff --git a/include/lightstep/BUILD b/include/lightstep/BUILD index 80a98eaf..61873891 100644 --- a/include/lightstep/BUILD +++ b/include/lightstep/BUILD @@ -19,12 +19,20 @@ lightstep_cc_library( ], ) +lightstep_cc_library( + name = "buffer_chain_interface", + hdrs = [ + "buffer_chain.h", + ], +) + lightstep_cc_library( name = "transporter_interface", hdrs = [ "transporter.h", ], external_deps = [ + ":buffer_chain_interface", "@com_google_protobuf//:protobuf", "@io_opentracing_cpp//:opentracing", ], diff --git a/include/lightstep/buffer_chain.h b/include/lightstep/buffer_chain.h new file mode 100644 index 00000000..4d12530e --- /dev/null +++ b/include/lightstep/buffer_chain.h @@ -0,0 +1,42 @@ +#pragma once + +#include + +namespace lightstep { +/** + * BufferChain provides an interface to access a chained sequence of + * contiguous memory buffers. + */ +class BufferChain { + public: + using FragmentCallback = bool (*)(void*, const void*, size_t); + + virtual ~BufferChain() = default; + + /** + * @return the number of fragments in chain + */ + virtual size_t num_fragments() const noexcept = 0; + + /** + * @return the total number of bytes in all fragments + */ + virtual size_t num_bytes() const noexcept = 0; + + /** + * Iterate over each fragment in the buffer chain + * @param callback the callback to call for each fragment + * @param context a pointer to pass to the callback. + */ + virtual bool ForEachFragment(FragmentCallback callback, + void* context) const = 0; + + /** + * Copy out all the fragments into an area of contiguous memory + * @param data the location to start copying + * @param length the size of the buffer destination. Length must be at least + * num_bytes. + */ + void CopyOut(char* data, size_t length) const noexcept; +}; +} // namespace lightstep diff --git a/include/lightstep/tracer.h b/include/lightstep/tracer.h index b77070b0..a78481bf 100644 --- a/include/lightstep/tracer.h +++ b/include/lightstep/tracer.h @@ -144,7 +144,7 @@ struct LightStepTracerOptions { // default transporter is used. // // If `use_thread` is false, `transporter` should be derived from - // AsyncTransporter; otherwise, it must be derived from SyncTransporter. + // LegacyAsyncTransporter; otherwise, it must be derived from SyncTransporter. std::unique_ptr transporter; // `metrics_observer` can be optionally provided to track LightStep tracer diff --git a/include/lightstep/transporter.h b/include/lightstep/transporter.h index 17221870..97a859e8 100644 --- a/include/lightstep/transporter.h +++ b/include/lightstep/transporter.h @@ -1,12 +1,13 @@ #pragma once #include +#include #include #include namespace lightstep { // Transporter is the abstract base class for SyncTransporter and -// AsyncTransporter. +// LegacyAsyncTransporter. class Transporter { public: Transporter() noexcept = default; @@ -36,8 +37,10 @@ class SyncTransporter : public Transporter { google::protobuf::Message& response) = 0; }; -// AsyncTransporter customizes how asynchronous tracing reports are sent. -class AsyncTransporter : public Transporter { +// LegacyAsyncTransporter customizes how asynchronous tracing reports are sent. +// +// Deprecated: Use AsyncTransporter +class LegacyAsyncTransporter : public Transporter { public: // Callback interface used by Send. class Callback { @@ -66,4 +69,52 @@ class AsyncTransporter : public Transporter { google::protobuf::Message& response, Callback& callback) = 0; }; + +/** + * Provides a hook to customize how ReportRequests are transported. + */ +class AsyncTransporter : public Transporter { + public: + /** + * A Callback to be invoked after transporting a ReportRequest + */ + class Callback { + public: + Callback() noexcept = default; + Callback(const Callback&) noexcept = default; + Callback(Callback&&) noexcept = default; + + virtual ~Callback() = default; + + Callback& operator=(const Callback&) noexcept = default; + Callback& operator=(Callback&&) noexcept = default; + + /** + * Call when a ReportRequest was successfully transported. + * @param message the ReportRequest + */ + virtual void OnSuccess(BufferChain& message) noexcept = 0; + + /** + * Call when a ReportRequest not successfully transported. + * @param message the ReportRequest + */ + virtual void OnFailure(BufferChain& message) noexcept = 0; + }; + + /** + * Called when the tracer's span buffer is full. Implementors can either flush + * the pending spans early (so that we don't drop anything) or do nothing and + * let subsequent finished spans drop. + */ + virtual void OnSpanBufferFull() noexcept {} + + /** + * Send a ReportRequest + * @param message a BufferChain representing the ReportRequest's serialization + * @param callback a Callback to be called after message is transported + */ + virtual void Send(std::unique_ptr&& message, + Callback& callback) noexcept = 0; +}; } // namespace lightstep diff --git a/src/common/BUILD b/src/common/BUILD index d32bd70a..3ea5df24 100644 --- a/src/common/BUILD +++ b/src/common/BUILD @@ -13,6 +13,16 @@ lightstep_cc_library( ], ) +lightstep_cc_library( + name = "buffer_chain_lib", + srcs = [ + "buffer_chain.cpp", + ], + deps = [ + "//include/lightstep:buffer_chain_interface", + ], +) + lightstep_cc_library( name = "spin_lock_mutex_lib", private_hdrs = [ @@ -31,18 +41,17 @@ lightstep_cc_library( ) lightstep_cc_library( - name = "serialization_chain_lib", + name = "chained_stream_lib", private_hdrs = [ - "serialization_chain.h", + "chained_stream.h", ], srcs = [ - "serialization_chain.cpp", + "chained_stream.cpp", ], deps = [ ":fragment_input_stream_lib", ":noncopyable_lib", - ":hex_conversion_lib", - ":serialization_lib", + ":composable_fragment_input_stream_lib", ], external_deps = [ "@com_google_protobuf//:protobuf", @@ -216,6 +225,19 @@ lightstep_cc_library( ], ) +lightstep_cc_library( + name = "composable_fragment_input_stream_lib", + private_hdrs = [ + "composable_fragment_input_stream.h", + ], + srcs = [ + "composable_fragment_input_stream.cpp", + ], + deps = [ + ":fragment_input_stream_lib", + ], +) + lightstep_cc_library( name = "fragment_array_input_stream_lib", private_hdrs = [ @@ -247,6 +269,36 @@ lightstep_cc_library( ], ) +lightstep_cc_library( + name = "report_request_framing_lib", + private_hdrs = [ + "report_request_framing.h", + ], + srcs = [ + "report_request_framing.cpp", + ], + deps = [ + ":serialization_lib", + ], +) + +lightstep_cc_library( + name = "chunked_http_framing_lib", + private_hdrs = [ + "chunked_http_framing.h", + ], + srcs = [ + "chunked_http_framing.cpp", + ], + deps = [ + ":serialization_lib", + ":hex_conversion_lib", + ], + external_deps = [ + "@io_opentracing_cpp//:opentracing", + ], +) + lightstep_cc_library( name = "timestamp_lib", private_hdrs = [ diff --git a/src/common/buffer_chain.cpp b/src/common/buffer_chain.cpp new file mode 100644 index 00000000..f2d5514c --- /dev/null +++ b/src/common/buffer_chain.cpp @@ -0,0 +1,23 @@ +#include "lightstep/buffer_chain.h" + +#include +#include + +namespace lightstep { +//-------------------------------------------------------------------------------------------------- +// CopyOut +//-------------------------------------------------------------------------------------------------- +void BufferChain::CopyOut(char* data, size_t length) const noexcept { + if (length < this->num_bytes()) { + std::terminate(); + } + auto callback = [](void* context, const void* fragment_data, + size_t fragment_size) noexcept { + auto out = static_cast(context); + *out = std::copy_n(static_cast(fragment_data), fragment_size, + *out); + return true; + }; + this->ForEachFragment(callback, static_cast(&data)); +} +} // namespace lightstep diff --git a/src/common/chained_stream.cpp b/src/common/chained_stream.cpp new file mode 100644 index 00000000..4a7fb928 --- /dev/null +++ b/src/common/chained_stream.cpp @@ -0,0 +1,132 @@ +#include "common/chained_stream.h" + +#include + +namespace lightstep { +//-------------------------------------------------------------------------------------------------- +// constructor +//-------------------------------------------------------------------------------------------------- +ChainedStream::ChainedStream() noexcept : current_block_{&head_} {} + +//-------------------------------------------------------------------------------------------------- +// CloseOutput +//-------------------------------------------------------------------------------------------------- +void ChainedStream::CloseOutput() noexcept { + output_closed_ = true; + current_block_ = &head_; +} + +//-------------------------------------------------------------------------------------------------- +// Next +//-------------------------------------------------------------------------------------------------- +bool ChainedStream::Next(void** data, int* size) { + assert(!output_closed_); + + if (current_block_position_ < BlockSize) { + *size = BlockSize - current_block_position_; + *data = static_cast(current_block_->data.data() + + current_block_position_); + num_bytes_written_ += *size; + current_block_position_ = BlockSize; + current_block_->size = BlockSize; + return true; + } + current_block_->next.reset(new Block{}); + current_block_ = current_block_->next.get(); + current_block_->size = BlockSize; + current_block_position_ = BlockSize; + *size = BlockSize; + num_bytes_written_ += *size; + *data = static_cast(current_block_->data.data()); + ++num_blocks_; + return true; +} + +//-------------------------------------------------------------------------------------------------- +// BackUp +//-------------------------------------------------------------------------------------------------- +void ChainedStream::BackUp(int count) { + assert(!output_closed_); + + num_bytes_written_ -= count; + current_block_position_ -= count; + current_block_->size -= count; +} + +//-------------------------------------------------------------------------------------------------- +// segment_num_fragments +//-------------------------------------------------------------------------------------------------- +int ChainedStream::segment_num_fragments() const noexcept { + assert(output_closed_); + + if (num_bytes_written_ == 0) { + return 0; + } + return num_blocks_ - fragment_index_; +} + +//-------------------------------------------------------------------------------------------------- +// SegmentForEachFragment +//-------------------------------------------------------------------------------------------------- +bool ChainedStream::SegmentForEachFragment(Callback callback) const noexcept { + assert(output_closed_); + assert(fragment_index_ >= 0 && fragment_index_ <= num_blocks_); + + if (num_blocks_ == 0) { + return true; + } + + auto block = current_block_; + + auto block_size = block->size; + assert(block_size >= fragment_position_); + if (!callback(static_cast(const_cast(block->data.data()) + + fragment_position_), + block_size - fragment_position_)) { + return false; + } + + block = block->next.get(); + while (block != nullptr) { + if (!callback(static_cast(const_cast(block->data.data())), + block->size)) { + return false; + } + block = block->next.get(); + } + return true; +} + +//-------------------------------------------------------------------------------------------------- +// SegmentClear +//-------------------------------------------------------------------------------------------------- +void ChainedStream::SegmentClear() noexcept { + assert(output_closed_); + + num_blocks_ = 0; + num_bytes_written_ = 0; + fragment_index_ = 0; + fragment_position_ = 0; + current_block_ = nullptr; +} + +//-------------------------------------------------------------------------------------------------- +// SegmentSeek +//-------------------------------------------------------------------------------------------------- +void ChainedStream::SegmentSeek(int relative_fragment_index, + int position) noexcept { + assert(output_closed_); + assert(fragment_index_ + relative_fragment_index <= num_blocks_); + if (relative_fragment_index == 0) { + fragment_position_ += position; + assert(fragment_position_ <= current_block_->size); + return; + } + for (int i = 0; i < relative_fragment_index; ++i) { + current_block_ = current_block_->next.get(); + } + fragment_index_ += relative_fragment_index; + fragment_position_ = position; + assert(fragment_position_ <= current_block_->size); +} +} // namespace lightstep diff --git a/src/common/chained_stream.h b/src/common/chained_stream.h new file mode 100644 index 00000000..5e921c0d --- /dev/null +++ b/src/common/chained_stream.h @@ -0,0 +1,69 @@ +#pragma once + +#include +#include +#include +#include + +#include "common/composable_fragment_input_stream.h" +#include "common/function_ref.h" +#include "common/noncopyable.h" + +#include + +namespace lightstep { +/** + * Maintains a linked chain of blocks as they are written + */ +class ChainedStream final : public google::protobuf::io::ZeroCopyOutputStream, + public ComposableFragmentInputStream, + private Noncopyable { + public: + static const int BlockSize = 256; + + ChainedStream() noexcept; + + /** + * Close the stream for output. After calling this, we can no longer write to + * the stream, but we can interact with it as a FragmentInputStream. + */ + void CloseOutput() noexcept; + + // ZeroCopyOutputStream + bool Next(void** data, int* size) override; + + void BackUp(int count) override; + + google::protobuf::int64 ByteCount() const override { + return static_cast(num_bytes_written_); + } + + // FragmentInputStream + int segment_num_fragments() const noexcept override; + + bool SegmentForEachFragment(Callback callback) const noexcept override; + + void SegmentClear() noexcept override; + + void SegmentSeek(int relative_fragment_index, int position) noexcept override; + + private: + struct Block { + std::unique_ptr next; + int size; + std::array data; + }; + + bool output_closed_{false}; + + int num_blocks_{1}; + int num_bytes_written_{0}; + int current_block_position_{0}; + Block* current_block_; + + int fragment_index_{0}; + int fragment_position_{0}; + + Block head_; +}; +} // namespace lightstep diff --git a/src/common/chunked_http_framing.cpp b/src/common/chunked_http_framing.cpp new file mode 100644 index 00000000..dfc89407 --- /dev/null +++ b/src/common/chunked_http_framing.cpp @@ -0,0 +1,19 @@ +#include "common/chunked_http_framing.h" + +namespace lightstep { +//-------------------------------------------------------------------------------------------------- +// WriteHttpChunkHeader +//-------------------------------------------------------------------------------------------------- +size_t WriteHttpChunkHeader(char* data, size_t size, + uint32_t chunk_size) noexcept { + assert(size >= ChunkedHttpMaxHeaderSize); + auto serialization_start = data + (size - ChunkedHttpMaxHeaderSize); + auto chunk_size_str = + Uint32ToHex(static_cast(chunk_size), serialization_start); + assert(chunk_size_str.size() == Num32BitHexDigits); + auto iter = serialization_start + chunk_size_str.size(); + *iter++ = '\r'; + *iter++ = '\n'; + return ChunkedHttpMaxHeaderSize; +} +} // namespace lightstep diff --git a/src/common/chunked_http_framing.h b/src/common/chunked_http_framing.h new file mode 100644 index 00000000..24615d19 --- /dev/null +++ b/src/common/chunked_http_framing.h @@ -0,0 +1,22 @@ +#pragma once + +#include "common/hex_conversion.h" + +#include + +namespace lightstep { +const size_t ChunkedHttpMaxHeaderSize = Num32BitHexDigits + 2; + +const opentracing::string_view ChunkedHttpFooter = "\r\n"; + +/** + * Write the header part of framing for an http/1.1 chunk. + * @param data start of the buffer to write to. + * @param size the size of the buffer + * @param chunk_size the size of the chunk. + * + * Note: Data is written from the end of the provided buffer. + */ +size_t WriteHttpChunkHeader(char* data, size_t size, + uint32_t chunk_size) noexcept; +} // namespace lightstep diff --git a/src/common/composable_fragment_input_stream.cpp b/src/common/composable_fragment_input_stream.cpp new file mode 100644 index 00000000..91c11cca --- /dev/null +++ b/src/common/composable_fragment_input_stream.cpp @@ -0,0 +1,78 @@ +#include "common/composable_fragment_input_stream.h" + +#include + +namespace lightstep { +//-------------------------------------------------------------------------------------------------- +// Append +//-------------------------------------------------------------------------------------------------- +void ComposableFragmentInputStream::Append( + std::unique_ptr&& stream) noexcept { + if (next_ == nullptr) { + next_ = std::move(stream); + last_ = next_.get(); + return; + } + assert(last_ != nullptr); + auto prev_last = last_; + last_ = stream.get(); + prev_last->Append(std::move(stream)); +} + +//-------------------------------------------------------------------------------------------------- +// num_fragments +//-------------------------------------------------------------------------------------------------- +int ComposableFragmentInputStream::num_fragments() const noexcept { + auto stream = this; + int result = 0; + while (stream != nullptr) { + result += stream->segment_num_fragments(); + stream = stream->next_.get(); + } + return result; +} + +//-------------------------------------------------------------------------------------------------- +// ForEachFragment +//-------------------------------------------------------------------------------------------------- +bool ComposableFragmentInputStream::ForEachFragment(Callback callback) const + noexcept { + auto stream = this; + while (stream != nullptr) { + if (!stream->SegmentForEachFragment(callback)) { + return false; + } + stream = stream->next_.get(); + } + return true; +} + +//-------------------------------------------------------------------------------------------------- +// Clear +//-------------------------------------------------------------------------------------------------- +void ComposableFragmentInputStream::Clear() noexcept { + auto stream = this; + while (stream != nullptr) { + stream->SegmentClear(); + stream = stream->next_.get(); + } +} + +//-------------------------------------------------------------------------------------------------- +// Seek +//-------------------------------------------------------------------------------------------------- +void ComposableFragmentInputStream::Seek(int fragment_index, + int position) noexcept { + auto stream = this; + while (stream != nullptr) { + auto segment_num_fragments = stream->segment_num_fragments(); + if (fragment_index < segment_num_fragments) { + return stream->SegmentSeek(fragment_index, position); + } + fragment_index -= segment_num_fragments; + stream->SegmentClear(); + stream = stream->next_.get(); + } + assert(fragment_index == 0 && position == 0); +} +} // namespace lightstep diff --git a/src/common/composable_fragment_input_stream.h b/src/common/composable_fragment_input_stream.h new file mode 100644 index 00000000..99b7a10c --- /dev/null +++ b/src/common/composable_fragment_input_stream.h @@ -0,0 +1,59 @@ +#pragma once + +#include + +#include "common/fragment_input_stream.h" + +namespace lightstep { +/** + * Maintains an intrusive linked list of ComposableFragmentInputStreams. + */ +class ComposableFragmentInputStream : public FragmentInputStream { + public: + /** + * Append a ComposableFragmentInputStream. + * @param stream the ComposableFragmentInputStream to append + */ + void Append(std::unique_ptr&& stream) noexcept; + + /** + * The number of fragments in this node of the linked + * ComposableFragmentInputStreams. + */ + virtual int segment_num_fragments() const noexcept = 0; + + /** + * Iterate over the fragments in this node of the linked + * ComposableFragmentInputStreams. + * @param callback the callback to call for each fragment + */ + virtual bool SegmentForEachFragment(Callback callback) const noexcept = 0; + + /** + * Clear this node of the linked ComposableFragmentInputStreams. + */ + virtual void SegmentClear() noexcept = 0; + + /** + * Seek forward in this node of the linked ComposableFragmentInputStreams. + * @param fragment_index the new fragment index to reposition to relative to + * the current fragment index. + * @param position the position within fragment_index to reposition to + * relative to the current position. + */ + virtual void SegmentSeek(int fragment_index, int position) noexcept = 0; + + // FragmentInputStream + int num_fragments() const noexcept final; + + bool ForEachFragment(Callback callback) const noexcept final; + + void Clear() noexcept final; + + void Seek(int fragment_index, int position) noexcept final; + + private: + std::unique_ptr next_; + ComposableFragmentInputStream* last_{nullptr}; +}; +} // namespace lightstep diff --git a/src/common/report_request_framing.cpp b/src/common/report_request_framing.cpp new file mode 100644 index 00000000..05dacb1c --- /dev/null +++ b/src/common/report_request_framing.cpp @@ -0,0 +1,21 @@ +#include "common/report_request_framing.h" + +#include + +namespace lightstep { +//-------------------------------------------------------------------------------------------------- +// WriteReportRequestSpansHeader +//-------------------------------------------------------------------------------------------------- +size_t WriteReportRequestSpansHeader(char* data, size_t size, + uint32_t body_size) noexcept { + assert(size >= ReportRequestSpansMaxHeaderSize); + auto header_size = + ComputeLengthDelimitedHeaderSerializationSize( + body_size); + auto serialization_start = data + (size - header_size); + DirectCodedOutputStream stream{ + reinterpret_cast(serialization_start)}; + WriteKeyLength(stream, body_size); + return header_size; +} +} // namespace lightstep diff --git a/src/common/report_request_framing.h b/src/common/report_request_framing.h new file mode 100644 index 00000000..7ae05951 --- /dev/null +++ b/src/common/report_request_framing.h @@ -0,0 +1,26 @@ +#pragma once + +#include + +#include "common/serialization.h" + +namespace lightstep { +constexpr size_t ReportRequestSpansField = 3; + +constexpr size_t ReportRequestSpansMaxHeaderSize = + StaticKeySerializationSize::value + + google::protobuf::io::CodedOutputStream::StaticVarintSize32< + std::numeric_limits::max()>::value; + +/** + * Write framing for a span in a collector::ReportRequest message. + * @param data start of the buffer to write to. + * @param size the size of the buffer + * @param chunk_size the size of the chunk. + * + * Note: Data is written from the end of the provided buffer. + */ +size_t WriteReportRequestSpansHeader(char* data, size_t size, + uint32_t body_size) noexcept; +} // namespace lightstep diff --git a/src/common/serialization.h b/src/common/serialization.h index 818649ae..f2d068b7 100644 --- a/src/common/serialization.h +++ b/src/common/serialization.h @@ -20,7 +20,7 @@ template struct StaticSerializationKey { // See https://developers.google.com/protocol-buffers/docs/encoding#structure // for documentation on encoding. - static const uint32_t value = static_cast( + static constexpr uint32_t value = static_cast( (FieldNumber << 3) | static_cast(WireTypeValue)); }; @@ -29,7 +29,7 @@ struct StaticSerializationKey { */ template struct StaticKeySerializationSize { - static const size_t value = + static constexpr size_t value = google::protobuf::io::CodedOutputStream::StaticVarintSize32< StaticSerializationKey::value>::value; }; diff --git a/src/common/serialization_chain.cpp b/src/common/serialization_chain.cpp deleted file mode 100644 index ecc1898a..00000000 --- a/src/common/serialization_chain.cpp +++ /dev/null @@ -1,183 +0,0 @@ -#include "common/serialization_chain.h" - -#include -#include - -#include - -namespace lightstep { -static const char* LineTerminator = "\r\n"; - -//-------------------------------------------------------------------------------------------------- -// WriteChunkHeader -//-------------------------------------------------------------------------------------------------- -static int WriteChunkHeader(char* data, size_t data_length, - size_t chunk_size) noexcept { - (void)data_length; - auto chunk_size_str = Uint32ToHex(static_cast(chunk_size), data); - assert(chunk_size_str.size() + 2 <= data_length); - assert(!chunk_size_str.empty()); - auto iter = data + chunk_size_str.size(); - *iter++ = '\r'; - *iter++ = '\n'; - return static_cast(std::distance(data, iter)); -} - -//-------------------------------------------------------------------------------------------------- -// constructor -//-------------------------------------------------------------------------------------------------- -SerializationChain::SerializationChain() noexcept : current_block_{&head_} {} - -//-------------------------------------------------------------------------------------------------- -// AddFraming -//-------------------------------------------------------------------------------------------------- -void SerializationChain::AddFraming() noexcept { - auto protobuf_header_size = - ComputeLengthDelimitedHeaderSerializationSize( - num_bytes_written_); - (void)WriteChunkHeader; - header_size_ = WriteChunkHeader( - header_.data(), header_.size(), - static_cast(num_bytes_written_) + protobuf_header_size); - assert(header_size_ > 0); - - // Serialize the spans key field and length - DirectCodedOutputStream stream{reinterpret_cast( - header_.data() + header_size_)}; - WriteKeyLength( - stream, static_cast(num_bytes_written_)); - header_size_ += protobuf_header_size; - - // Prepare the data structure to act as a FragmentInputStream. - current_block_ = &head_; -} - -//-------------------------------------------------------------------------------------------------- -// Next -//-------------------------------------------------------------------------------------------------- -bool SerializationChain::Next(void** data, int* size) { - if (current_block_position_ < BlockSize) { - *size = BlockSize - current_block_position_; - *data = static_cast(current_block_->data.data() + - current_block_position_); - num_bytes_written_ += *size; - current_block_position_ = BlockSize; - current_block_->size = BlockSize; - return true; - } - current_block_->next.reset(new Block{}); - current_block_ = current_block_->next.get(); - current_block_->size = BlockSize; - current_block_position_ = BlockSize; - *size = BlockSize; - num_bytes_written_ += *size; - *data = static_cast(current_block_->data.data()); - ++num_blocks_; - return true; -} - -//-------------------------------------------------------------------------------------------------- -// BackUp -//-------------------------------------------------------------------------------------------------- -void SerializationChain::BackUp(int count) { - num_bytes_written_ -= count; - current_block_position_ -= count; - current_block_->size -= count; -} - -//-------------------------------------------------------------------------------------------------- -// num_fragments -//-------------------------------------------------------------------------------------------------- -int SerializationChain::num_fragments() const noexcept { - if (num_bytes_written_ == 0) { - return 0; - } - return num_blocks_ + 2 - fragment_index_; -} - -//-------------------------------------------------------------------------------------------------- -// ForEachFragment -//-------------------------------------------------------------------------------------------------- -bool SerializationChain::ForEachFragment(Callback callback) const noexcept { - assert(fragment_index_ >= 0 && fragment_index_ <= num_blocks_ + 1); - - if (num_blocks_ == 0) { - return true; - } - - // header - if (fragment_index_ == 0) { - assert(fragment_position_ < header_size_); - if (!callback(static_cast(const_cast(header_.data()) + - fragment_position_), - header_size_ - fragment_position_)) { - return false; - } - } - - // data - auto block = current_block_; - if (fragment_index_ > 0 && fragment_index_ <= num_blocks_) { - auto block_size = block->size; - assert(block_size >= fragment_position_); - if (!callback(static_cast(const_cast(block->data.data()) + - fragment_position_), - block_size - fragment_position_)) { - return false; - } - block = block->next.get(); - } - while (block != nullptr) { - if (!callback(static_cast(const_cast(block->data.data())), - block->size)) { - return false; - } - block = block->next.get(); - } - - // chunk trailer - if (fragment_index_ == num_blocks_ + 1) { - assert(fragment_position_ < 2); - return callback(static_cast(const_cast(LineTerminator) + - fragment_position_), - 2 - fragment_position_); - } - return callback(static_cast(const_cast(LineTerminator)), 2); -} - -//-------------------------------------------------------------------------------------------------- -// Clear -//-------------------------------------------------------------------------------------------------- -void SerializationChain::Clear() noexcept { - num_blocks_ = 0; - num_bytes_written_ = 0; - fragment_index_ = 0; - fragment_position_ = 0; - current_block_ = nullptr; -} - -//-------------------------------------------------------------------------------------------------- -// Seek -//-------------------------------------------------------------------------------------------------- -void SerializationChain::Seek(int fragment_index, int position) noexcept { - if (fragment_index == 0) { - fragment_position_ += position; - return; - } - auto prev_fragment_index = fragment_index_; - fragment_index_ += fragment_index; - if (fragment_index_ == num_blocks_ + 1) { - assert(position < 2); - fragment_position_ = position; - current_block_ = nullptr; - return; - } - - int prev_block_index = std::max(prev_fragment_index - 1, 0); - int block_index = fragment_index_ - 1; - for (int i = prev_block_index; i < block_index; ++i) { - current_block_ = current_block_->next.get(); - } - fragment_position_ = position; -} -} // namespace lightstep diff --git a/src/common/serialization_chain.h b/src/common/serialization_chain.h deleted file mode 100644 index bf1cbff0..00000000 --- a/src/common/serialization_chain.h +++ /dev/null @@ -1,77 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "common/fragment_input_stream.h" -#include "common/hex_conversion.h" -#include "common/noncopyable.h" -#include "common/serialization.h" - -#include - -namespace lightstep { -/** - * Maintains a linked chain of blocks for a serialization. - */ -class SerializationChain final - : public google::protobuf::io::ZeroCopyOutputStream, - public FragmentInputStream, - private Noncopyable { - public: - static const int BlockSize = 256; - static const size_t ReportRequestSpansField = 3; - - SerializationChain() noexcept; - - /** - * Adds http/1.1 chunk framing and a message header so that the data can be - * parsed as part of a protobuf ReportRequest. - */ - void AddFraming() noexcept; - - // ZeroCopyOutputStream - bool Next(void** data, int* size) override; - - void BackUp(int count) override; - - google::protobuf::int64 ByteCount() const override { - return static_cast(num_bytes_written_); - } - - // FragmentInputStream - int num_fragments() const noexcept override; - - bool ForEachFragment(Callback callback) const noexcept override; - - void Clear() noexcept override; - - void Seek(int fragment_index, int position) noexcept override; - - private: - struct Block { - std::unique_ptr next; - int size; - std::array data; - }; - - int num_blocks_{1}; - int num_bytes_written_{0}; - int current_block_position_{0}; - int header_size_{0}; - Block* current_block_; - - int fragment_index_{0}; - int fragment_position_{0}; - - Block head_; - static const size_t MaxHeaderSize = - Num64BitHexDigits + 2 + - StaticKeySerializationSize::value + - google::protobuf::io::CodedOutputStream::StaticVarintSize32< - std::numeric_limits::max()>::value; - std::array header_; -}; -} // namespace lightstep diff --git a/src/recorder/BUILD b/src/recorder/BUILD index 2a738230..8e9a4531 100644 --- a/src/recorder/BUILD +++ b/src/recorder/BUILD @@ -35,11 +35,14 @@ lightstep_cc_library( "recorder.h", ], deps = [ - "//src/common:serialization_chain_lib", + "//src/common:chained_stream_lib", "//src/common:timestamp_lib", "//lightstep-tracer-common:collector_proto_cc", "//include/lightstep:tracer_interface", ], + external_deps = [ + "@com_google_protobuf//:protobuf", + ], ) lightstep_cc_library( @@ -72,6 +75,24 @@ lightstep_cc_library( ], ) +lightstep_cc_library( + name = "legacy_manual_recorder_lib", + private_hdrs = [ + "legacy_manual_recorder.h", + ], + srcs = [ + "legacy_manual_recorder.cpp", + ], + deps = [ + "//src/common:logger_lib", + "//src/common:noncopyable_lib", + "//src/common:utility_lib", + ":recorder_interface", + ":report_builder_lib", + ":transporter_lib", + ], +) + lightstep_cc_library( name = "manual_recorder_lib", private_hdrs = [ @@ -83,9 +104,14 @@ lightstep_cc_library( deps = [ "//src/common:logger_lib", "//src/common:noncopyable_lib", + "//src/common:random_lib", + "//src/common:report_request_framing_lib", "//src/common:utility_lib", - ":recorder_interface", - ":report_builder_lib", + "//src/common:circular_buffer_lib", + "//src/recorder:metrics_tracker_lib", + "//src/recorder/serialization:report_request_lib", + "//src/recorder/serialization:report_request_header_lib", + ":fork_aware_recorder_lib", ":transporter_lib", ], ) @@ -141,3 +167,16 @@ lightstep_cc_library( ":stream_recorder_interface", ], ) + +lightstep_cc_library( + name = "metrics_tracker_lib", + private_hdrs = [ + "metrics_tracker.h", + ], + srcs = [ + "metrics_tracker.cpp", + ], + deps = [ + "//include/lightstep:tracer_interface", + ], +) diff --git a/src/recorder/legacy_manual_recorder.cpp b/src/recorder/legacy_manual_recorder.cpp new file mode 100644 index 00000000..be696b4f --- /dev/null +++ b/src/recorder/legacy_manual_recorder.cpp @@ -0,0 +1,131 @@ +#include "recorder/legacy_manual_recorder.h" +#include "common/utility.h" + +namespace lightstep { +//------------------------------------------------------------------------------ +// Constructor +//------------------------------------------------------------------------------ +LegacyManualRecorder::LegacyManualRecorder( + Logger& logger, LightStepTracerOptions options, + std::unique_ptr&& transporter) + : logger_{logger}, + options_{std::move(options)}, + builder_{options_.access_token, options_.tags}, + transporter_{std::move(transporter)} { + // If no MetricsObserver was provided, use a default one that does nothing. + if (options_.metrics_observer == nullptr) { + options_.metrics_observer.reset(new MetricsObserver{}); + } +} + +//------------------------------------------------------------------------------ +// RecordSpan +//------------------------------------------------------------------------------ +void LegacyManualRecorder::RecordSpan( + const collector::Span& span) noexcept try { + if (disabled_) { + dropped_spans_++; + options_.metrics_observer->OnSpansDropped(1); + return; + } + + auto max_buffered_spans = options_.max_buffered_spans.value(); + if (builder_.num_pending_spans() >= max_buffered_spans) { + // If there's no report in flight, flush the recoder. We can only get + // here if max_buffered_spans was dynamically decreased. + // + // Otherwise, drop the span. + if (!IsReportInProgress()) { + FlushOne(); + } else { + dropped_spans_++; + options_.metrics_observer->OnSpansDropped(1); + return; + } + } + builder_.AddSpan(span); + if (builder_.num_pending_spans() >= max_buffered_spans) { + FlushOne(); + } +} catch (const std::exception& e) { + logger_.Error("Failed to record span: ", e.what()); +} + +//------------------------------------------------------------------------------ +// IsReportInProgress +//------------------------------------------------------------------------------ +bool LegacyManualRecorder::IsReportInProgress() const noexcept { + return encoding_seqno_ > 1 + flushed_seqno_; +} + +//------------------------------------------------------------------------------ +// FlushOne +//------------------------------------------------------------------------------ +bool LegacyManualRecorder::FlushOne() noexcept try { + options_.metrics_observer->OnFlush(); + + // If a report is currently in flight, do nothing; and if there are any + // pending spans, then the flush is considered to have failed. + if (IsReportInProgress()) { + return builder_.num_pending_spans() == 0; + } + + saved_pending_spans_ = builder_.num_pending_spans(); + if (saved_pending_spans_ == 0) { + return true; + } + options_.metrics_observer->OnSpansSent( + static_cast(saved_pending_spans_)); + saved_dropped_spans_ = dropped_spans_; + builder_.set_pending_client_dropped_spans(dropped_spans_); + dropped_spans_ = 0; + std::swap(builder_.pending(), active_request_); + ++encoding_seqno_; + transporter_->Send(active_request_, active_response_, *this); + return true; +} catch (const std::exception& e) { + logger_.Error("Failed to Flush: ", e.what()); + options_.metrics_observer->OnSpansDropped(saved_pending_spans_); + dropped_spans_ += saved_pending_spans_; + active_request_.Clear(); + return false; +} + +//------------------------------------------------------------------------------ +// FlushWithTimeout +//------------------------------------------------------------------------------ +bool LegacyManualRecorder::FlushWithTimeout( + std::chrono::system_clock::duration /*timeout*/) noexcept { + if (disabled_) { + return false; + } + return FlushOne(); +} + +//------------------------------------------------------------------------------ +// OnSuccess +//------------------------------------------------------------------------------ +void LegacyManualRecorder::OnSuccess() noexcept { + ++flushed_seqno_; + active_request_.Clear(); + LogReportResponse(logger_, options_.verbose, active_response_); + for (auto& command : active_response_.commands()) { + if (command.disable()) { + logger_.Warn("Tracer disabled by collector"); + disabled_ = true; + } + } +} + +//------------------------------------------------------------------------------ +// OnFailure +//------------------------------------------------------------------------------ +void LegacyManualRecorder::OnFailure(std::error_code error) noexcept { + ++flushed_seqno_; + active_request_.Clear(); + options_.metrics_observer->OnSpansDropped( + static_cast(saved_pending_spans_)); + dropped_spans_ += saved_dropped_spans_ + saved_pending_spans_; + logger_.Error("Failed to send report: ", error.message()); +} +} // namespace lightstep diff --git a/src/recorder/legacy_manual_recorder.h b/src/recorder/legacy_manual_recorder.h new file mode 100644 index 00000000..be691d2e --- /dev/null +++ b/src/recorder/legacy_manual_recorder.h @@ -0,0 +1,50 @@ +#pragma once + +#include "common/logger.h" +#include "common/noncopyable.h" +#include "lightstep/transporter.h" +#include "recorder/recorder.h" +#include "recorder/report_builder.h" + +namespace lightstep { +// LegacyManualRecorder buffers spans finished by a tracer and sends them over +// to the provided LegacyAsyncTransporter when FlushWithTimeout is called. +class LegacyManualRecorder final : public Recorder, + private LegacyAsyncTransporter::Callback, + private Noncopyable { + public: + LegacyManualRecorder(Logger& logger, LightStepTracerOptions options, + std::unique_ptr&& transporter); + + void RecordSpan(const collector::Span& span) noexcept override; + + bool FlushWithTimeout( + std::chrono::system_clock::duration timeout) noexcept override; + + private: + bool IsReportInProgress() const noexcept; + + bool FlushOne() noexcept; + + void OnSuccess() noexcept override; + void OnFailure(std::error_code error) noexcept override; + + Logger& logger_; + LightStepTracerOptions options_; + + bool disabled_ = false; + + // Buffer state + ReportBuilder builder_; + collector::ReportRequest active_request_; + collector::ReportResponse active_response_; + size_t saved_dropped_spans_ = 0; + size_t saved_pending_spans_ = 0; + size_t flushed_seqno_ = 0; + size_t encoding_seqno_ = 1; + size_t dropped_spans_ = 0; + + // LegacyAsyncTransporter through which to send span reports. + std::unique_ptr transporter_; +}; +} // namespace lightstep diff --git a/src/recorder/manual_recorder.cpp b/src/recorder/manual_recorder.cpp index 485fe5bd..0c92cf1f 100644 --- a/src/recorder/manual_recorder.cpp +++ b/src/recorder/manual_recorder.cpp @@ -1,129 +1,149 @@ #include "recorder/manual_recorder.h" -#include "common/utility.h" + +#include +#include + +#include "common/random.h" +#include "common/report_request_framing.h" +#include "recorder/serialization/report_request.h" +#include "recorder/serialization/report_request_header.h" namespace lightstep { -//------------------------------------------------------------------------------ -// Constructor -//------------------------------------------------------------------------------ +//-------------------------------------------------------------------------------------------------- +// GetMetricsObserver +//-------------------------------------------------------------------------------------------------- +static MetricsObserver& GetMetricsObserver( + LightStepTracerOptions& tracer_options) { + if (tracer_options.metrics_observer == nullptr) { + tracer_options.metrics_observer.reset(new MetricsObserver{}); + } + return *tracer_options.metrics_observer; +} + +//-------------------------------------------------------------------------------------------------- +// constructor +//-------------------------------------------------------------------------------------------------- ManualRecorder::ManualRecorder(Logger& logger, LightStepTracerOptions options, std::unique_ptr&& transporter) : logger_{logger}, - options_{std::move(options)}, - builder_{options_.access_token, options_.tags}, - transporter_{std::move(transporter)} { - // If no MetricsObserver was provided, use a default one that does nothing. - if (options_.metrics_observer == nullptr) { - options_.metrics_observer.reset(new MetricsObserver{}); + tracer_options_{std::move(options)}, + transporter_{std::move(transporter)}, + report_request_header_{new std::string{ + WriteReportRequestHeader(tracer_options_, GenerateId())}}, + metrics_{GetMetricsObserver(tracer_options_)}, + span_buffer_{tracer_options_.max_buffered_spans.value()} {} + +//-------------------------------------------------------------------------------------------------- +// ReserveHeaderSpace +//-------------------------------------------------------------------------------------------------- +Fragment ManualRecorder::ReserveHeaderSpace(ChainedStream& stream) { + const size_t max_header_size = ReportRequestSpansMaxHeaderSize; + static_assert(ChainedStream::BlockSize >= max_header_size, + "BockSize too small"); + void* data; + int size; + if (!stream.Next(&data, &size)) { + throw std::bad_alloc{}; } + stream.BackUp(size - static_cast(max_header_size)); + return {data, static_cast(max_header_size)}; } -//------------------------------------------------------------------------------ +//-------------------------------------------------------------------------------------------------- // RecordSpan -//------------------------------------------------------------------------------ -void ManualRecorder::RecordSpan(const collector::Span& span) noexcept try { - if (disabled_) { - dropped_spans_++; - options_.metrics_observer->OnSpansDropped(1); +//-------------------------------------------------------------------------------------------------- +void ManualRecorder::RecordSpan( + Fragment header_fragment, std::unique_ptr&& span) noexcept { + // Frame the Span + auto header_data = static_cast(header_fragment.first); + auto reserved_header_size = static_cast(header_fragment.second); + auto protobuf_body_size = span->ByteCount() - header_fragment.second; + auto protobuf_header_size = WriteReportRequestSpansHeader( + header_data, reserved_header_size, protobuf_body_size); + span->CloseOutput(); + + // Advance past reserved header space we didn't use. + span->Seek(0, static_cast(reserved_header_size - protobuf_header_size)); + + auto was_added = span_buffer_.Add(span); + if (was_added) { return; } + transporter_->OnSpanBufferFull(); - auto max_buffered_spans = options_.max_buffered_spans.value(); - if (builder_.num_pending_spans() >= max_buffered_spans) { - // If there's no report in flight, flush the recoder. We can only get - // here if max_buffered_spans was dynamically decreased. - // - // Otherwise, drop the span. - if (!IsReportInProgress()) { - FlushOne(); - } else { - dropped_spans_++; - options_.metrics_observer->OnSpansDropped(1); - return; - } - } - builder_.AddSpan(span); - if (builder_.num_pending_spans() >= max_buffered_spans) { - FlushOne(); + // Attempt to add the span again in case the transporter decided to flush + if (!span_buffer_.Add(span)) { + metrics_.OnSpansDropped(1); } -} catch (const std::exception& e) { - logger_.Error("Failed to record span: ", e.what()); -} - -//------------------------------------------------------------------------------ -// IsReportInProgress -//------------------------------------------------------------------------------ -bool ManualRecorder::IsReportInProgress() const noexcept { - return encoding_seqno_ > 1 + flushed_seqno_; } -//------------------------------------------------------------------------------ -// FlushOne -//------------------------------------------------------------------------------ -bool ManualRecorder::FlushOne() noexcept try { - options_.metrics_observer->OnFlush(); - - // If a report is currently in flight, do nothing; and if there are any - // pending spans, then the flush is considered to have failed. - if (IsReportInProgress()) { - return builder_.num_pending_spans() == 0; - } - - saved_pending_spans_ = builder_.num_pending_spans(); - if (saved_pending_spans_ == 0) { - return true; +//-------------------------------------------------------------------------------------------------- +// FlushWithTimeout +//-------------------------------------------------------------------------------------------------- +bool ManualRecorder::FlushWithTimeout( + std::chrono::system_clock::duration /*timeout*/) noexcept try { + std::unique_ptr report_request{new ReportRequest{ + report_request_header_, metrics_.ConsumeDroppedSpans()}}; + { + std::lock_guard lock_guard{flush_mutex_}; + auto num_spans = span_buffer_.size(); + if (num_spans == 0 && report_request->num_dropped_spans() == 0) { + // Nothing to do + return true; + } + span_buffer_.Consume( + num_spans, [&](CircularBufferRange> & + spans) noexcept { + spans.ForEach([&](AtomicUniquePtr & span) noexcept { + std::unique_ptr span_out; + span.Swap(span_out); + report_request->AddSpan(std::move(span_out)); + return true; + }); + return true; + }); } - options_.metrics_observer->OnSpansSent( - static_cast(saved_pending_spans_)); - saved_dropped_spans_ = dropped_spans_; - builder_.set_pending_client_dropped_spans(dropped_spans_); - dropped_spans_ = 0; - std::swap(builder_.pending(), active_request_); - ++encoding_seqno_; - transporter_->Send(active_request_, active_response_, *this); + transporter_->Send(std::unique_ptr{report_request.release()}, + *this); + metrics_.OnFlush(); return true; } catch (const std::exception& e) { - logger_.Error("Failed to Flush: ", e.what()); - options_.metrics_observer->OnSpansDropped(saved_pending_spans_); - dropped_spans_ += saved_pending_spans_; - active_request_.Clear(); + logger_.Error("Failed to flush report: ", e.what()); return false; } -//------------------------------------------------------------------------------ -// FlushWithTimeout -//------------------------------------------------------------------------------ -bool ManualRecorder::FlushWithTimeout( - std::chrono::system_clock::duration /*timeout*/) noexcept { - if (disabled_) { - return false; - } - return FlushOne(); +//-------------------------------------------------------------------------------------------------- +// OnForkedChild +//-------------------------------------------------------------------------------------------------- +void ManualRecorder::OnForkedChild() noexcept { + metrics_.ConsumeDroppedSpans(); + span_buffer_.Clear(); } -//------------------------------------------------------------------------------ +//-------------------------------------------------------------------------------------------------- // OnSuccess -//------------------------------------------------------------------------------ -void ManualRecorder::OnSuccess() noexcept { - ++flushed_seqno_; - active_request_.Clear(); - LogReportResponse(logger_, options_.verbose, active_response_); - for (auto& command : active_response_.commands()) { - if (command.disable()) { - logger_.Warn("Tracer disabled by collector"); - disabled_ = true; - } +//-------------------------------------------------------------------------------------------------- +void ManualRecorder::OnSuccess(BufferChain& message) noexcept { + auto report_request = dynamic_cast(&message); + assert(report_request != nullptr); + if (report_request == nullptr) { + // This should never happen + return; } + metrics_.OnSpansSent(report_request->num_spans()); } -//------------------------------------------------------------------------------ +//-------------------------------------------------------------------------------------------------- // OnFailure -//------------------------------------------------------------------------------ -void ManualRecorder::OnFailure(std::error_code error) noexcept { - ++flushed_seqno_; - active_request_.Clear(); - options_.metrics_observer->OnSpansDropped( - static_cast(saved_pending_spans_)); - dropped_spans_ += saved_dropped_spans_ + saved_pending_spans_; - logger_.Error("Failed to send report: ", error.message()); +//-------------------------------------------------------------------------------------------------- +void ManualRecorder::OnFailure(BufferChain& message) noexcept { + auto report_request = dynamic_cast(&message); + assert(report_request != nullptr); + if (report_request == nullptr) { + // This should never happen + return; + } + metrics_.OnSpansDropped(report_request->num_spans()); + metrics_.UnconsumeDroppedSpans(report_request->num_dropped_spans()); } } // namespace lightstep diff --git a/src/recorder/manual_recorder.h b/src/recorder/manual_recorder.h index 2ea1c7b5..9ec1a408 100644 --- a/src/recorder/manual_recorder.h +++ b/src/recorder/manual_recorder.h @@ -1,50 +1,50 @@ #pragma once +#include + +#include "common/circular_buffer.h" #include "common/logger.h" #include "common/noncopyable.h" #include "lightstep/transporter.h" -#include "recorder/recorder.h" -#include "recorder/report_builder.h" +#include "recorder/fork_aware_recorder.h" +#include "recorder/metrics_tracker.h" namespace lightstep { -// ManualRecorder buffers spans finished by a tracer and sends them over to -// the provided AsyncTransporter when FlushWithTimeout is called. -class ManualRecorder final : public Recorder, - private AsyncTransporter::Callback, - private Noncopyable { +/** + * Implements a Recorder with no background thread and custom Transporter. + */ +class ManualRecorder : public ForkAwareRecorder, + public AsyncTransporter::Callback, + private Noncopyable { public: ManualRecorder(Logger& logger, LightStepTracerOptions options, std::unique_ptr&& transporter); - void RecordSpan(const collector::Span& span) noexcept override; + // Recorder + Fragment ReserveHeaderSpace(ChainedStream& stream) override; + + void RecordSpan(Fragment header_fragment, + std::unique_ptr&& span) noexcept override; bool FlushWithTimeout( std::chrono::system_clock::duration timeout) noexcept override; - private: - bool IsReportInProgress() const noexcept; + // ForkAwareRecorder + void OnForkedChild() noexcept override; - bool FlushOne() noexcept; + // AsyncTransporter::Callback + void OnSuccess(BufferChain& message) noexcept override; - void OnSuccess() noexcept override; - void OnFailure(std::error_code error) noexcept override; + void OnFailure(BufferChain& message) noexcept override; + private: Logger& logger_; - LightStepTracerOptions options_; - - bool disabled_ = false; - - // Buffer state - ReportBuilder builder_; - collector::ReportRequest active_request_; - collector::ReportResponse active_response_; - size_t saved_dropped_spans_ = 0; - size_t saved_pending_spans_ = 0; - size_t flushed_seqno_ = 0; - size_t encoding_seqno_ = 1; - size_t dropped_spans_ = 0; - - // AsyncTransporter through which to send span reports. + LightStepTracerOptions tracer_options_; std::unique_ptr transporter_; + std::shared_ptr report_request_header_; + + MetricsTracker metrics_; + std::mutex flush_mutex_; + CircularBuffer span_buffer_; }; } // namespace lightstep diff --git a/src/recorder/stream_recorder/stream_recorder_metrics.cpp b/src/recorder/metrics_tracker.cpp similarity index 75% rename from src/recorder/stream_recorder/stream_recorder_metrics.cpp rename to src/recorder/metrics_tracker.cpp index 09232ee1..7f34655a 100644 --- a/src/recorder/stream_recorder/stream_recorder_metrics.cpp +++ b/src/recorder/metrics_tracker.cpp @@ -1,36 +1,35 @@ -#include "recorder/stream_recorder/stream_recorder_metrics.h" +#include "recorder/metrics_tracker.h" namespace lightstep { //-------------------------------------------------------------------------------------------------- // constructor //-------------------------------------------------------------------------------------------------- -StreamRecorderMetrics::StreamRecorderMetrics( - MetricsObserver& metrics_observer) noexcept +MetricsTracker::MetricsTracker(MetricsObserver& metrics_observer) noexcept : metrics_observer_{metrics_observer} {} //-------------------------------------------------------------------------------------------------- // OnSpansSent //-------------------------------------------------------------------------------------------------- -void StreamRecorderMetrics::OnSpansSent(int num_spans) noexcept { +void MetricsTracker::OnSpansSent(int num_spans) noexcept { metrics_observer_.OnSpansSent(num_spans); } //-------------------------------------------------------------------------------------------------- // OnFlush //-------------------------------------------------------------------------------------------------- -void StreamRecorderMetrics::OnFlush() noexcept { metrics_observer_.OnFlush(); } +void MetricsTracker::OnFlush() noexcept { metrics_observer_.OnFlush(); } //-------------------------------------------------------------------------------------------------- // ConsumeDroppedSpans //-------------------------------------------------------------------------------------------------- -int StreamRecorderMetrics::ConsumeDroppedSpans() noexcept { +int MetricsTracker::ConsumeDroppedSpans() noexcept { return num_dropped_spans_.exchange(0); } //-------------------------------------------------------------------------------------------------- // UnconsumeDroppedSpans //-------------------------------------------------------------------------------------------------- -void StreamRecorderMetrics::UnconsumeDroppedSpans(int num_spans) noexcept { +void MetricsTracker::UnconsumeDroppedSpans(int num_spans) noexcept { num_dropped_spans_ += num_spans; } } // namespace lightstep diff --git a/src/recorder/stream_recorder/stream_recorder_metrics.h b/src/recorder/metrics_tracker.h similarity index 91% rename from src/recorder/stream_recorder/stream_recorder_metrics.h rename to src/recorder/metrics_tracker.h index c7d15294..9109087e 100644 --- a/src/recorder/stream_recorder/stream_recorder_metrics.h +++ b/src/recorder/metrics_tracker.h @@ -8,9 +8,9 @@ namespace lightstep { /** * Manages the metrics associated with a StreamRecorder. */ -class StreamRecorderMetrics { +class MetricsTracker { public: - explicit StreamRecorderMetrics(MetricsObserver& metrics_observer) noexcept; + explicit MetricsTracker(MetricsObserver& metrics_observer) noexcept; /** * Record dropped spans. diff --git a/src/recorder/recorder.h b/src/recorder/recorder.h index a702b69a..d9738860 100644 --- a/src/recorder/recorder.h +++ b/src/recorder/recorder.h @@ -3,9 +3,10 @@ #include #include -#include "common/serialization_chain.h" +#include "common/chained_stream.h" #include "common/timestamp.h" +#include #include #include "lightstep-tracer-common/collector.pb.h" @@ -23,18 +24,24 @@ class Recorder { Recorder& operator=(Recorder&&) = delete; Recorder& operator=(const Recorder&) = delete; + virtual Fragment ReserveHeaderSpace(ChainedStream& /*stream*/) { return {}; } + + virtual void WriteFooter( + google::protobuf::io::CodedOutputStream& /*coded_stream*/) {} + /** * Record a Span - * @span the protobuf span + * @header_fragment the fragment reserved to hold header framing for the span + * @span the serialization of a protobuf span and framing */ - virtual void RecordSpan(const collector::Span& /*span*/) noexcept {} + virtual void RecordSpan(Fragment /*header_fragment*/, + std::unique_ptr&& /*span*/) noexcept {} /** * Record a Span - * @span the serialization of a protobuf span and framing + * @span the protobuf span */ - virtual void RecordSpan( - std::unique_ptr&& /*span*/) noexcept {} + virtual void RecordSpan(const collector::Span& /*span*/) noexcept {} /** * Block until the recorder is flushed or a time limit is exceeded. diff --git a/src/recorder/serialization/BUILD b/src/recorder/serialization/BUILD new file mode 100644 index 00000000..8ea1096f --- /dev/null +++ b/src/recorder/serialization/BUILD @@ -0,0 +1,53 @@ +load( + "//bazel:lightstep_build_system.bzl", + "lightstep_cc_library", + "lightstep_package", +) + +lightstep_package() + +lightstep_cc_library( + name = "report_request_lib", + private_hdrs = [ + "report_request.h", + ], + srcs = [ + "report_request.cpp", + ], + deps = [ + "//src/common:buffer_chain_lib", + "//src/common:chained_stream_lib", + ":embedded_metrics_message_lib", + ], +) + +lightstep_cc_library( + name = "report_request_header_lib", + private_hdrs = [ + "report_request_header.h", + ], + srcs = [ + "report_request_header.cpp", + ], + deps = [ + "//include/lightstep:tracer_interface", + "//lightstep-tracer-common:collector_proto_cc", + "//src/common:protobuf_lib", + "//src/common:utility_lib", + ":embedded_metrics_message_lib", + ], +) + +lightstep_cc_library( + name = "embedded_metrics_message_lib", + private_hdrs = [ + "embedded_metrics_message.h", + ], + srcs = [ + "embedded_metrics_message.cpp", + ], + deps = [ + "//lightstep-tracer-common:collector_proto_cc", + "//src/common:protobuf_lib", + ], +) diff --git a/src/recorder/stream_recorder/embedded_metrics_message.cpp b/src/recorder/serialization/embedded_metrics_message.cpp similarity index 97% rename from src/recorder/stream_recorder/embedded_metrics_message.cpp rename to src/recorder/serialization/embedded_metrics_message.cpp index bd7d97a8..7f79dd90 100644 --- a/src/recorder/stream_recorder/embedded_metrics_message.cpp +++ b/src/recorder/serialization/embedded_metrics_message.cpp @@ -1,4 +1,4 @@ -#include "recorder/stream_recorder/embedded_metrics_message.h" +#include "recorder/serialization/embedded_metrics_message.h" #include "common/protobuf.h" diff --git a/src/recorder/stream_recorder/embedded_metrics_message.h b/src/recorder/serialization/embedded_metrics_message.h similarity index 100% rename from src/recorder/stream_recorder/embedded_metrics_message.h rename to src/recorder/serialization/embedded_metrics_message.h diff --git a/src/recorder/serialization/report_request.cpp b/src/recorder/serialization/report_request.cpp new file mode 100644 index 00000000..bfb9e70d --- /dev/null +++ b/src/recorder/serialization/report_request.cpp @@ -0,0 +1,70 @@ +#include "recorder/serialization/report_request.h" + +namespace lightstep { +//-------------------------------------------------------------------------------------------------- +// constructor +//-------------------------------------------------------------------------------------------------- +ReportRequest::ReportRequest(std::shared_ptr header, + int num_dropped_spans) + : header_{std::move(header)} { + num_bytes_ = static_cast(header_->size()); + if (num_dropped_spans == 0) { + return; + } + auto metrics = new EmbeddedMetricsMessage{}; + metrics_.reset(metrics); + metrics->set_num_dropped_spans(num_dropped_spans); + metrics_fragment_ = metrics->MakeFragment(); + num_bytes_ += metrics_fragment_.second; +} + +//-------------------------------------------------------------------------------------------------- +// AddSpan +//-------------------------------------------------------------------------------------------------- +void ReportRequest::AddSpan(std::unique_ptr&& span) noexcept { + ++num_spans_; + num_fragments_ += span->num_fragments(); + span->ForEachFragment([&](void* /*data*/, int length) { + num_bytes_ += static_cast(length); + return true; + }); + if (spans_ == nullptr) { + spans_ = std::move(span); + return; + } + spans_->Append(std::move(span)); +} + +//-------------------------------------------------------------------------------------------------- +// num_dropped_spans +//-------------------------------------------------------------------------------------------------- +int ReportRequest::num_dropped_spans() const noexcept { + if (metrics_ == nullptr) { + return 0; + } + return metrics_->num_dropped_spans(); +} + +//-------------------------------------------------------------------------------------------------- +// ForEachFragment +//-------------------------------------------------------------------------------------------------- +bool ReportRequest::ForEachFragment(FragmentCallback callback, + void* context) const { + if (!callback(context, static_cast(header_->data()), + header_->size())) { + return false; + } + if (metrics_ != nullptr) { + if (!callback(context, metrics_fragment_.first, + static_cast(metrics_fragment_.second))) { + return false; + } + } + if (spans_ == nullptr) { + return true; + } + return spans_->ForEachFragment([&](void* data, int length) { + return callback(context, data, static_cast(length)); + }); +} +} // namespace lightstep diff --git a/src/recorder/serialization/report_request.h b/src/recorder/serialization/report_request.h new file mode 100644 index 00000000..2bfc5ddb --- /dev/null +++ b/src/recorder/serialization/report_request.h @@ -0,0 +1,56 @@ +#pragma once + +#include + +#include "common/chained_stream.h" +#include "lightstep/buffer_chain.h" +#include "recorder/serialization/embedded_metrics_message.h" + +namespace lightstep { +/** + * Maintains a BufferChain for a collector::ReportRequest serialization. + */ +class ReportRequest final : public BufferChain { + public: + ReportRequest(std::shared_ptr header, + int num_dropped_spans); + + /** + * Add a serialized Span to the ReportRequest. + * @param span the serialized span to add. + */ + void AddSpan(std::unique_ptr&& span) noexcept; + + /** + * @return the number of dropped spans recorded in the ReportRequest + */ + int num_dropped_spans() const noexcept; + + /** + * @return the number of spans in the ReportRequest + */ + int num_spans() const noexcept { return num_spans_; } + + // BufferChain + size_t num_fragments() const noexcept override { + return static_cast(num_fragments_); + } + + size_t num_bytes() const noexcept override { + return static_cast(num_bytes_); + } + + bool ForEachFragment(FragmentCallback callback, void* context) const override; + + private: + std::shared_ptr header_; + + std::unique_ptr metrics_; + std::pair metrics_fragment_; + + int num_bytes_{0}; + int num_spans_{0}; + int num_fragments_{0}; + std::unique_ptr spans_; +}; +} // namespace lightstep diff --git a/src/recorder/serialization/report_request_header.cpp b/src/recorder/serialization/report_request_header.cpp new file mode 100644 index 00000000..6c87a285 --- /dev/null +++ b/src/recorder/serialization/report_request_header.cpp @@ -0,0 +1,37 @@ +#include "recorder/serialization/report_request_header.h" + +#include "common/protobuf.h" +#include "common/utility.h" +#include "lightstep-tracer-common/collector.pb.h" + +#include +#include + +namespace lightstep { +std::string WriteReportRequestHeader( + const LightStepTracerOptions& tracer_options, uint64_t reporter_id) { + collector::Reporter reporter; + reporter.set_reporter_id(reporter_id); + reporter.mutable_tags()->Reserve( + static_cast(tracer_options.tags.size())); + for (const auto& tag : tracer_options.tags) { + *reporter.mutable_tags()->Add() = ToKeyValue(tag.first, tag.second); + } + + collector::Auth auth; + auth.set_access_token(tracer_options.access_token); + + std::ostringstream oss; + { + google::protobuf::io::OstreamOutputStream zero_copy_stream{&oss}; + google::protobuf::io::CodedOutputStream coded_stream{&zero_copy_stream}; + + WriteEmbeddedMessage( + coded_stream, collector::ReportRequest::kReporterFieldNumber, reporter); + WriteEmbeddedMessage(coded_stream, + collector::ReportRequest::kAuthFieldNumber, auth); + } + + return oss.str(); +} +} // namespace lightstep diff --git a/src/recorder/serialization/report_request_header.h b/src/recorder/serialization/report_request_header.h new file mode 100644 index 00000000..a55238e7 --- /dev/null +++ b/src/recorder/serialization/report_request_header.h @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +#include "lightstep/tracer.h" + +namespace lightstep { +/** + * Serializes the common parts of a ReportRequest. + * @param tracer_options the options used to construct the tracer. + * @param reporter_id a unique ID for the reporter. + * @return a string with the common parts of the ReportRequest serialization. + */ +std::string WriteReportRequestHeader( + const LightStepTracerOptions& tracer_options, uint64_t reporter_id); +} // namespace lightstep diff --git a/src/recorder/stream_recorder/BUILD b/src/recorder/stream_recorder/BUILD index 0fad9c35..662aedba 100644 --- a/src/recorder/stream_recorder/BUILD +++ b/src/recorder/stream_recorder/BUILD @@ -16,19 +16,6 @@ lightstep_cc_library( ], ) -lightstep_cc_library( - name = "stream_recorder_metrics_lib", - private_hdrs = [ - "stream_recorder_metrics.h", - ], - srcs = [ - "stream_recorder_metrics.cpp", - ], - deps = [ - "//include/lightstep:metrics_observer_interface", - ], -) - lightstep_cc_library( name = "span_stream_lib", private_hdrs = [ @@ -39,9 +26,9 @@ lightstep_cc_library( ], deps = [ "//src/common:circular_buffer_lib", - "//src/common:serialization_chain_lib", + "//src/common:chained_stream_lib", "//src/common:fragment_input_stream_lib", - ":stream_recorder_metrics_lib", + "//src/recorder:metrics_tracker_lib", ], ) @@ -58,9 +45,9 @@ lightstep_cc_library( "//src/common:utility_lib", "//src/common:fragment_input_stream_lib", "//src/common:fragment_array_input_stream_lib", - ":embedded_metrics_message_lib", + "//src/recorder/serialization:embedded_metrics_message_lib", ":span_stream_lib", - ":stream_recorder_metrics_lib", + "//src/recorder:metrics_tracker_lib", ], ) @@ -77,16 +64,18 @@ lightstep_cc_library( deps = [ "//src/common:circular_buffer_lib", "//src/common:logger_lib", + "//src/common:report_request_framing_lib", + "//src/common:chunked_http_framing_lib", "//src/common:noncopyable_lib", "//src/common:protobuf_lib", "//src/common/platform:network_environment_lib", - "//src/common:serialization_chain_lib", + "//src/common:chained_stream_lib", "//src/network:event_lib", "//src/network:timer_event_lib", "//src/recorder:fork_aware_recorder_lib", "//src/recorder:stream_recorder_interface", ":stream_recorder_options_lib", - ":stream_recorder_metrics_lib", + "//src/recorder:metrics_tracker_lib", ":satellite_streamer_lib", ], ) @@ -147,9 +136,8 @@ lightstep_cc_library( "//src/network:event_lib", "//src/network:timer_event_lib", "//src/network:vector_write_lib", - ":stream_recorder_metrics_lib", - ":embedded_metrics_message_lib", - ":utility_lib", + "//src/recorder/serialization:report_request_header_lib", + "//src/recorder:metrics_tracker_lib", ":host_header_lib", ":span_stream_lib", ":connection_stream_lib", @@ -166,26 +154,7 @@ lightstep_cc_library( "utility.cpp", ], deps = [ - "//include/lightstep:tracer_interface", - "//lightstep-tracer-common:collector_proto_cc", - "//src/common:protobuf_lib", "//src/common/platform:string_lib", - "//src/common:utility_lib", - "//src/common:fragment_array_input_stream_lib", - ], -) - -lightstep_cc_library( - name = "embedded_metrics_message_lib", - private_hdrs = [ - "embedded_metrics_message.h", - ], - srcs = [ - "embedded_metrics_message.cpp", - ], - deps = [ - "//lightstep-tracer-common:collector_proto_cc", - "//src/common:protobuf_lib", ], ) diff --git a/src/recorder/stream_recorder/connection_stream.h b/src/recorder/stream_recorder/connection_stream.h index abc8e17c..4e80b4c1 100644 --- a/src/recorder/stream_recorder/connection_stream.h +++ b/src/recorder/stream_recorder/connection_stream.h @@ -7,10 +7,10 @@ #include "common/fragment_array_input_stream.h" #include "common/fragment_input_stream.h" #include "common/function_ref.h" +#include "common/hex_conversion.h" #include "common/utility.h" -#include "recorder/stream_recorder/embedded_metrics_message.h" +#include "recorder/serialization/embedded_metrics_message.h" #include "recorder/stream_recorder/span_stream.h" -#include "recorder/stream_recorder/stream_recorder_metrics.h" namespace lightstep { /** @@ -72,7 +72,7 @@ class ConnectionStream { FragmentArrayInputStream header_stream_; FragmentArrayInputStream terminal_stream_; - std::unique_ptr span_remnant_; + std::unique_ptr span_remnant_; bool shutting_down_; diff --git a/src/recorder/stream_recorder/satellite_streamer.cpp b/src/recorder/stream_recorder/satellite_streamer.cpp index b7cc7b57..818bd9dd 100644 --- a/src/recorder/stream_recorder/satellite_streamer.cpp +++ b/src/recorder/stream_recorder/satellite_streamer.cpp @@ -4,7 +4,7 @@ #include #include "common/random.h" -#include "recorder/stream_recorder/utility.h" +#include "recorder/serialization/report_request_header.h" namespace lightstep { //-------------------------------------------------------------------------------------------------- @@ -13,15 +13,14 @@ namespace lightstep { SatelliteStreamer::SatelliteStreamer( Logger& logger, EventBase& event_base, const LightStepTracerOptions& tracer_options, - const StreamRecorderOptions& recorder_options, - StreamRecorderMetrics& metrics, - CircularBuffer& span_buffer) + const StreamRecorderOptions& recorder_options, MetricsTracker& metrics, + CircularBuffer& span_buffer) : logger_{logger}, event_base_{event_base}, tracer_options_{tracer_options}, recorder_options_{recorder_options}, header_common_fragment_{ - WriteStreamHeaderCommonFragment(tracer_options, GenerateId())}, + WriteReportRequestHeader(tracer_options, GenerateId())}, endpoint_manager_{logger, event_base, tracer_options, recorder_options, [this] { this->OnEndpointManagerReady(); }}, span_buffer_{span_buffer}, diff --git a/src/recorder/stream_recorder/satellite_streamer.h b/src/recorder/stream_recorder/satellite_streamer.h index 3acff999..4cf26be2 100644 --- a/src/recorder/stream_recorder/satellite_streamer.h +++ b/src/recorder/stream_recorder/satellite_streamer.h @@ -5,10 +5,10 @@ #include "common/noncopyable.h" #include "common/random_traverser.h" +#include "recorder/metrics_tracker.h" #include "recorder/stream_recorder/satellite_connection.h" #include "recorder/stream_recorder/satellite_endpoint_manager.h" #include "recorder/stream_recorder/span_stream.h" -#include "recorder/stream_recorder/stream_recorder_metrics.h" namespace lightstep { /** @@ -19,8 +19,8 @@ class SatelliteStreamer : private Noncopyable { SatelliteStreamer(Logger& logger, EventBase& event_base, const LightStepTracerOptions& tracer_options, const StreamRecorderOptions& recorder_options, - StreamRecorderMetrics& metrics, - CircularBuffer& span_buffer); + MetricsTracker& metrics, + CircularBuffer& span_buffer); /** * @return the associated Logger. @@ -90,7 +90,7 @@ class SatelliteStreamer : private Noncopyable { const StreamRecorderOptions& recorder_options_; std::string header_common_fragment_; SatelliteEndpointManager endpoint_manager_; - CircularBuffer& span_buffer_; + CircularBuffer& span_buffer_; SpanStream span_stream_; std::vector> connections_; RandomTraverser connection_traverser_; diff --git a/src/recorder/stream_recorder/span_stream.cpp b/src/recorder/stream_recorder/span_stream.cpp index 573bd0cb..d5ef9a20 100644 --- a/src/recorder/stream_recorder/span_stream.cpp +++ b/src/recorder/stream_recorder/span_stream.cpp @@ -4,8 +4,8 @@ namespace lightstep { //-------------------------------------------------------------------------------------------------- // constructor //-------------------------------------------------------------------------------------------------- -SpanStream::SpanStream(CircularBuffer& span_buffer, - StreamRecorderMetrics& metrics) noexcept +SpanStream::SpanStream(CircularBuffer& span_buffer, + MetricsTracker& metrics) noexcept : span_buffer_{span_buffer}, metrics_{metrics} {} //-------------------------------------------------------------------------------------------------- @@ -16,8 +16,8 @@ void SpanStream::Allot() noexcept { allotment_ = span_buffer_.Peek(); } //-------------------------------------------------------------------------------------------------- // ConsumeRemnant //-------------------------------------------------------------------------------------------------- -std::unique_ptr SpanStream::ConsumeRemnant() noexcept { - return std::unique_ptr{remnant_.release()}; +std::unique_ptr SpanStream::ConsumeRemnant() noexcept { + return std::unique_ptr{remnant_.release()}; } //-------------------------------------------------------------------------------------------------- @@ -26,7 +26,7 @@ std::unique_ptr SpanStream::ConsumeRemnant() noexcept { int SpanStream::num_fragments() const noexcept { int result = 0; allotment_.ForEach([&result]( - const AtomicUniquePtr& span) noexcept { + const AtomicUniquePtr& span) noexcept { result += span->num_fragments(); return true; }); @@ -38,7 +38,7 @@ int SpanStream::num_fragments() const noexcept { //-------------------------------------------------------------------------------------------------- bool SpanStream::ForEachFragment(Callback callback) const noexcept { return allotment_.ForEach( - [callback](const AtomicUniquePtr& span) { + [callback](const AtomicUniquePtr& span) { return span->ForEachFragment(callback); }); } @@ -50,7 +50,7 @@ void SpanStream::Clear() noexcept { remnant_.reset(); metrics_.OnSpansSent(allotment_.size()); span_buffer_.Consume(allotment_.size()); - allotment_ = CircularBufferRange>{}; + allotment_ = CircularBufferRange>{}; } //-------------------------------------------------------------------------------------------------- @@ -61,7 +61,7 @@ void SpanStream::Seek(int fragment_index, int position) noexcept { int full_span_count = 0; int span_count = 0; allotment_.ForEach([&, fragment_index ]( - const AtomicUniquePtr& span) mutable noexcept { + const AtomicUniquePtr& span) mutable noexcept { auto num_fragments = span->num_fragments(); if (num_fragments <= fragment_index) { fragment_index -= num_fragments; @@ -79,9 +79,9 @@ void SpanStream::Seek(int fragment_index, int position) noexcept { }); span_buffer_.Consume( span_count, [ this, fragment_index, position ]( - CircularBufferRange> + CircularBufferRange> range) mutable noexcept { - range.ForEach([&](AtomicUniquePtr & span) noexcept { + range.ForEach([&](AtomicUniquePtr & span) noexcept { auto num_fragments = span->num_fragments(); if (num_fragments <= fragment_index) { fragment_index -= num_fragments; @@ -94,6 +94,6 @@ void SpanStream::Seek(int fragment_index, int position) noexcept { }); }); metrics_.OnSpansSent(full_span_count); - allotment_ = CircularBufferRange>{}; + allotment_ = CircularBufferRange>{}; } } // namespace lightstep diff --git a/src/recorder/stream_recorder/span_stream.h b/src/recorder/stream_recorder/span_stream.h index 966d8c87..90672e41 100644 --- a/src/recorder/stream_recorder/span_stream.h +++ b/src/recorder/stream_recorder/span_stream.h @@ -1,8 +1,8 @@ #pragma once +#include "common/chained_stream.h" #include "common/circular_buffer.h" -#include "common/serialization_chain.h" -#include "recorder/stream_recorder/stream_recorder_metrics.h" +#include "recorder/metrics_tracker.h" namespace lightstep { /** @@ -11,8 +11,8 @@ namespace lightstep { */ class SpanStream final : public FragmentInputStream { public: - SpanStream(CircularBuffer& span_buffer, - StreamRecorderMetrics& metrics) noexcept; + SpanStream(CircularBuffer& span_buffer, + MetricsTracker& metrics) noexcept; /** * Allots spans from the associated circular buffer to stream to satellites. @@ -23,12 +23,12 @@ class SpanStream final : public FragmentInputStream { * Returns and removes the last partially written span. * @return the last partially written span */ - std::unique_ptr ConsumeRemnant() noexcept; + std::unique_ptr ConsumeRemnant() noexcept; /** - * @return the associagted StreamRecorderMetrics + * @return the associagted MetricsTracker */ - StreamRecorderMetrics& metrics() const noexcept { return metrics_; } + MetricsTracker& metrics() const noexcept { return metrics_; } // FragmentInputStream int num_fragments() const noexcept override; @@ -40,9 +40,9 @@ class SpanStream final : public FragmentInputStream { void Seek(int fragment_index, int position) noexcept override; private: - CircularBuffer& span_buffer_; - StreamRecorderMetrics& metrics_; - CircularBufferRange> allotment_; - std::unique_ptr remnant_; + CircularBuffer& span_buffer_; + MetricsTracker& metrics_; + CircularBufferRange> allotment_; + std::unique_ptr remnant_; }; } // namespace lightstep diff --git a/src/recorder/stream_recorder/stream_recorder.cpp b/src/recorder/stream_recorder/stream_recorder.cpp index c4e39c88..0bdaee6a 100644 --- a/src/recorder/stream_recorder/stream_recorder.cpp +++ b/src/recorder/stream_recorder/stream_recorder.cpp @@ -1,8 +1,11 @@ #include "recorder/stream_recorder/stream_recorder.h" +#include #include +#include "common/chunked_http_framing.h" #include "common/protobuf.h" +#include "common/report_request_framing.h" namespace lightstep { //-------------------------------------------------------------------------------------------------- @@ -43,12 +46,53 @@ StreamRecorder::~StreamRecorder() noexcept { shutdown_condition_variable_.notify_all(); } +//-------------------------------------------------------------------------------------------------- +// ReserveHedaerSpace +//-------------------------------------------------------------------------------------------------- +Fragment StreamRecorder::ReserveHeaderSpace(ChainedStream& stream) { + const size_t max_header_size = + ReportRequestSpansMaxHeaderSize + ChunkedHttpMaxHeaderSize; + static_assert(ChainedStream::BlockSize >= max_header_size, + "BockSize too small"); + void* data; + int size; + if (!stream.Next(&data, &size)) { + throw std::bad_alloc{}; + } + stream.BackUp(size - static_cast(max_header_size)); + return {data, static_cast(max_header_size)}; +} + +//-------------------------------------------------------------------------------------------------- +// WriteFooter +//-------------------------------------------------------------------------------------------------- +void StreamRecorder::WriteFooter( + google::protobuf::io::CodedOutputStream& coded_stream) { + coded_stream.WriteRaw(ChunkedHttpFooter.data(), ChunkedHttpFooter.size()); +} + //-------------------------------------------------------------------------------------------------- // RecordSpan //-------------------------------------------------------------------------------------------------- void StreamRecorder::RecordSpan( - std::unique_ptr&& span) noexcept { - span->AddFraming(); + Fragment header_fragment, std::unique_ptr&& span) noexcept { + // Frame the Span + auto header_data = static_cast(header_fragment.first); + auto reserved_header_size = static_cast(header_fragment.second); + auto protobuf_body_size = + span->ByteCount() - ChunkedHttpFooter.size() - header_fragment.second; + auto protobuf_header_size = WriteReportRequestSpansHeader( + header_data, reserved_header_size, protobuf_body_size); + auto chunk_body_size = protobuf_body_size + protobuf_header_size; + auto chunk_header_size = WriteHttpChunkHeader( + header_data, reserved_header_size - protobuf_header_size, + chunk_body_size); + span->CloseOutput(); + + // Advance past reserved header space we didn't use. + span->Seek(0, static_cast(reserved_header_size - protobuf_header_size - + chunk_header_size)); + if (!span_buffer_.Add(span)) { // Note: the compiler doesn't want to inline this logger call and it shows // up in profiling with high span droppage even if the logging isn't turned diff --git a/src/recorder/stream_recorder/stream_recorder.h b/src/recorder/stream_recorder/stream_recorder.h index 19cfd6ea..44f4c3ce 100644 --- a/src/recorder/stream_recorder/stream_recorder.h +++ b/src/recorder/stream_recorder/stream_recorder.h @@ -8,17 +8,17 @@ #include "stream_recorder_impl.h" +#include "common/chained_stream.h" #include "common/circular_buffer.h" #include "common/logger.h" #include "common/noncopyable.h" #include "common/platform/network_environment.h" -#include "common/serialization_chain.h" #include "lightstep/tracer.h" #include "network/event_base.h" #include "network/timer_event.h" #include "recorder/fork_aware_recorder.h" +#include "recorder/metrics_tracker.h" #include "recorder/stream_recorder.h" -#include "recorder/stream_recorder/stream_recorder_metrics.h" #include "recorder/stream_recorder/stream_recorder_options.h" namespace lightstep { @@ -77,19 +77,23 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { } /** - * @return the associated StreamRecorderMetrics. + * @return the associated MetricsTracker. */ - StreamRecorderMetrics& metrics() noexcept { return metrics_; } + MetricsTracker& metrics() noexcept { return metrics_; } /** * @return the associated span buffer. */ - CircularBuffer& span_buffer() noexcept { - return span_buffer_; - } + CircularBuffer& span_buffer() noexcept { return span_buffer_; } // Recorder - void RecordSpan(std::unique_ptr&& span) noexcept override; + Fragment ReserveHeaderSpace(ChainedStream& stream) override; + + void WriteFooter( + google::protobuf::io::CodedOutputStream& coded_stream) override; + + void RecordSpan(Fragment header_fragment, + std::unique_ptr&& span) noexcept override; bool FlushWithTimeout( std::chrono::system_clock::duration timeout) noexcept override; @@ -125,8 +129,8 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { LightStepTracerOptions tracer_options_; StreamRecorderOptions recorder_options_; - StreamRecorderMetrics metrics_; - CircularBuffer span_buffer_; + MetricsTracker metrics_; + CircularBuffer span_buffer_; std::atomic exit_{false}; diff --git a/src/recorder/stream_recorder/utility.cpp b/src/recorder/stream_recorder/utility.cpp index bd3e0e21..41445346 100644 --- a/src/recorder/stream_recorder/utility.cpp +++ b/src/recorder/stream_recorder/utility.cpp @@ -6,12 +6,6 @@ #include #include "common/platform/string.h" -#include "common/protobuf.h" -#include "common/utility.h" -#include "lightstep-tracer-common/collector.pb.h" - -#include -#include namespace lightstep { //-------------------------------------------------------------------------------------------------- @@ -45,34 +39,4 @@ SeparateEndpoints( } return {std::move(hosts), std::move(indexed_endpoints)}; } - -//-------------------------------------------------------------------------------------------------- -// WriteStreamHeaderCommonFragment -//-------------------------------------------------------------------------------------------------- -std::string WriteStreamHeaderCommonFragment( - const LightStepTracerOptions& tracer_options, uint64_t reporter_id) { - collector::Reporter reporter; - reporter.set_reporter_id(reporter_id); - reporter.mutable_tags()->Reserve( - static_cast(tracer_options.tags.size())); - for (const auto& tag : tracer_options.tags) { - *reporter.mutable_tags()->Add() = ToKeyValue(tag.first, tag.second); - } - - collector::Auth auth; - auth.set_access_token(tracer_options.access_token); - - std::ostringstream oss; - { - google::protobuf::io::OstreamOutputStream zero_copy_stream{&oss}; - google::protobuf::io::CodedOutputStream coded_stream{&zero_copy_stream}; - - WriteEmbeddedMessage( - coded_stream, collector::ReportRequest::kReporterFieldNumber, reporter); - WriteEmbeddedMessage(coded_stream, - collector::ReportRequest::kAuthFieldNumber, auth); - } - - return oss.str(); -} } // namespace lightstep diff --git a/src/recorder/stream_recorder/utility.h b/src/recorder/stream_recorder/utility.h index 4af47a31..c9d6348d 100644 --- a/src/recorder/stream_recorder/utility.h +++ b/src/recorder/stream_recorder/utility.h @@ -5,9 +5,6 @@ #include #include -#include "common/fragment_array_input_stream.h" -#include "lightstep/tracer.h" - namespace lightstep { /** * Separates a vector of host-port pairs into a bector of unique hosts and @@ -18,13 +15,4 @@ namespace lightstep { std::pair, std::vector>> SeparateEndpoints( const std::vector>& endpoints); - -/** - * Serializes the common parts of a ReportRequest. - * @param tracer_options the options used to construct the tracer. - * @param reporter_id a unique ID for the reporter. - * @return a string with the common parts of the ReportRequest serialization. - */ -std::string WriteStreamHeaderCommonFragment( - const LightStepTracerOptions& tracer_options, uint64_t reporter_id); } // namespace lightstep diff --git a/src/tracer/BUILD b/src/tracer/BUILD index 8db48d8f..76a84aa9 100644 --- a/src/tracer/BUILD +++ b/src/tracer/BUILD @@ -99,7 +99,6 @@ lightstep_cc_library( "//src/common:logger_lib", "//src/common:utility_lib", "//src/common:random_lib", - "//src/common:serialization_chain_lib", "//src/common:spin_lock_mutex_lib", "//src/recorder:recorder_interface", ":immutable_span_context_lib", @@ -156,6 +155,7 @@ lightstep_cc_library( "//src/common/platform:utility_lib", "//src/recorder:auto_recorder_lib", "//src/recorder:grpc_transporter_interface", + "//src/recorder:legacy_manual_recorder_lib", "//src/recorder:manual_recorder_lib", "//src/recorder:stream_recorder_interface", "//src/tracer/legacy:legacy_tracer_impl_lib", diff --git a/src/tracer/span.cpp b/src/tracer/span.cpp index 2cda12e3..8396c5d8 100644 --- a/src/tracer/span.cpp +++ b/src/tracer/span.cpp @@ -22,17 +22,18 @@ namespace lightstep { Span::Span(std::shared_ptr&& tracer, opentracing::string_view operation_name, const opentracing::StartSpanOptions& options) - : serialization_chain_{new SerializationChain{}}, - stream_{serialization_chain_.get()}, + : chained_stream_{new ChainedStream{}}, + header_fragment_{tracer->recorder().ReserveHeaderSpace(*chained_stream_)}, + coded_stream_{chained_stream_.get()}, tracer_{std::move(tracer)} { - WriteOperationName(stream_, operation_name); + WriteOperationName(coded_stream_, operation_name); // Set the start timestamps. std::chrono::system_clock::time_point start_timestamp; std::tie(start_timestamp, start_steady_) = ComputeStartTimestamps( tracer_->recorder(), options.start_system_timestamp, options.start_steady_timestamp); - WriteStartTimestamp(stream_, start_timestamp); + WriteStartTimestamp(coded_stream_, start_timestamp); // Set any span references. trace_flags_ = 0; @@ -62,7 +63,7 @@ Span::Span(std::shared_ptr&& tracer, // Set tags. for (auto& tag : options.tags) { - WriteTag(stream_, tag.first, tag.second); + WriteTag(coded_stream_, tag.first, tag.second); // If sampling_priority is set, it overrides whatever sampling decision was // derived from the referenced spans. @@ -100,7 +101,7 @@ void Span::SetOperationName(opentracing::string_view name) noexcept try { if (is_finished_) { return; } - WriteOperationName(stream_, name); + WriteOperationName(coded_stream_, name); } catch (const std::exception& e) { tracer_->logger().Error("SetOperationName failed: ", e.what()); } @@ -114,7 +115,7 @@ void Span::SetTag(opentracing::string_view key, if (is_finished_) { return; } - WriteTag(stream_, key, value); + WriteTag(coded_stream_, key, value); if (key == SamplingPriorityKey) { trace_flags_ = SetTraceFlag(trace_flags_, is_sampled(value)); @@ -167,7 +168,7 @@ void Span::Log(std::initializer_list< if (is_finished_) { return; } - WriteLog(stream_, timestamp, fields.begin(), fields.end()); + WriteLog(coded_stream_, timestamp, fields.begin(), fields.end()); } catch (const std::exception& e) { tracer_->logger().Error("Log failed: ", e.what()); } @@ -213,7 +214,7 @@ bool Span::SetSpanReference( } trace_id_high = referenced_context->trace_id_high(); trace_id = referenced_context->trace_id_low(); - WriteSpanReference(stream_, reference.first, trace_id, + WriteSpanReference(coded_stream_, reference.first, trace_id, referenced_context->span_id()); trace_flags_ |= referenced_context->trace_flags(); AppendTraceState(trace_state_, referenced_context->trace_state()); @@ -246,23 +247,24 @@ void Span::FinishImpl( // Set timing information. auto duration = finish_timestamp - start_steady_; - WriteDuration(stream_, duration); + WriteDuration(coded_stream_, duration); // Set logs for (auto& log_record : options.log_records) { try { - WriteLog(stream_, log_record.timestamp, log_record.fields.data(), + WriteLog(coded_stream_, log_record.timestamp, log_record.fields.data(), log_record.fields.data() + log_record.fields.size()); } catch (const std::exception& e) { tracer_->logger().Error("Dropping log record: ", e.what()); } } - WriteSpanContext(stream_, trace_id_, span_id_, baggage_.as_vector()); + WriteSpanContext(coded_stream_, trace_id_, span_id_, baggage_.as_vector()); // Record the span - stream_.Trim(); - tracer_->recorder().RecordSpan(std::move(serialization_chain_)); + tracer_->recorder().WriteFooter(coded_stream_); + coded_stream_.Trim(); + tracer_->recorder().RecordSpan(header_fragment_, std::move(chained_stream_)); } catch (const std::exception& e) { tracer_->logger().Error("FinishWithOptions failed: ", e.what()); } diff --git a/src/tracer/span.h b/src/tracer/span.h index 275584d0..476d163b 100644 --- a/src/tracer/span.h +++ b/src/tracer/span.h @@ -5,7 +5,7 @@ #include #include -#include "common/serialization_chain.h" +#include "common/chained_stream.h" #include "common/spin_lock_mutex.h" #include "tracer/baggage_flat_map.h" #include "tracer/lightstep_span_context.h" @@ -98,8 +98,9 @@ class Span final : public opentracing::Span, public LightStepSpanContext { // more sense to use a spin lock for this use case. mutable SpinLockMutex mutex_; - std::unique_ptr serialization_chain_; - google::protobuf::io::CodedOutputStream stream_; + std::unique_ptr chained_stream_; + Fragment header_fragment_; + google::protobuf::io::CodedOutputStream coded_stream_; std::chrono::steady_clock::time_point start_steady_; std::atomic is_finished_{false}; diff --git a/src/tracer/tracer.cpp b/src/tracer/tracer.cpp index 5c259c72..6d6be12a 100644 --- a/src/tracer/tracer.cpp +++ b/src/tracer/tracer.cpp @@ -14,6 +14,7 @@ #include "lightstep/version.h" #include "recorder/auto_recorder.h" #include "recorder/grpc_transporter.h" +#include "recorder/legacy_manual_recorder.h" #include "recorder/manual_recorder.h" #include "recorder/stream_recorder.h" #include "tracer/immutable_span_context.h" @@ -134,6 +135,34 @@ static std::shared_ptr MakeStreamTracer( std::move(logger), std::move(propagation_options), std::move(recorder)}}; } +//------------------------------------------------------------------------------ +// MakeLegacySingleThreadedTracer +//------------------------------------------------------------------------------ +static std::shared_ptr MakeLegacySingleThreadedTracer( + std::shared_ptr logger, LightStepTracerOptions&& options) { + std::unique_ptr transporter; + if (options.transporter != nullptr) { + transporter = std::unique_ptr{ + dynamic_cast(options.transporter.get())}; + if (transporter == nullptr) { + logger->Error( + "`options.transporter` must be derived from AsyncTransporter " + "LegacyAsyncTransporter"); + return nullptr; + } + options.transporter.release(); + } else { + logger->Error( + "`options.transporter` must be set if `options.use_thread` is false"); + return nullptr; + } + auto propagation_options = MakePropagationOptions(options); + auto recorder = std::unique_ptr{new LegacyManualRecorder{ + *logger, std::move(options), std::move(transporter)}}; + return std::shared_ptr{new LegacyTracerImpl{ + std::move(logger), std::move(propagation_options), std::move(recorder)}}; +} + //------------------------------------------------------------------------------ // MakeSingleThreadedTracer //------------------------------------------------------------------------------ @@ -144,9 +173,7 @@ static std::shared_ptr MakeSingleThreadedTracer( transporter = std::unique_ptr{ dynamic_cast(options.transporter.get())}; if (transporter == nullptr) { - logger->Error( - "`options.transporter` must be derived from AsyncTransporter"); - return nullptr; + return MakeLegacySingleThreadedTracer(logger, std::move(options)); } options.transporter.release(); } else { @@ -157,7 +184,7 @@ static std::shared_ptr MakeSingleThreadedTracer( auto propagation_options = MakePropagationOptions(options); auto recorder = std::unique_ptr{ new ManualRecorder{*logger, std::move(options), std::move(transporter)}}; - return std::shared_ptr{new LegacyTracerImpl{ + return std::shared_ptr{new TracerImpl{ std::move(logger), std::move(propagation_options), std::move(recorder)}}; } diff --git a/test/BUILD b/test/BUILD index 9cf89b12..8375e6d4 100644 --- a/test/BUILD +++ b/test/BUILD @@ -19,8 +19,9 @@ lightstep_cc_library( "//src/common:utility_lib", "//src/network:socket_lib", "//src/common:fragment_input_stream_lib", + "//src/common:report_request_framing_lib", "//src/common:serialization_lib", - "//src/common:serialization_chain_lib", + "//src/common:chained_stream_lib", "//src/common:circular_buffer_lib", ], external_deps = [ @@ -141,3 +142,13 @@ lightstep_cc_library( "//include/lightstep:tracer_interface", ], ) + +lightstep_cc_library( + name = "composable_fragment_input_stream_wrapper_lib", + private_hdrs = [ + "composable_fragment_input_stream_wrapper.h", + ], + deps = [ + "//src/common:composable_fragment_input_stream_lib", + ], +) diff --git a/test/common/BUILD b/test/common/BUILD index 1d3a3265..22b914ce 100644 --- a/test/common/BUILD +++ b/test/common/BUILD @@ -161,6 +161,19 @@ lightstep_catch_test( ], ) +lightstep_catch_test( + name = "composable_fragment_input_stream_test", + srcs = [ + "composable_fragment_input_stream_test.cpp", + ], + deps = [ + "//src/common:composable_fragment_input_stream_lib", + "//src/common:fragment_array_input_stream_lib", + "//test:utility_lib", + "//test:composable_fragment_input_stream_wrapper_lib", + ], +) + lightstep_catch_test( name = "fragment_array_input_stream_test", srcs = [ @@ -173,12 +186,12 @@ lightstep_catch_test( ) lightstep_catch_test( - name = "serialization_chain_test", + name = "chained_stream_test", srcs = [ - "serialization_chain_test.cpp", + "chained_stream_test.cpp", ], deps = [ - "//src/common:serialization_chain_lib", + "//src/common:chained_stream_lib", "//test:utility_lib", ], ) @@ -228,3 +241,24 @@ lightstep_catch_test( "//src/common:timestamp_lib", ], ) + +lightstep_catch_test( + name = "report_request_framing_test", + srcs = [ + "report_request_framing_test.cpp", + ], + deps = [ + "//lightstep-tracer-common:collector_proto_cc", + "//src/common:report_request_framing_lib", + ], +) + +lightstep_catch_test( + name = "chunked_http_framing_test", + srcs = [ + "chunked_http_framing_test.cpp", + ], + deps = [ + "//src/common:chunked_http_framing_lib", + ], +) diff --git a/test/common/serialization_chain_test.cpp b/test/common/chained_stream_test.cpp similarity index 57% rename from test/common/serialization_chain_test.cpp rename to test/common/chained_stream_test.cpp index 914d9838..2884d259 100644 --- a/test/common/serialization_chain_test.cpp +++ b/test/common/chained_stream_test.cpp @@ -1,4 +1,4 @@ -#include "common/serialization_chain.h" +#include "common/chained_stream.h" #include #include @@ -10,66 +10,64 @@ #include "3rd_party/catch2/catch.hpp" using namespace lightstep; -TEST_CASE("SerializationChain") { - SerializationChain chain; +TEST_CASE("ChainedStream") { + ChainedStream chain; std::unique_ptr stream{ new google::protobuf::io::CodedOutputStream{&chain}}; SECTION("An empty chain has no fragments.") { stream.reset(); - chain.AddFraming(); + chain.CloseOutput(); REQUIRE(chain.num_fragments() == 0); } SECTION("We can write a string smaller than the block size.") { stream->WriteString("abc"); stream.reset(); - chain.AddFraming(); - REQUIRE(chain.num_fragments() == 3); - REQUIRE(ToString(chain) == AddSpanChunkFraming("abc")); + chain.CloseOutput(); + REQUIRE(chain.num_fragments() == 1); + REQUIRE(ToString(chain) == "abc"); } SECTION("We can write strings larger than a single block.") { - std::string s(SerializationChain::BlockSize + 1, 'X'); + std::string s(ChainedStream::BlockSize + 1, 'X'); stream->WriteString(s); stream.reset(); - chain.AddFraming(); - REQUIRE(chain.num_fragments() == 4); - REQUIRE(ToString(chain) == AddSpanChunkFraming(s)); + chain.CloseOutput(); + REQUIRE(chain.num_fragments() == 2); + REQUIRE(ToString(chain) == s); } SECTION("We can seek to any byte in the fragment stream.") { - std::string s(SerializationChain::BlockSize + 2, 'X'); + std::string s(ChainedStream::BlockSize + 2, 'X'); stream->WriteString(s); stream.reset(); - chain.AddFraming(); - std::string serialization = AddSpanChunkFraming(s); - for (size_t i = 1; i <= serialization.size(); ++i) { + chain.CloseOutput(); + for (size_t i = 1; i <= s.size(); ++i) { SECTION("cosumption instance " + std::to_string(i)) { Consume({&chain}, i); - REQUIRE(ToString(chain) == serialization.substr(i)); + REQUIRE(ToString(chain) == s.substr(i)); } } } SECTION("We can advance to any byte in the fragment stream randomly.") { - std::string s(3 * SerializationChain::BlockSize + 10, 'X'); + std::string s(3 * ChainedStream::BlockSize + 10, 'X'); stream->WriteString(s); stream.reset(); - chain.AddFraming(); - std::string serialization = AddSpanChunkFraming(s); + chain.CloseOutput(); std::mt19937 random_number_generator{0}; for (int i = 0; i < 100; ++i) { size_t num_bytes_consumed = 0; SECTION("Random advance " + std::to_string(i)) { - while (num_bytes_consumed < serialization.size()) { + while (num_bytes_consumed < s.size()) { std::uniform_int_distribution distribution{ - 1, static_cast(serialization.size()) - num_bytes_consumed}; + 1, static_cast(s.size()) - num_bytes_consumed}; auto n = distribution(random_number_generator); Consume({&chain}, n); num_bytes_consumed += n; - REQUIRE(ToString(chain) == serialization.substr(num_bytes_consumed)); + REQUIRE(ToString(chain) == s.substr(num_bytes_consumed)); } } } diff --git a/test/common/chunked_http_framing_test.cpp b/test/common/chunked_http_framing_test.cpp new file mode 100644 index 00000000..91c1aecb --- /dev/null +++ b/test/common/chunked_http_framing_test.cpp @@ -0,0 +1,11 @@ +#include "common/chunked_http_framing.h" + +#include "3rd_party/catch2/catch.hpp" +using namespace lightstep; + +TEST_CASE("ChunkedHttpFraming") { + std::string header_serialization(ChunkedHttpMaxHeaderSize + 1, ' '); + WriteHttpChunkHeader(&header_serialization[0], header_serialization.size(), + 10); + REQUIRE(header_serialization == " 0000000a\r\n"); +} diff --git a/test/common/composable_fragment_input_stream_test.cpp b/test/common/composable_fragment_input_stream_test.cpp new file mode 100644 index 00000000..19a4aef5 --- /dev/null +++ b/test/common/composable_fragment_input_stream_test.cpp @@ -0,0 +1,37 @@ +#include + +#include "3rd_party/catch2/catch.hpp" +#include "common/composable_fragment_input_stream.h" +#include "common/fragment_array_input_stream.h" +#include "test/composable_fragment_input_stream_wrapper.h" +#include "test/utility.h" +using namespace lightstep; + +TEST_CASE("ComposableFragmentInputStream") { + ComposableFragmentInputStreamWrapper stream{ + std::unique_ptr{new FragmentArrayInputStream{ + MakeFragment("abc"), MakeFragment("123")}}}; + stream.Append(std::unique_ptr{ + new ComposableFragmentInputStreamWrapper{ + std::unique_ptr{new FragmentArrayInputStream{ + MakeFragment("xyz"), MakeFragment("456")}}}}); + REQUIRE(ToString(stream) == "abc123xyz456"); + REQUIRE(stream.num_fragments() == 4); + + stream.Append(std::unique_ptr{ + new ComposableFragmentInputStreamWrapper{ + std::unique_ptr{ + new FragmentArrayInputStream{MakeFragment("qrz")}}}}); + REQUIRE(ToString(stream) == "abc123xyz456qrz"); + REQUIRE(stream.num_fragments() == 5); + + SECTION("We can consume from ComposableFragmentInputStream") { + auto contents = ToString(stream); + for (size_t i = 0; i < contents.size(); ++i) { + SECTION("Consume " + std::to_string(i)) { + Consume({&stream}, i); + REQUIRE(ToString(stream) == contents.substr(i)); + } + } + } +} diff --git a/test/common/report_request_framing_test.cpp b/test/common/report_request_framing_test.cpp new file mode 100644 index 00000000..eba61057 --- /dev/null +++ b/test/common/report_request_framing_test.cpp @@ -0,0 +1,31 @@ +#include "common/report_request_framing.h" + +#include "lightstep-tracer-common/collector.pb.h" + +#include "3rd_party/catch2/catch.hpp" +using namespace lightstep; + +TEST_CASE("ReportRequestFraming") { + std::string header_serialization(ReportRequestSpansMaxHeaderSize, ' '); + + SECTION("We can successfully parse out serialized spans") { + collector::ReportRequest report; + collector::Span span; + span.mutable_span_context()->set_trace_id(123); + auto s = span.SerializeAsString(); + auto header_size = WriteReportRequestSpansHeader( + &header_serialization[0], header_serialization.size(), s.size()); + s = header_serialization.substr(header_serialization.size() - header_size) + + s; + REQUIRE(report.ParseFromString(s)); + REQUIRE(report.spans().size() == 1); + REQUIRE(report.spans()[0].span_context().trace_id() == 123); + } + + SECTION("We can serialize the largest header") { + REQUIRE(WriteReportRequestSpansHeader( + &header_serialization[0], header_serialization.size(), + std::numeric_limits::max()) == + header_serialization.size()); + } +} diff --git a/test/composable_fragment_input_stream_wrapper.h b/test/composable_fragment_input_stream_wrapper.h new file mode 100644 index 00000000..dd8ec429 --- /dev/null +++ b/test/composable_fragment_input_stream_wrapper.h @@ -0,0 +1,36 @@ +#pragma once + +#include + +#include "common/composable_fragment_input_stream.h" + +namespace lightstep { +/** + * Wraps a FragmentInputStream to make it composable for testing. + */ +class ComposableFragmentInputStreamWrapper final + : public ComposableFragmentInputStream { + public: + explicit ComposableFragmentInputStreamWrapper( + std::unique_ptr&& stream) noexcept + : stream_{std::move(stream)} {} + + // ComposableFragmentInputStream + int segment_num_fragments() const noexcept override { + return stream_->num_fragments(); + } + + bool SegmentForEachFragment(Callback callback) const noexcept override { + return stream_->ForEachFragment(callback); + } + + void SegmentClear() noexcept override { return stream_->Clear(); } + + void SegmentSeek(int fragment_index, int position) noexcept override { + return stream_->Seek(fragment_index, position); + } + + private: + std::unique_ptr stream_; +}; +} // namespace lightstep diff --git a/test/number_simulation.cpp b/test/number_simulation.cpp index c135da8c..06e57259 100644 --- a/test/number_simulation.cpp +++ b/test/number_simulation.cpp @@ -46,14 +46,14 @@ GenerateRandomBinaryNumber(size_t max_digits) { //-------------------------------------------------------------------------------------------------- // GenerateRandomBinaryNumbers //-------------------------------------------------------------------------------------------------- -static void GenerateRandomBinaryNumbers( - CircularBuffer& buffer, std::vector& numbers, - size_t n) { +static void GenerateRandomBinaryNumbers(CircularBuffer& buffer, + std::vector& numbers, + size_t n) { while (n-- != 0) { uint32_t x; opentracing::string_view s; std::tie(x, s) = GenerateRandomBinaryNumber(32); - if (AddString(buffer, s)) { + if (AddSpanChunkFramedString(buffer, s)) { numbers.push_back(x); } } @@ -163,7 +163,7 @@ static bool HasPendingData(ConnectionStream& connection_stream) { //-------------------------------------------------------------------------------------------------- // RunBinaryNumberProducer //-------------------------------------------------------------------------------------------------- -void RunBinaryNumberProducer(CircularBuffer& buffer, +void RunBinaryNumberProducer(CircularBuffer& buffer, std::vector& numbers, size_t num_threads, size_t n) { std::vector> thread_numbers(num_threads); diff --git a/test/number_simulation.h b/test/number_simulation.h index 22176868..fe8a46ca 100644 --- a/test/number_simulation.h +++ b/test/number_simulation.h @@ -6,8 +6,8 @@ #include #include +#include "common/chained_stream.h" #include "common/circular_buffer.h" -#include "common/serialization_chain.h" #include "recorder/stream_recorder/connection_stream.h" #include @@ -20,7 +20,7 @@ namespace lightstep { * @param num_threads the number of threads to write numbers on. * @param n the number of numbers to write. */ -void RunBinaryNumberProducer(CircularBuffer& buffer, +void RunBinaryNumberProducer(CircularBuffer& buffer, std::vector& numbers, size_t num_threads, size_t n); diff --git a/test/recorder/BUILD b/test/recorder/BUILD index 350f3581..229e5409 100644 --- a/test/recorder/BUILD +++ b/test/recorder/BUILD @@ -16,11 +16,26 @@ lightstep_cc_library( "in_memory_recorder.cpp", ], deps = [ + "//src/common:buffer_chain_lib", "//src/recorder:recorder_interface", "//test:utility_lib", + "//lightstep-tracer-common:collector_proto_cc", ], ) +lightstep_cc_library( + name = "legacy_in_memory_async_transporter_lib", + private_hdrs = [ + "legacy_in_memory_async_transporter.h", + ], + srcs = [ + "legacy_in_memory_async_transporter.cpp", + ], + deps = [ + "//src/recorder:transporter_lib", + ], + +) lightstep_cc_library( name = "in_memory_async_transporter_lib", private_hdrs = [ @@ -30,6 +45,7 @@ lightstep_cc_library( "in_memory_async_transporter.cpp", ], deps = [ + "//src/common:buffer_chain_lib", "//src/recorder:transporter_lib", ], ) @@ -47,6 +63,20 @@ lightstep_cc_library( ], ) +lightstep_catch_test( + name = "legacy_manual_recorder_test", + srcs = [ + "legacy_manual_recorder_test.cpp", + ], + deps = [ + "//:manual_tracer_lib", + "//src/tracer:counting_metrics_observer_lib", + "//test:utility_lib", + "//test:testing_condition_variable_wrapper_lib", + ":legacy_in_memory_async_transporter_lib", + ], +) + lightstep_catch_test( name = "manual_recorder_test", srcs = [ diff --git a/test/recorder/in_memory_async_transporter.cpp b/test/recorder/in_memory_async_transporter.cpp index 32e3b492..6311ed14 100644 --- a/test/recorder/in_memory_async_transporter.cpp +++ b/test/recorder/in_memory_async_transporter.cpp @@ -1,55 +1,66 @@ -#include "in_memory_async_transporter.h" +#include "test/recorder/in_memory_async_transporter.h" + +#include +#include +#include namespace lightstep { -//------------------------------------------------------------------------------ -// Send -//------------------------------------------------------------------------------ -void InMemoryAsyncTransporter::Send(const google::protobuf::Message& request, - google::protobuf::Message& response, - AsyncTransporter::Callback& callback) { - active_request_ = &request; - active_response_ = &response; - active_callback_ = &callback; -} +//-------------------------------------------------------------------------------------------------- +// constructor +//-------------------------------------------------------------------------------------------------- +InMemoryAsyncTransporter::InMemoryAsyncTransporter( + std::function on_span_buffer_full) + : on_span_buffer_full_{std::move(on_span_buffer_full)} {} -//------------------------------------------------------------------------------ -// Write -//------------------------------------------------------------------------------ -void InMemoryAsyncTransporter::Write() { - if (active_request_ == nullptr || active_response_ == nullptr || - active_callback_ == nullptr) { - std::cerr << "No context, success callback, or request\n"; +//-------------------------------------------------------------------------------------------------- +// Succeed +//-------------------------------------------------------------------------------------------------- +void InMemoryAsyncTransporter::Succeed() noexcept { + if (active_callback_ == nullptr || active_message_ == nullptr) { std::terminate(); } - const auto& report = - dynamic_cast(*active_request_); - reports_.push_back(report); - - spans_.reserve(spans_.size() + report.spans_size()); - for (auto& span : report.spans()) { - spans_.push_back(span); + std::string s(active_message_->num_bytes(), ' '); + active_message_->CopyOut(&s[0], s.size()); + collector::ReportRequest report; + if (!report.ParseFromString(s)) { + std::terminate(); } + reports_.emplace_back(report); + std::copy(report.spans().begin(), report.spans().end(), + std::back_inserter(spans_)); - active_response_->CopyFrom(*Transporter::MakeCollectorResponse()); - if (should_disable_) { - collector::Command command; - command.set_disable(true); - auto& report_response = - dynamic_cast(*active_response_); - *report_response.add_commands() = command; - } - active_callback_->OnSuccess(); + active_callback_->OnSuccess(*active_message_); + active_callback_ = nullptr; + active_message_.reset(); } -//------------------------------------------------------------------------------ +//-------------------------------------------------------------------------------------------------- // Fail -//------------------------------------------------------------------------------ -void InMemoryAsyncTransporter::Fail(std::error_code error) { - if (active_callback_ == nullptr) { - std::cerr << "No context or failure callback\n"; +//-------------------------------------------------------------------------------------------------- +void InMemoryAsyncTransporter::Fail() noexcept { + if (active_callback_ == nullptr || active_message_ == nullptr) { std::terminate(); } + active_callback_->OnFailure(*active_message_); + active_callback_ = nullptr; + active_message_.reset(); +} + +//-------------------------------------------------------------------------------------------------- +// OnSpanBufferFull +//-------------------------------------------------------------------------------------------------- +void InMemoryAsyncTransporter::OnSpanBufferFull() noexcept { + if (on_span_buffer_full_) { + on_span_buffer_full_(); + } +} - active_callback_->OnFailure(error); +//-------------------------------------------------------------------------------------------------- +// Send +//-------------------------------------------------------------------------------------------------- +void InMemoryAsyncTransporter::Send(std::unique_ptr&& message, + Callback& callback) noexcept { + active_callback_ = &callback; + active_message_ = std::move(message); } } // namespace lightstep diff --git a/test/recorder/in_memory_async_transporter.h b/test/recorder/in_memory_async_transporter.h index 5c4aafb6..7c04f2aa 100644 --- a/test/recorder/in_memory_async_transporter.h +++ b/test/recorder/in_memory_async_transporter.h @@ -1,38 +1,55 @@ #pragma once -#include -#include -#include +#include #include #include "lightstep-tracer-common/collector.pb.h" #include "lightstep/transporter.h" namespace lightstep { -class InMemoryAsyncTransporter : public AsyncTransporter { +/** + * An AsyncTransporter that "transports" spans in to a container. + */ +class InMemoryAsyncTransporter final : public AsyncTransporter { public: - void Send(const google::protobuf::Message& request, - google::protobuf::Message& response, - AsyncTransporter::Callback& callback) override; + explicit InMemoryAsyncTransporter( + std::function on_span_buffer_full = {}); - void Write(); - - void Fail(std::error_code error); - - const std::vector& reports() const { + /** + * @return the ReportRequests that were transported. + */ + const std::vector& reports() const noexcept { return reports_; } - const std::vector& spans() const { return spans_; } + /** + * All the spans that were transported. + */ + const std::vector& spans() const noexcept { return spans_; } + + /** + * Make the last Send succeed. + */ + void Succeed() noexcept; + + /** + * Make the last Send fail. + */ + void Fail() noexcept; - void set_should_disable(bool value) { should_disable_ = value; } + // AsyncTranspoter + void OnSpanBufferFull() noexcept override; + + void Send(std::unique_ptr&& message, + Callback& callback) noexcept override; private: - bool should_disable_ = false; - const google::protobuf::Message* active_request_; - google::protobuf::Message* active_response_; - AsyncTransporter::Callback* active_callback_; + std::function on_span_buffer_full_; + std::vector reports_; std::vector spans_; + + Callback* active_callback_{nullptr}; + std::unique_ptr active_message_; }; } // namespace lightstep diff --git a/test/recorder/in_memory_recorder.cpp b/test/recorder/in_memory_recorder.cpp index d8ad3ec3..c0a696c8 100644 --- a/test/recorder/in_memory_recorder.cpp +++ b/test/recorder/in_memory_recorder.cpp @@ -43,24 +43,16 @@ void InMemoryRecorder::RecordSpan(const collector::Span& span) noexcept { } void InMemoryRecorder::RecordSpan( - std::unique_ptr&& span) noexcept { - span->AddFraming(); + Fragment /*header_fragment*/, + std::unique_ptr&& span) noexcept { + span->CloseOutput(); auto serialization = ToString(*span); - auto header_last = - std::find(serialization.begin(), serialization.end(), '\n') + 1; - auto message_size = serialization.size() - - std::distance(serialization.begin(), header_last) - 2; - collector::ReportRequest request; - if (!request.ParseFromArray(static_cast(&*header_last), - message_size)) { + collector::Span protobuf_span; + if (!protobuf_span.ParseFromString(serialization)) { std::cerr << "Failed to parse span\n"; std::terminate(); } - if (request.spans().size() != 1) { - std::cerr << "No span found\n"; - std::terminate(); - } std::lock_guard lock_guard{mutex_}; - spans_.emplace_back(std::move(request.spans()[0])); + spans_.emplace_back(std::move(protobuf_span)); } } // namespace lightstep diff --git a/test/recorder/in_memory_recorder.h b/test/recorder/in_memory_recorder.h index e58e7bb6..1fedc852 100644 --- a/test/recorder/in_memory_recorder.h +++ b/test/recorder/in_memory_recorder.h @@ -18,7 +18,8 @@ class InMemoryRecorder final : public Recorder { // Recorder void RecordSpan(const collector::Span& span) noexcept override; - void RecordSpan(std::unique_ptr&& span) noexcept override; + void RecordSpan(Fragment header_fragment, + std::unique_ptr&& span) noexcept override; private: mutable std::mutex mutex_; diff --git a/test/recorder/legacy_in_memory_async_transporter.cpp b/test/recorder/legacy_in_memory_async_transporter.cpp new file mode 100644 index 00000000..3443b443 --- /dev/null +++ b/test/recorder/legacy_in_memory_async_transporter.cpp @@ -0,0 +1,56 @@ +#include "legacy_in_memory_async_transporter.h" + +namespace lightstep { +//------------------------------------------------------------------------------ +// Send +//------------------------------------------------------------------------------ +void LegacyInMemoryAsyncTransporter::Send( + const google::protobuf::Message& request, + google::protobuf::Message& response, + LegacyAsyncTransporter::Callback& callback) { + active_request_ = &request; + active_response_ = &response; + active_callback_ = &callback; +} + +//------------------------------------------------------------------------------ +// Write +//------------------------------------------------------------------------------ +void LegacyInMemoryAsyncTransporter::Write() { + if (active_request_ == nullptr || active_response_ == nullptr || + active_callback_ == nullptr) { + std::cerr << "No context, success callback, or request\n"; + std::terminate(); + } + const auto& report = + dynamic_cast(*active_request_); + reports_.push_back(report); + + spans_.reserve(spans_.size() + report.spans_size()); + for (auto& span : report.spans()) { + spans_.push_back(span); + } + + active_response_->CopyFrom(*Transporter::MakeCollectorResponse()); + if (should_disable_) { + collector::Command command; + command.set_disable(true); + auto& report_response = + dynamic_cast(*active_response_); + *report_response.add_commands() = command; + } + active_callback_->OnSuccess(); +} + +//------------------------------------------------------------------------------ +// Fail +//------------------------------------------------------------------------------ +void LegacyInMemoryAsyncTransporter::Fail(std::error_code error) { + if (active_callback_ == nullptr) { + std::cerr << "No context or failure callback\n"; + std::terminate(); + } + + active_callback_->OnFailure(error); +} +} // namespace lightstep diff --git a/test/recorder/legacy_in_memory_async_transporter.h b/test/recorder/legacy_in_memory_async_transporter.h new file mode 100644 index 00000000..f1d6f26b --- /dev/null +++ b/test/recorder/legacy_in_memory_async_transporter.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include + +#include "lightstep-tracer-common/collector.pb.h" +#include "lightstep/transporter.h" + +namespace lightstep { +class LegacyInMemoryAsyncTransporter final : public LegacyAsyncTransporter { + public: + void Write(); + + void Fail(std::error_code error); + + const std::vector& reports() const { + return reports_; + } + + const std::vector& spans() const { return spans_; } + + void set_should_disable(bool value) { should_disable_ = value; } + + // LegacyAsyncTransporter + void Send(const google::protobuf::Message& request, + google::protobuf::Message& response, + LegacyAsyncTransporter::Callback& callback) override; + + private: + bool should_disable_ = false; + const google::protobuf::Message* active_request_; + google::protobuf::Message* active_response_; + LegacyAsyncTransporter::Callback* active_callback_; + std::vector reports_; + std::vector spans_; +}; +} // namespace lightstep diff --git a/test/recorder/legacy_manual_recorder_test.cpp b/test/recorder/legacy_manual_recorder_test.cpp new file mode 100644 index 00000000..9595ec94 --- /dev/null +++ b/test/recorder/legacy_manual_recorder_test.cpp @@ -0,0 +1,145 @@ +#include "recorder/legacy_manual_recorder.h" + +#include + +#include "3rd_party/catch2/catch.hpp" +#include "lightstep/tracer.h" +#include "test/recorder/legacy_in_memory_async_transporter.h" +#include "test/utility.h" +#include "tracer/counting_metrics_observer.h" +#include "tracer/legacy/legacy_tracer_impl.h" + +using namespace lightstep; +using namespace opentracing; + +TEST_CASE("LegacyManualRecorder") { + Logger logger{}; + auto metrics_observer = new CountingMetricsObserver{}; + LightStepTracerOptions options; + size_t max_buffered_spans{5}; + options.max_buffered_spans = + std::function{[&] { return max_buffered_spans; }}; + options.metrics_observer.reset(metrics_observer); + auto in_memory_transporter = new LegacyInMemoryAsyncTransporter{}; + auto recorder = new LegacyManualRecorder{ + logger, std::move(options), + std::unique_ptr{in_memory_transporter}}; + auto tracer = std::shared_ptr{new LegacyTracerImpl{ + PropagationOptions{}, std::unique_ptr{recorder}}}; + CHECK(tracer); + + SECTION("Buffered spans get transported after Flush is manually called.") { + auto span = tracer->StartSpan("abc"); + CHECK(span); + span->Finish(); + CHECK(in_memory_transporter->reports().empty()); + CHECK(tracer->Flush()); + in_memory_transporter->Write(); + CHECK(in_memory_transporter->reports().size() == 1); + } + + SECTION("Flush fails if a report is already being sent.") { + auto span1 = tracer->StartSpan("abc"); + CHECK(span1); + span1->Finish(); + CHECK(tracer->Flush()); + auto span2 = tracer->StartSpan("xyz"); + CHECK(span2); + span2->Finish(); + CHECK(!tracer->Flush()); + } + + SECTION( + "If the tranporter fails, it's spans are reported as dropped in the " + "following report.") { + logger.set_level(LogLevel::off); + auto span1 = tracer->StartSpan("abc"); + CHECK(span1); + span1->Finish(); + CHECK(tracer->Flush()); + in_memory_transporter->Fail( + std::make_error_code(std::errc::network_unreachable)); + + auto span2 = tracer->StartSpan("xyz"); + CHECK(span2); + span2->Finish(); + CHECK(tracer->Flush()); + in_memory_transporter->Write(); + CHECK(LookupSpansDropped(in_memory_transporter->reports().at(0)) == 1); + } + + SECTION( + "If a collector sends back a disable command, then the tracer stops " + "sending reports") { + in_memory_transporter->set_should_disable(true); + + auto span = tracer->StartSpan("abc"); + span->Finish(); + CHECK(tracer->Flush()); + in_memory_transporter->Write(); + + span = tracer->StartSpan("xyz"); + CHECK(span); + span->Finish(); + CHECK(!tracer->Flush()); + } + + SECTION("Flush is called when the tracer's buffer is filled.") { + for (size_t i = 0; i < max_buffered_spans; ++i) { + auto span = tracer->StartSpan("abc"); + CHECK(span); + span->Finish(); + } + in_memory_transporter->Write(); + CHECK(in_memory_transporter->reports().size() == 1); + } + + SECTION( + "MetricsObserver::OnFlush gets called whenever the recorder is " + "successfully flushed.") { + auto span = tracer->StartSpan("abc"); + span->Finish(); + tracer->Flush(); + in_memory_transporter->Write(); + CHECK(metrics_observer->num_flushes == 1); + } + + SECTION( + "MetricsObserver::OnSpansSent gets called with the number of spans " + "transported") { + auto span1 = tracer->StartSpan("abc"); + span1->Finish(); + auto span2 = tracer->StartSpan("abc"); + span2->Finish(); + tracer->Flush(); + in_memory_transporter->Write(); + CHECK(metrics_observer->num_spans_sent == 2); + } + + SECTION( + "MetricsObserver::OnSpansDropped gets called when spans are dropped.") { + logger.set_level(LogLevel::off); + auto span1 = tracer->StartSpan("abc"); + span1->Finish(); + auto span2 = tracer->StartSpan("abc"); + span2->Finish(); + tracer->Flush(); + in_memory_transporter->Fail( + std::make_error_code(std::errc::network_unreachable)); + CHECK(metrics_observer->num_spans_sent == 2); + CHECK(metrics_observer->num_spans_dropped == 2); + } + + SECTION( + "If `max_buffered_spans` is dynamically changed, it takes whenever the " + "recorder is next invoked.") { + max_buffered_spans -= 1; + for (size_t i = 0; i < max_buffered_spans; ++i) { + auto span = tracer->StartSpan("abc"); + CHECK(span); + span->Finish(); + } + in_memory_transporter->Write(); + CHECK(in_memory_transporter->reports().size() == 1); + } +} diff --git a/test/recorder/manual_recorder_test.cpp b/test/recorder/manual_recorder_test.cpp index 8ce992e1..acf43660 100644 --- a/test/recorder/manual_recorder_test.cpp +++ b/test/recorder/manual_recorder_test.cpp @@ -1,18 +1,14 @@ -#include +#include "recorder/manual_recorder.h" #include "3rd_party/catch2/catch.hpp" #include "lightstep/tracer.h" -#include "recorder/manual_recorder.h" #include "test/recorder/in_memory_async_transporter.h" -#include "test/testing_condition_variable_wrapper.h" #include "test/utility.h" #include "tracer/counting_metrics_observer.h" -#include "tracer/legacy/legacy_tracer_impl.h" - +#include "tracer/tracer_impl.h" using namespace lightstep; -using namespace opentracing; -TEST_CASE("manual_recorder") { +TEST_CASE("ManualRecorder") { Logger logger{}; auto metrics_observer = new CountingMetricsObserver{}; LightStepTracerOptions options; @@ -20,13 +16,21 @@ TEST_CASE("manual_recorder") { options.max_buffered_spans = std::function{[&] { return max_buffered_spans; }}; options.metrics_observer.reset(metrics_observer); - auto in_memory_transporter = new InMemoryAsyncTransporter{}; + std::shared_ptr tracer; + bool flush_on_full = true; + auto on_span_buffer_full = [&] { + if (flush_on_full) { + tracer->Flush(); + } + }; + auto in_memory_transporter = + new InMemoryAsyncTransporter{on_span_buffer_full}; auto recorder = new ManualRecorder{ logger, std::move(options), std::unique_ptr{in_memory_transporter}}; - auto tracer = std::shared_ptr{new LegacyTracerImpl{ + tracer = std::shared_ptr{new TracerImpl{ PropagationOptions{}, std::unique_ptr{recorder}}}; - CHECK(tracer); + REQUIRE(tracer); SECTION("Buffered spans get transported after Flush is manually called.") { auto span = tracer->StartSpan("abc"); @@ -34,21 +38,10 @@ TEST_CASE("manual_recorder") { span->Finish(); CHECK(in_memory_transporter->reports().empty()); CHECK(tracer->Flush()); - in_memory_transporter->Write(); + in_memory_transporter->Succeed(); CHECK(in_memory_transporter->reports().size() == 1); } - SECTION("Flush fails if a report is already being sent.") { - auto span1 = tracer->StartSpan("abc"); - CHECK(span1); - span1->Finish(); - CHECK(tracer->Flush()); - auto span2 = tracer->StartSpan("xyz"); - CHECK(span2); - span2->Finish(); - CHECK(!tracer->Flush()); - } - SECTION( "If the tranporter fails, it's spans are reported as dropped in the " "following report.") { @@ -57,50 +50,43 @@ TEST_CASE("manual_recorder") { CHECK(span1); span1->Finish(); CHECK(tracer->Flush()); - in_memory_transporter->Fail( - std::make_error_code(std::errc::network_unreachable)); + in_memory_transporter->Fail(); auto span2 = tracer->StartSpan("xyz"); CHECK(span2); span2->Finish(); CHECK(tracer->Flush()); - in_memory_transporter->Write(); + in_memory_transporter->Succeed(); CHECK(LookupSpansDropped(in_memory_transporter->reports().at(0)) == 1); } - SECTION( - "If a collector sends back a disable command, then the tracer stops " - "sending reports") { - in_memory_transporter->set_should_disable(true); - - auto span = tracer->StartSpan("abc"); - span->Finish(); - CHECK(tracer->Flush()); - in_memory_transporter->Write(); - - span = tracer->StartSpan("xyz"); - CHECK(span); - span->Finish(); - CHECK(!tracer->Flush()); - } - SECTION("Flush is called when the tracer's buffer is filled.") { - for (size_t i = 0; i < max_buffered_spans; ++i) { + for (size_t i = 0; i < max_buffered_spans + 1; ++i) { auto span = tracer->StartSpan("abc"); CHECK(span); span->Finish(); } - in_memory_transporter->Write(); + in_memory_transporter->Succeed(); CHECK(in_memory_transporter->reports().size() == 1); } + SECTION("If we don't flush when the span buffer fills, we'll drop the span") { + flush_on_full = false; + for (size_t i = 0; i < max_buffered_spans + 1; ++i) { + auto span = tracer->StartSpan("abc"); + CHECK(span); + span->Finish(); + } + REQUIRE(metrics_observer->num_spans_dropped == 1); + } + SECTION( "MetricsObserver::OnFlush gets called whenever the recorder is " "successfully flushed.") { auto span = tracer->StartSpan("abc"); span->Finish(); tracer->Flush(); - in_memory_transporter->Write(); + in_memory_transporter->Succeed(); CHECK(metrics_observer->num_flushes == 1); } @@ -112,7 +98,7 @@ TEST_CASE("manual_recorder") { auto span2 = tracer->StartSpan("abc"); span2->Finish(); tracer->Flush(); - in_memory_transporter->Write(); + in_memory_transporter->Succeed(); CHECK(metrics_observer->num_spans_sent == 2); } @@ -124,22 +110,8 @@ TEST_CASE("manual_recorder") { auto span2 = tracer->StartSpan("abc"); span2->Finish(); tracer->Flush(); - in_memory_transporter->Fail( - std::make_error_code(std::errc::network_unreachable)); - CHECK(metrics_observer->num_spans_sent == 2); + in_memory_transporter->Fail(); + CHECK(metrics_observer->num_spans_sent == 0); CHECK(metrics_observer->num_spans_dropped == 2); } - - SECTION( - "If `max_buffered_spans` is dynamically changed, it takes whenever the " - "recorder is next invoked.") { - max_buffered_spans -= 1; - for (size_t i = 0; i < max_buffered_spans; ++i) { - auto span = tracer->StartSpan("abc"); - CHECK(span); - span->Finish(); - } - in_memory_transporter->Write(); - CHECK(in_memory_transporter->reports().size() == 1); - } } diff --git a/test/recorder/serialization/BUILD b/test/recorder/serialization/BUILD new file mode 100644 index 00000000..1ced455e --- /dev/null +++ b/test/recorder/serialization/BUILD @@ -0,0 +1,40 @@ +load( + "//bazel:lightstep_build_system.bzl", + "lightstep_catch_test", + "lightstep_cc_library", + "lightstep_package", +) + +lightstep_package() + +lightstep_catch_test( + name = "report_request_header_test", + srcs = [ + "report_request_header_test.cpp", + ], + deps = [ + "//src/recorder/serialization:report_request_header_lib", + ], +) + +lightstep_catch_test( + name = "embedded_metrics_message_test", + srcs = [ + "embedded_metrics_message_test.cpp", + ], + deps = [ + "//src/recorder/serialization:embedded_metrics_message_lib", + ], +) + +lightstep_catch_test( + name = "report_request_test", + srcs = [ + "report_request_test.cpp", + ], + deps = [ + "//src/common:report_request_framing_lib", + "//src/recorder/serialization:report_request_lib", + "//src/recorder/serialization:report_request_header_lib", + ], +) diff --git a/test/recorder/stream_recorder/embedded_metrics_message_test.cpp b/test/recorder/serialization/embedded_metrics_message_test.cpp similarity index 92% rename from test/recorder/stream_recorder/embedded_metrics_message_test.cpp rename to test/recorder/serialization/embedded_metrics_message_test.cpp index 8a974197..31458114 100644 --- a/test/recorder/stream_recorder/embedded_metrics_message_test.cpp +++ b/test/recorder/serialization/embedded_metrics_message_test.cpp @@ -1,4 +1,4 @@ -#include "recorder/stream_recorder/embedded_metrics_message.h" +#include "recorder/serialization/embedded_metrics_message.h" #include "3rd_party/catch2/catch.hpp" using namespace lightstep; diff --git a/test/recorder/serialization/report_request_header_test.cpp b/test/recorder/serialization/report_request_header_test.cpp new file mode 100644 index 00000000..3ade4066 --- /dev/null +++ b/test/recorder/serialization/report_request_header_test.cpp @@ -0,0 +1,20 @@ +#include "recorder/serialization/report_request_header.h" + +#include "3rd_party/catch2/catch.hpp" +#include "lightstep-tracer-common/collector.pb.h" +using namespace lightstep; + +TEST_CASE("WriteReportRequestHeader") { + LightStepTracerOptions tracer_options; + tracer_options.access_token = "abc123"; + tracer_options.tags = {{"xyz", 456}}; + auto serialization = WriteReportRequestHeader(tracer_options, 789); + collector::ReportRequest report; + REQUIRE(report.ParseFromString(serialization)); + REQUIRE(report.auth().access_token() == "abc123"); + REQUIRE(report.reporter().reporter_id() == 789); + auto& tags = report.reporter().tags(); + REQUIRE(tags.size() == 1); + REQUIRE(tags[0].key() == "xyz"); + REQUIRE(tags[0].int_value() == 456); +} diff --git a/test/recorder/serialization/report_request_test.cpp b/test/recorder/serialization/report_request_test.cpp new file mode 100644 index 00000000..f9a40d1c --- /dev/null +++ b/test/recorder/serialization/report_request_test.cpp @@ -0,0 +1,96 @@ +#include "recorder/serialization/report_request.h" + +#include + +#include "3rd_party/catch2/catch.hpp" +#include "common/report_request_framing.h" +#include "lightstep-tracer-common/collector.pb.h" +#include "recorder/serialization/report_request_header.h" + +#include +using namespace lightstep; + +static collector::ReportRequest ToProtobufReportRequest( + const ReportRequest& report_request) { + std::string s(report_request.num_bytes(), ' '); + report_request.CopyOut(&s[0], s.size()); + collector::ReportRequest result; + if (!result.ParseFromString(s)) { + std::cerr << "Failed to parse report request\n"; + std::terminate(); + } + return result; +} + +static std::unique_ptr ToSerialization( + const collector::Span& span) { + auto s = span.SerializeAsString(); + std::array buffer; + auto header_size = WriteReportRequestSpansHeader( + buffer.data(), buffer.size(), static_cast(s.size())); + s.insert(0, buffer.data() + (buffer.size() - header_size), header_size); + + std::unique_ptr result{new ChainedStream{}}; + std::unique_ptr stream{ + new google::protobuf::io::CodedOutputStream{result.get()}}; + stream->WriteRaw(static_cast(s.data()), + static_cast(s.size())); + stream.reset(); + result->CloseOutput(); + + return result; +} + +TEST_CASE("ReportRequest") { + LightStepTracerOptions options; + uint64_t reporter_id = 123; + auto header = std::make_shared( + WriteReportRequestHeader(options, reporter_id)); + ReportRequest report_request{header, 0}; + REQUIRE(report_request.num_dropped_spans() == 0); + REQUIRE(report_request.num_spans() == 0); + + SECTION("We can serialize a ReportRequest with no spans") { + auto protobuf_report_request = ToProtobufReportRequest(report_request); + REQUIRE(protobuf_report_request.reporter().reporter_id() == 123); + } + + SECTION("We can add metrics to a ReportRequest") { + report_request = ReportRequest{header, 42}; + REQUIRE(report_request.num_dropped_spans() == 42); + auto protobuf_report_request = ToProtobufReportRequest(report_request); + auto& dropped_span_count = + protobuf_report_request.internal_metrics().counts()[0]; + REQUIRE(dropped_span_count.name() == "spans.dropped"); + REQUIRE(dropped_span_count.int_value() == 42); + } + + SECTION("We can add a span to the ReportRequest") { + collector::Span span; + span.mutable_span_context()->set_trace_id(1); + report_request.AddSpan(ToSerialization(span)); + + REQUIRE(report_request.num_spans() == 1); + + auto protobuf_report_request = ToProtobufReportRequest(report_request); + REQUIRE(protobuf_report_request.spans().size() == 1); + REQUIRE(protobuf_report_request.spans()[0].span_context().trace_id() == 1); + } + + SECTION("We can add mulltiple spans to the ReportRequest") { + collector::Span span1; + span1.mutable_span_context()->set_trace_id(1); + report_request.AddSpan(ToSerialization(span1)); + + collector::Span span2; + span2.mutable_span_context()->set_trace_id(2); + report_request.AddSpan(ToSerialization(span2)); + + REQUIRE(report_request.num_spans() == 2); + + auto protobuf_report_request = ToProtobufReportRequest(report_request); + REQUIRE(protobuf_report_request.spans().size() == 2); + REQUIRE(protobuf_report_request.spans()[0].span_context().trace_id() == 1); + REQUIRE(protobuf_report_request.spans()[1].span_context().trace_id() == 2); + } +} diff --git a/test/recorder/stream_recorder/BUILD b/test/recorder/stream_recorder/BUILD index 0058c137..e0d66f05 100644 --- a/test/recorder/stream_recorder/BUILD +++ b/test/recorder/stream_recorder/BUILD @@ -75,16 +75,6 @@ lightstep_catch_test( ], ) -lightstep_catch_test( - name = "embedded_metrics_message_test", - srcs = [ - "embedded_metrics_message_test.cpp", - ], - deps = [ - "//src/recorder/stream_recorder:embedded_metrics_message_lib", - ], -) - lightstep_catch_test( name = "span_stream_test", srcs = [ @@ -103,7 +93,7 @@ lightstep_catch_test( ], deps = [ "//src/recorder/stream_recorder:connection_stream_lib", - "//src/recorder/stream_recorder:utility_lib", + "//src/recorder/serialization:report_request_header_lib", "//test:number_simulation_lib", "//test:utility_lib", ], diff --git a/test/recorder/stream_recorder/connection_stream_test.cpp b/test/recorder/stream_recorder/connection_stream_test.cpp index 6442eb5e..e64c308e 100644 --- a/test/recorder/stream_recorder/connection_stream_test.cpp +++ b/test/recorder/stream_recorder/connection_stream_test.cpp @@ -1,8 +1,8 @@ #include "recorder/stream_recorder/connection_stream.h" #include "3rd_party/catch2/catch.hpp" +#include "recorder/serialization/report_request_header.h" #include "recorder/stream_recorder/span_stream.h" -#include "recorder/stream_recorder/utility.h" #include "test/number_simulation.h" #include "test/utility.h" using namespace lightstep; @@ -33,12 +33,12 @@ static collector::ReportRequest ParseStreamHeader(std::string& s) { TEST_CASE("ConnectionStream") { LightStepTracerOptions tracer_options; - CircularBuffer span_buffer{1000}; + CircularBuffer span_buffer{1000}; MetricsObserver metrics_observer; - StreamRecorderMetrics metrics{metrics_observer}; + MetricsTracker metrics{metrics_observer}; SpanStream span_stream{span_buffer, metrics}; std::string header_common_fragment = - WriteStreamHeaderCommonFragment(tracer_options, 123); + WriteReportRequestHeader(tracer_options, 123); auto host_header_fragment = MakeFragment("Host:abc\r\n"); ConnectionStream connection_stream{ host_header_fragment, @@ -144,7 +144,7 @@ TEST_CASE("ConnectionStream") { SECTION( "After writing the header, ConnectionStream writes the contents of the " "span buffer.") { - AddString(span_buffer, "abc"); + AddSpanChunkFramedString(span_buffer, "abc"); connection_stream.Flush( [&contents]( std::initializer_list fragment_streams) { @@ -162,14 +162,14 @@ TEST_CASE("ConnectionStream") { contents = ToString(fragment_streams); return Consume(fragment_streams, static_cast(contents.size())); }); - AddString(span_buffer, "abc"); + AddSpanChunkFramedString(span_buffer, "abc"); connection_stream.Flush( [&contents]( std::initializer_list fragment_streams) { contents = ToString(fragment_streams); return Consume(fragment_streams, 4); }); - AddString(span_buffer, "123"); + AddSpanChunkFramedString(span_buffer, "123"); connection_stream.Flush( [&contents]( std::initializer_list fragment_streams) { @@ -190,7 +190,7 @@ TEST_CASE("ConnectionStream") { contents = ToString(fragment_streams); return Consume(fragment_streams, static_cast(contents.size())); }); - AddString(span_buffer, "abc"); + AddSpanChunkFramedString(span_buffer, "abc"); connection_stream.Flush( [&contents]( std::initializer_list fragment_streams) { @@ -198,7 +198,7 @@ TEST_CASE("ConnectionStream") { return Consume(fragment_streams, 4); }); connection_stream.Shutdown(); - AddString(span_buffer, "123"); + AddSpanChunkFramedString(span_buffer, "123"); connection_stream.Flush( [&contents]( std::initializer_list fragment_streams) { @@ -217,7 +217,7 @@ TEST_CASE("ConnectionStream") { contents = ToString(fragment_streams); return Consume(fragment_streams, static_cast(contents.size())); }); - AddString(span_buffer, "abc"); + AddSpanChunkFramedString(span_buffer, "abc"); connection_stream.Flush( [&contents]( std::initializer_list fragment_streams) { @@ -225,7 +225,7 @@ TEST_CASE("ConnectionStream") { return Consume(fragment_streams, 4); }); connection_stream.Reset(); - AddString(span_buffer, "123"); + AddSpanChunkFramedString(span_buffer, "123"); connection_stream.Flush( [&contents]( std::initializer_list fragment_streams) { @@ -244,15 +244,15 @@ TEST_CASE( "Verify through simulation that ConnectionStream behaves correctly.") { LightStepTracerOptions tracer_options; std::string header_common_fragment = - WriteStreamHeaderCommonFragment(tracer_options, 123); + WriteReportRequestHeader(tracer_options, 123); MetricsObserver metrics_observer; - StreamRecorderMetrics metrics{metrics_observer}; + MetricsTracker metrics{metrics_observer}; auto host_header_fragment = MakeFragment("Host:abc\r\n"); const size_t num_producer_threads = 4; const size_t num_connections = 10; const size_t n = 25000; for (size_t max_size : {1, 2, 10, 100, 1000}) { - CircularBuffer buffer{max_size}; + CircularBuffer buffer{max_size}; SpanStream span_stream{buffer, metrics}; std::vector connection_streams; connection_streams.reserve(num_connections); diff --git a/test/recorder/stream_recorder/span_stream_test.cpp b/test/recorder/stream_recorder/span_stream_test.cpp index 7766f262..1bbc86ae 100644 --- a/test/recorder/stream_recorder/span_stream_test.cpp +++ b/test/recorder/stream_recorder/span_stream_test.cpp @@ -9,9 +9,9 @@ using namespace lightstep; TEST_CASE("SpanStream") { const size_t max_spans = 10; - CircularBuffer buffer{max_spans}; + CircularBuffer buffer{max_spans}; MetricsObserver metrics_observer; - StreamRecorderMetrics metrics{metrics_observer}; + MetricsTracker metrics{metrics_observer}; SpanStream span_stream{buffer, metrics}; SECTION("When the attached buffer is empty, SpanStream has no fragments") { @@ -21,20 +21,20 @@ TEST_CASE("SpanStream") { } SECTION("SpanStream mirrors the contents of its attached buffer") { - REQUIRE(AddString(buffer, "abc123")); + REQUIRE(AddSpanChunkFramedString(buffer, "abc123")); span_stream.Allot(); REQUIRE(ToString(span_stream) == AddSpanChunkFraming("abc123")); } SECTION("SpanStream is empty after it's been cleared") { - REQUIRE(AddString(buffer, "abc123")); + REQUIRE(AddSpanChunkFramedString(buffer, "abc123")); span_stream.Allot(); span_stream.Clear(); REQUIRE(span_stream.empty()); } SECTION("SpanStream leaves a remnant if a span is partially consumed") { - REQUIRE(AddString(buffer, "abc123")); + REQUIRE(AddSpanChunkFramedString(buffer, "abc123")); span_stream.Allot(); auto contents = ToString(span_stream); REQUIRE(!Consume({&span_stream}, 3)); @@ -45,13 +45,13 @@ TEST_CASE("SpanStream") { } SECTION("SpanStream leaves no remnant when spans are completely consumed") { - REQUIRE(AddString(buffer, "abc123")); + REQUIRE(AddSpanChunkFramedString(buffer, "abc123")); span_stream.Allot(); span_stream.Clear(); REQUIRE(span_stream.ConsumeRemnant() == nullptr); - REQUIRE(AddString(buffer, "abc")); - REQUIRE(AddString(buffer, "123")); + REQUIRE(AddSpanChunkFramedString(buffer, "abc")); + REQUIRE(AddSpanChunkFramedString(buffer, "123")); span_stream.Allot(); auto span1 = AddSpanChunkFraming("abc"); REQUIRE(!Consume({&span_stream}, static_cast(span1.size()))); diff --git a/test/recorder/stream_recorder/utility_test.cpp b/test/recorder/stream_recorder/utility_test.cpp index 2c65caa2..abd26ec2 100644 --- a/test/recorder/stream_recorder/utility_test.cpp +++ b/test/recorder/stream_recorder/utility_test.cpp @@ -1,7 +1,6 @@ #include "recorder/stream_recorder/utility.h" #include "3rd_party/catch2/catch.hpp" -#include "lightstep-tracer-common/collector.pb.h" using namespace lightstep; TEST_CASE("SeparateEndpoints") { @@ -27,18 +26,3 @@ TEST_CASE("SeparateEndpoints") { REQUIRE(hosts.size() == 1); } } - -TEST_CASE("WriteStreamHeaderCommonFragment") { - LightStepTracerOptions tracer_options; - tracer_options.access_token = "abc123"; - tracer_options.tags = {{"xyz", 456}}; - auto serialization = WriteStreamHeaderCommonFragment(tracer_options, 789); - collector::ReportRequest report; - REQUIRE(report.ParseFromString(serialization)); - REQUIRE(report.auth().access_token() == "abc123"); - REQUIRE(report.reporter().reporter_id() == 789); - auto& tags = report.reporter().tags(); - REQUIRE(tags.size() == 1); - REQUIRE(tags[0].key() == "xyz"); - REQUIRE(tags[0].int_value() == 456); -} diff --git a/test/utility.cpp b/test/utility.cpp index 4c085f46..3e8ff11e 100644 --- a/test/utility.cpp +++ b/test/utility.cpp @@ -7,8 +7,8 @@ #include #include +#include "common/report_request_framing.h" #include "common/serialization.h" -#include "common/serialization_chain.h" #include "common/utility.h" #include "network/socket.h" @@ -103,16 +103,17 @@ std::string ToString(const FragmentInputStream& fragment_input_stream) { } //-------------------------------------------------------------------------------------------------- -// AddString +// AddSpanChunkFramedString //-------------------------------------------------------------------------------------------------- -bool AddString(CircularBuffer& buffer, - const std::string& s) { - std::unique_ptr chain{new SerializationChain{}}; +bool AddSpanChunkFramedString(CircularBuffer& buffer, + const std::string& s) { + auto framed_s = AddSpanChunkFraming(s); + std::unique_ptr chain{new ChainedStream{}}; { google::protobuf::io::CodedOutputStream stream{chain.get()}; - stream.WriteString(s); + stream.WriteString(framed_s); } - chain->AddFraming(); + chain->CloseOutput(); return buffer.Add(chain); } @@ -125,8 +126,7 @@ std::string AddSpanChunkFraming(opentracing::string_view s) { { google::protobuf::io::OstreamOutputStream zero_copy_stream{&oss}; google::protobuf::io::CodedOutputStream stream{&zero_copy_stream}; - WriteKeyLength(stream, - s.size()); + WriteKeyLength(stream, s.size()); } return oss.str(); }(); diff --git a/test/utility.h b/test/utility.h index 29b5b1fb..3e6f8fa9 100644 --- a/test/utility.h +++ b/test/utility.h @@ -3,9 +3,9 @@ #include #include +#include "common/chained_stream.h" #include "common/circular_buffer.h" #include "common/fragment_input_stream.h" -#include "common/serialization_chain.h" #include #include "lightstep-tracer-common/collector.pb.h" @@ -66,8 +66,8 @@ std::string ToString(const FragmentInputStream& fragment_input_stream); * @param s the string to add. * @return true if the string was succesfully added. */ -bool AddString(CircularBuffer& buffer, - const std::string& s); +bool AddSpanChunkFramedString(CircularBuffer& buffer, + const std::string& s); /** * Adds http/1.1 chunk framing and ReportRequest embedded span framing to a