diff --git a/benchmark/tracer_upload_bench/utility.cpp b/benchmark/tracer_upload_bench/utility.cpp index ab2053ce..c1da90ca 100644 --- a/benchmark/tracer_upload_bench/utility.cpp +++ b/benchmark/tracer_upload_bench/utility.cpp @@ -7,8 +7,8 @@ #include #include -#include "tracer/lightstep_tracer_factory.h" #include "tracer/json_options.h" +#include "tracer/lightstep_tracer_factory.h" #include "google/protobuf/util/json_util.h" diff --git a/src/recorder/recorder.h b/src/recorder/recorder.h index 0c98dc1d..a702b69a 100644 --- a/src/recorder/recorder.h +++ b/src/recorder/recorder.h @@ -46,6 +46,16 @@ class Recorder { return true; } + /** + * Block until the recorder cleanly shuts down its connections to satellites. + * @param timeout the maximum amount of time to block. + * @return true if the shutdown was completed. + */ + virtual bool ShutdownWithTimeout( + std::chrono::system_clock::duration /*timeout*/) noexcept { + return true; + } + /** * Compute a timestamp delta that con be used to convert between system and * steady timestamps. 
diff --git a/src/recorder/stream_recorder/satellite_connection.cpp b/src/recorder/stream_recorder/satellite_connection.cpp index f9bdad74..3ec8316f 100644 --- a/src/recorder/stream_recorder/satellite_connection.cpp +++ b/src/recorder/stream_recorder/satellite_connection.cpp @@ -75,6 +75,14 @@ bool SatelliteConnection::Flush() noexcept try { return false; } +//-------------------------------------------------------------------------------------------------- +// InitiateShutdown +//-------------------------------------------------------------------------------------------------- +void SatelliteConnection::InitiateShutdown() noexcept { + InitiateReconnect(); + is_shutting_down_ = true; +} + //-------------------------------------------------------------------------------------------------- // ready //-------------------------------------------------------------------------------------------------- @@ -129,6 +137,11 @@ void SatelliteConnection::FreeSocket() { //-------------------------------------------------------------------------------------------------- void SatelliteConnection::HandleFailure() noexcept try { FreeSocket(); + if (is_shutting_down_) { + // if we failed during shutdown, don't try to reconnect + was_shutdown_ = true; + return; + } streamer_.event_base().OnTimeout( streamer_.recorder_options().satellite_failure_retry_period, MakeTimerCallback(), @@ -151,6 +164,11 @@ void SatelliteConnection::ScheduleReconnect() { // InitiateReconnect //-------------------------------------------------------------------------------------------------- void SatelliteConnection::InitiateReconnect() noexcept try { + if (is_shutting_down_) { + // If we're shutting down, then we've already scheduled a reconnect so do + // nothing. 
+ return; + } connection_stream_.Shutdown(); if (writable_) { Flush(); @@ -167,6 +185,11 @@ void SatelliteConnection::InitiateReconnect() noexcept try { //-------------------------------------------------------------------------------------------------- void SatelliteConnection::Reconnect() noexcept try { FreeSocket(); + if (is_shutting_down_) { + // break the reconnection cycle + was_shutdown_ = true; + return; + } Connect(); } catch (const std::exception& e) { streamer_.logger().Error("Reconnect failed: ", e.what()); diff --git a/src/recorder/stream_recorder/satellite_connection.h b/src/recorder/stream_recorder/satellite_connection.h index 479b1fef..abbf717e 100644 --- a/src/recorder/stream_recorder/satellite_connection.h +++ b/src/recorder/stream_recorder/satellite_connection.h @@ -32,11 +32,21 @@ class SatelliteConnection : private Noncopyable { */ bool Flush() noexcept; + /** + * Initiate a clean shut down of the satellite connection. + */ + void InitiateShutdown() noexcept; + /** * @return true if the satellite connection is available for streaming spans. */ bool ready() const noexcept; + /** + * @return true if a connection to the satellite is active. 
+ */ + bool is_active() const noexcept { return !was_shutdown_; } + private: SatelliteStreamer& streamer_; HostHeader host_header_; @@ -44,6 +54,8 @@ class SatelliteConnection : private Noncopyable { StatusLineParser status_line_parser_; Socket socket_{InvalidSocket}; bool writable_{false}; + bool is_shutting_down_{false}; + bool was_shutdown_{false}; Event read_event_; Event write_event_; Event reconnect_timer_; diff --git a/src/recorder/stream_recorder/satellite_streamer.cpp b/src/recorder/stream_recorder/satellite_streamer.cpp index 91ed2e48..b7cc7b57 100644 --- a/src/recorder/stream_recorder/satellite_streamer.cpp +++ b/src/recorder/stream_recorder/satellite_streamer.cpp @@ -34,6 +34,18 @@ SatelliteStreamer::SatelliteStreamer( endpoint_manager_.Start(); } +//-------------------------------------------------------------------------------------------------- +// is_active +//-------------------------------------------------------------------------------------------------- +bool SatelliteStreamer::is_active() const noexcept { + for (auto& connection : connections_) { + if (connection->is_active()) { + return true; + } + } + return false; +} + //-------------------------------------------------------------------------------------------------- // Flush //-------------------------------------------------------------------------------------------------- @@ -50,6 +62,15 @@ void SatelliteStreamer::Flush() noexcept { }); } +//-------------------------------------------------------------------------------------------------- +// InitiateShutdown +//-------------------------------------------------------------------------------------------------- +void SatelliteStreamer::InitiateShutdown() noexcept { + for (auto& connection : connections_) { + connection->InitiateShutdown(); + } +} + //-------------------------------------------------------------------------------------------------- // OnEndpointManagerReady 
//-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/satellite_streamer.h b/src/recorder/stream_recorder/satellite_streamer.h index 517bb52d..3acff999 100644 --- a/src/recorder/stream_recorder/satellite_streamer.h +++ b/src/recorder/stream_recorder/satellite_streamer.h @@ -68,11 +68,21 @@ class SatelliteStreamer : private Noncopyable { return endpoint_manager_; } + /** + * @return true if any satellite connections have open sockets. + */ + bool is_active() const noexcept; + /** * Flush data to satellites if connections are available. */ void Flush() noexcept; + /** + * Cleanly shut down satellite connections. + */ + void InitiateShutdown() noexcept; + private: Logger& logger_; EventBase& event_base_; diff --git a/src/recorder/stream_recorder/stream_recorder.cpp b/src/recorder/stream_recorder/stream_recorder.cpp index 9d24ee61..c4e39c88 100644 --- a/src/recorder/stream_recorder/stream_recorder.cpp +++ b/src/recorder/stream_recorder/stream_recorder.cpp @@ -35,10 +35,12 @@ StreamRecorder::StreamRecorder(Logger& logger, //-------------------------------------------------------------------------------------------------- StreamRecorder::~StreamRecorder() noexcept { { - std::lock_guard lock_guard{flush_mutex_}; + std::lock_guard flush_lock_guard{flush_mutex_}; + std::lock_guard shutdown_lock_guard{shutdown_mutex_}; exit_ = true; } flush_condition_variable_.notify_all(); + shutdown_condition_variable_.notify_all(); } //-------------------------------------------------------------------------------------------------- @@ -82,6 +84,21 @@ bool StreamRecorder::FlushWithTimeout( return false; } +//-------------------------------------------------------------------------------------------------- +// ShutdownWithTimeout +//-------------------------------------------------------------------------------------------------- +bool StreamRecorder::ShutdownWithTimeout( + 
std::chrono::system_clock::duration timeout) noexcept try { + std::unique_lock lock{shutdown_mutex_}; + ++shutdown_counter_; + shutdown_condition_variable_.wait_for( + lock, timeout, [this] { return exit_ || !last_is_active_; }); + return !last_is_active_; +} catch (const std::exception& e) { + logger_.Error("StreamRecorder::ShutdownWithTimeout failed: ", e.what()); + return false; +} + //-------------------------------------------------------------------------------------------------- // PrepareForFork //-------------------------------------------------------------------------------------------------- @@ -115,7 +132,7 @@ void StreamRecorder::OnForkedChild() noexcept { //-------------------------------------------------------------------------------------------------- // Poll //-------------------------------------------------------------------------------------------------- -void StreamRecorder::Poll() noexcept { +void StreamRecorder::Poll(StreamRecorderImpl& stream_recorder_impl) noexcept { auto num_spans_consumed = span_buffer_.consumption_count(); if (num_spans_consumed > num_spans_consumed_) { { @@ -124,6 +141,18 @@ void StreamRecorder::Poll() noexcept { } flush_condition_variable_.notify_all(); } + + if (shutdown_counter_.exchange(0) > 0) { + stream_recorder_impl.InitiateShutdown(); + } + + if (last_is_active_ && !stream_recorder_impl.is_active()) { + { + std::lock_guard lock_guard{shutdown_mutex_}; + last_is_active_ = false; + } + shutdown_condition_variable_.notify_all(); + } } //-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/stream_recorder.h b/src/recorder/stream_recorder/stream_recorder.h index fbdcddcb..19cfd6ea 100644 --- a/src/recorder/stream_recorder/stream_recorder.h +++ b/src/recorder/stream_recorder/stream_recorder.h @@ -35,8 +35,14 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { /** * Checks whether any threads blocked on flush 
calls can be resumed. + * @param stream_recorder_impl a reference to the stream recorder + * implementation that invoked poll. + * + * Note: stream_recorder_impl is redundant since it can also be accessed + * through the member variable stream_recorder_impl_, but it's structured this + * way to avoid false positives from TSAN. */ - void Poll() noexcept; + void Poll(StreamRecorderImpl& stream_recorder_impl) noexcept; /** * Gets the pending flush count and resets the counter. @@ -88,6 +94,9 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { bool FlushWithTimeout( std::chrono::system_clock::duration timeout) noexcept override; + bool ShutdownWithTimeout( + std::chrono::system_clock::duration timeout) noexcept override; + int64_t ComputeSystemSteadyTimestampDelta() const noexcept override { return stream_recorder_impl_->timestamp_delta(); } @@ -126,6 +135,14 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { std::atomic pending_flush_counter_{0}; int64_t num_spans_consumed_{0}; + std::mutex shutdown_mutex_; + std::condition_variable shutdown_condition_variable_; + std::atomic shutdown_counter_{0}; + + // Used by polling to track when the satellite connections become inactive so + // that any threads waiting on shutdown can be notified. 
+ bool last_is_active_{true}; + std::unique_ptr stream_recorder_impl_; }; } // namespace lightstep diff --git a/src/recorder/stream_recorder/stream_recorder_impl.cpp b/src/recorder/stream_recorder/stream_recorder_impl.cpp index 6ec2bc65..34ebdb4b 100644 --- a/src/recorder/stream_recorder/stream_recorder_impl.cpp +++ b/src/recorder/stream_recorder/stream_recorder_impl.cpp @@ -78,7 +78,7 @@ void StreamRecorderImpl::Poll() noexcept { Flush(); } - stream_recorder_.Poll(); + stream_recorder_.Poll(*this); } //-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/stream_recorder_impl.h b/src/recorder/stream_recorder/stream_recorder_impl.h index cc130194..235c321a 100644 --- a/src/recorder/stream_recorder/stream_recorder_impl.h +++ b/src/recorder/stream_recorder/stream_recorder_impl.h @@ -25,8 +25,22 @@ class StreamRecorderImpl : private Noncopyable { ~StreamRecorderImpl() noexcept; + /** + * @return the cached delta between std::chrono::system_clock and + * std::chrono::steady_clock. + */ int64_t timestamp_delta() const noexcept { return timestamp_delta_; } + /** + * Schedule the recorder to cleanly close all satellite connections. + */ + void InitiateShutdown() noexcept { streamer_.InitiateShutdown(); } + + /** + * @return true if any satellite connections are active. 
+ */ + bool is_active() const noexcept { return streamer_.is_active(); } + private: StreamRecorder& stream_recorder_; diff --git a/src/tracer/tracer_impl.cpp b/src/tracer/tracer_impl.cpp index 0ea327d1..e977b680 100644 --- a/src/tracer/tracer_impl.cpp +++ b/src/tracer/tracer_impl.cpp @@ -134,5 +134,20 @@ bool TracerImpl::FlushWithTimeout( //------------------------------------------------------------------------------ // Close //------------------------------------------------------------------------------ -void TracerImpl::Close() noexcept { Flush(); } +void TracerImpl::Close() noexcept { + auto t1 = std::chrono::steady_clock::now(); + if (!recorder_->FlushWithTimeout(DefaultFlushTimeout)) { + return; + } + auto delta = std::chrono::steady_clock::now() - t1; + if (delta >= DefaultFlushTimeout) { + return; + } + auto timeout = + std::chrono::duration_cast( + DefaultFlushTimeout) - + delta; + recorder_->ShutdownWithTimeout( + std::chrono::duration_cast(timeout)); +} } // namespace lightstep diff --git a/test/recorder/stream_recorder/stream_recorder_test.cpp b/test/recorder/stream_recorder/stream_recorder_test.cpp index afe272d3..7eb2003a 100644 --- a/test/recorder/stream_recorder/stream_recorder_test.cpp +++ b/test/recorder/stream_recorder/stream_recorder_test.cpp @@ -176,4 +176,41 @@ TEST_CASE("StreamRecorder") { stop = true; generator.join(); } + + SECTION( + "If a tracer is destroyed without shutting down first, it will close " + "satellite sockets before the http sessions are finished") { + std::this_thread::sleep_for(std::chrono::milliseconds{50}); + tracer = nullptr; + std::this_thread::sleep_for(std::chrono::milliseconds{50}); + } + + SECTION("If a tracer is shut down, it waits for a response from the server") { + tracer->StartSpan("abc"); + stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50}); + stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); + REQUIRE(logger_sink->contents().find("is readable") != std::string::npos); + } + + 
SECTION("We can shut down a recorder twice") { + tracer->StartSpan("abc"); + stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50}); + stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); + stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); + REQUIRE(logger_sink->contents().find("is readable") != std::string::npos); + } + + SECTION("Close will also shut down the recorder") { + tracer->StartSpan("abc"); + tracer->Close(); + REQUIRE(logger_sink->contents().find("is readable") != std::string::npos); + } + + SECTION("Shutdown will return early if it times out") { + mock_satellite->SetRequestTimeout(); + tracer->StartSpan("abc"); + stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50}); + stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); + REQUIRE(logger_sink->contents().find("is readable") == std::string::npos); + } }