From 8d739d02f449d29537f76f56e5f6c05a725318b8 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Sun, 24 Nov 2019 20:56:16 -0800 Subject: [PATCH 01/14] Add ShutdownWithTimeout to Recorder interface. --- src/recorder/recorder.h | 10 ++++++++++ src/recorder/stream_recorder/stream_recorder.cpp | 12 ++++++++++++ src/recorder/stream_recorder/stream_recorder.h | 3 +++ 3 files changed, 25 insertions(+) diff --git a/src/recorder/recorder.h b/src/recorder/recorder.h index 0c98dc1d..a702b69a 100644 --- a/src/recorder/recorder.h +++ b/src/recorder/recorder.h @@ -46,6 +46,16 @@ class Recorder { return true; } + /** + * Block until the recorder cleanly shuts down its connections to satellites. + * @param timeout the maximum amount of time to block. + * @return true if the shutdown was completed. + */ + virtual bool ShutdownWithTimeout( + std::chrono::system_clock::duration /*timeout*/) noexcept { + return true; + } + /** * Compute a timestamp delta that con be used to convert between system and * steady timestamps. 
diff --git a/src/recorder/stream_recorder/stream_recorder.cpp b/src/recorder/stream_recorder/stream_recorder.cpp index 9d24ee61..afc21d38 100644 --- a/src/recorder/stream_recorder/stream_recorder.cpp +++ b/src/recorder/stream_recorder/stream_recorder.cpp @@ -82,6 +82,18 @@ bool StreamRecorder::FlushWithTimeout( return false; } +//-------------------------------------------------------------------------------------------------- +// ShutdownWithTimeout +//-------------------------------------------------------------------------------------------------- +bool StreamRecorder::ShutdownWithTimeout( + std::chrono::system_clock::duration timeout) noexcept try { + (void)timeout; + return true; +} catch (const std::exception& e) { + logger_.Error("StreamRecorder::FlushWithTimeout failed: ", e.what()); + return false; +} + //-------------------------------------------------------------------------------------------------- // PrepareForFork //-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/stream_recorder.h b/src/recorder/stream_recorder/stream_recorder.h index fbdcddcb..04a46a13 100644 --- a/src/recorder/stream_recorder/stream_recorder.h +++ b/src/recorder/stream_recorder/stream_recorder.h @@ -88,6 +88,9 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { bool FlushWithTimeout( std::chrono::system_clock::duration timeout) noexcept override; + bool ShutdownWithTimeout( + std::chrono::system_clock::duration timeout) noexcept override; + int64_t ComputeSystemSteadyTimestampDelta() const noexcept override { return stream_recorder_impl_->timestamp_delta(); } From bea4d5e62ad9820299cd0e508f7573f805abd5b2 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Sun, 24 Nov 2019 23:04:33 -0800 Subject: [PATCH 02/14] Work on clean shutdown functionality. 
--- src/recorder/stream_recorder/satellite_connection.cpp | 6 ++++++ src/recorder/stream_recorder/satellite_connection.h | 6 ++++++ src/recorder/stream_recorder/satellite_streamer.cpp | 6 ++++++ src/recorder/stream_recorder/satellite_streamer.h | 5 +++++ 4 files changed, 23 insertions(+) diff --git a/src/recorder/stream_recorder/satellite_connection.cpp b/src/recorder/stream_recorder/satellite_connection.cpp index f9bdad74..659a182e 100644 --- a/src/recorder/stream_recorder/satellite_connection.cpp +++ b/src/recorder/stream_recorder/satellite_connection.cpp @@ -75,6 +75,12 @@ bool SatelliteConnection::Flush() noexcept try { return false; } +//-------------------------------------------------------------------------------------------------- +// InitiateShutdown +//-------------------------------------------------------------------------------------------------- +void SatelliteConnection::InitiateShutdown() noexcept { +} + //-------------------------------------------------------------------------------------------------- // ready //-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/satellite_connection.h b/src/recorder/stream_recorder/satellite_connection.h index 479b1fef..9368a206 100644 --- a/src/recorder/stream_recorder/satellite_connection.h +++ b/src/recorder/stream_recorder/satellite_connection.h @@ -32,6 +32,11 @@ class SatelliteConnection : private Noncopyable { */ bool Flush() noexcept; + /** + * Initiate a clean shut down of the satellite connection. + */ + void InitiateShutdown() noexcept; + /** * @return true if the satellite connection is available for streaming spans. 
*/ @@ -44,6 +49,7 @@ class SatelliteConnection : private Noncopyable { StatusLineParser status_line_parser_; Socket socket_{InvalidSocket}; bool writable_{false}; + bool shutting_down_{false}; Event read_event_; Event write_event_; Event reconnect_timer_; diff --git a/src/recorder/stream_recorder/satellite_streamer.cpp b/src/recorder/stream_recorder/satellite_streamer.cpp index 91ed2e48..11fb02dc 100644 --- a/src/recorder/stream_recorder/satellite_streamer.cpp +++ b/src/recorder/stream_recorder/satellite_streamer.cpp @@ -50,6 +50,12 @@ void SatelliteStreamer::Flush() noexcept { }); } +//-------------------------------------------------------------------------------------------------- +// InitiateShutdown +//-------------------------------------------------------------------------------------------------- +void SatelliteStreamer::InitiateShutdown() noexcept { +} + //-------------------------------------------------------------------------------------------------- // OnEndpointManagerReady //-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/satellite_streamer.h b/src/recorder/stream_recorder/satellite_streamer.h index 517bb52d..419d2462 100644 --- a/src/recorder/stream_recorder/satellite_streamer.h +++ b/src/recorder/stream_recorder/satellite_streamer.h @@ -73,6 +73,11 @@ class SatelliteStreamer : private Noncopyable { */ void Flush() noexcept; + /** + * Cleanly shut down satellite connections. + */ + void InitiateShutdown() noexcept; + private: Logger& logger_; EventBase& event_base_; From 287d76aacd7f4a89b60d45504f3e455517f8893b Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Mon, 25 Nov 2019 20:26:42 -0800 Subject: [PATCH 03/14] Add interface for shutdown. 
--- .../stream_recorder/satellite_streamer.cpp | 5 +++++ .../stream_recorder/satellite_streamer.h | 5 +++++ .../stream_recorder/stream_recorder.cpp | 19 +++++++++++++++++-- .../stream_recorder/stream_recorder.h | 5 +++++ .../stream_recorder/stream_recorder_impl.h | 3 +++ 5 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/recorder/stream_recorder/satellite_streamer.cpp b/src/recorder/stream_recorder/satellite_streamer.cpp index 11fb02dc..11a514c0 100644 --- a/src/recorder/stream_recorder/satellite_streamer.cpp +++ b/src/recorder/stream_recorder/satellite_streamer.cpp @@ -34,6 +34,11 @@ SatelliteStreamer::SatelliteStreamer( endpoint_manager_.Start(); } +//-------------------------------------------------------------------------------------------------- +// is_active +//-------------------------------------------------------------------------------------------------- +bool SatelliteStreamer::is_active() const noexcept { return true; } + //-------------------------------------------------------------------------------------------------- // Flush //-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/satellite_streamer.h b/src/recorder/stream_recorder/satellite_streamer.h index 419d2462..3acff999 100644 --- a/src/recorder/stream_recorder/satellite_streamer.h +++ b/src/recorder/stream_recorder/satellite_streamer.h @@ -68,6 +68,11 @@ class SatelliteStreamer : private Noncopyable { return endpoint_manager_; } + /** + * @return true if any satellite connections have open sockets. + */ + bool is_active() const noexcept; + /** * Flush data to satellites if connections are available. 
*/ diff --git a/src/recorder/stream_recorder/stream_recorder.cpp b/src/recorder/stream_recorder/stream_recorder.cpp index afc21d38..abf3a375 100644 --- a/src/recorder/stream_recorder/stream_recorder.cpp +++ b/src/recorder/stream_recorder/stream_recorder.cpp @@ -87,8 +87,11 @@ bool StreamRecorder::FlushWithTimeout( //-------------------------------------------------------------------------------------------------- bool StreamRecorder::ShutdownWithTimeout( std::chrono::system_clock::duration timeout) noexcept try { - (void)timeout; - return true; + std::unique_lock lock{flush_mutex_}; + ++shutdown_counter_; + shutdown_condition_variable_.wait_for( + lock, timeout, [this] { return exit_ || !last_is_active_; }); + return !last_is_active_; } catch (const std::exception& e) { logger_.Error("StreamRecorder::FlushWithTimeout failed: ", e.what()); return false; @@ -136,6 +139,18 @@ void StreamRecorder::Poll() noexcept { } flush_condition_variable_.notify_all(); } + + if (shutdown_counter_.exchange(0) > 0) { + stream_recorder_impl_->InitiateShutdown(); + } + + if (last_is_active_ && !stream_recorder_impl_->is_active()) { + { + std::lock_guard lock_guard{shutdown_mutex_}; + last_is_active_ = false; + } + shutdown_condition_variable_.notify_all(); + } } //-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/stream_recorder.h b/src/recorder/stream_recorder/stream_recorder.h index 04a46a13..0481d0c0 100644 --- a/src/recorder/stream_recorder/stream_recorder.h +++ b/src/recorder/stream_recorder/stream_recorder.h @@ -129,6 +129,11 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { std::atomic pending_flush_counter_{0}; int64_t num_spans_consumed_{0}; + std::mutex shutdown_mutex_; + std::condition_variable shutdown_condition_variable_; + std::atomic shutdown_counter_{0}; + bool last_is_active_{true}; + std::unique_ptr stream_recorder_impl_; }; } // namespace lightstep diff 
--git a/src/recorder/stream_recorder/stream_recorder_impl.h b/src/recorder/stream_recorder/stream_recorder_impl.h index cc130194..cba40f32 100644 --- a/src/recorder/stream_recorder/stream_recorder_impl.h +++ b/src/recorder/stream_recorder/stream_recorder_impl.h @@ -27,6 +27,9 @@ class StreamRecorderImpl : private Noncopyable { int64_t timestamp_delta() const noexcept { return timestamp_delta_; } + void InitiateShutdown() noexcept { streamer_.InitiateShutdown(); } + + bool is_active() const noexcept { return streamer_.is_active(); } private: StreamRecorder& stream_recorder_; From d53018dcde857f26df6be1ddda8383f26e52992d Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Mon, 25 Nov 2019 20:55:10 -0800 Subject: [PATCH 04/14] Work on functionality to probe if satellite connections are active. --- src/recorder/stream_recorder/satellite_connection.h | 6 ++++++ src/recorder/stream_recorder/satellite_streamer.cpp | 9 ++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/recorder/stream_recorder/satellite_connection.h b/src/recorder/stream_recorder/satellite_connection.h index 9368a206..a9df401e 100644 --- a/src/recorder/stream_recorder/satellite_connection.h +++ b/src/recorder/stream_recorder/satellite_connection.h @@ -42,6 +42,11 @@ class SatelliteConnection : private Noncopyable { */ bool ready() const noexcept; + /** + * @return true if a connection to the satellite is active. 
+ */ + bool is_active() const noexcept { return !was_shutdown_; } + private: SatelliteStreamer& streamer_; HostHeader host_header_; @@ -50,6 +55,7 @@ class SatelliteConnection : private Noncopyable { Socket socket_{InvalidSocket}; bool writable_{false}; bool shutting_down_{false}; + bool was_shutdown_{false}; Event read_event_; Event write_event_; Event reconnect_timer_; diff --git a/src/recorder/stream_recorder/satellite_streamer.cpp b/src/recorder/stream_recorder/satellite_streamer.cpp index 11a514c0..eb97974a 100644 --- a/src/recorder/stream_recorder/satellite_streamer.cpp +++ b/src/recorder/stream_recorder/satellite_streamer.cpp @@ -37,7 +37,14 @@ SatelliteStreamer::SatelliteStreamer( //-------------------------------------------------------------------------------------------------- // is_active //-------------------------------------------------------------------------------------------------- -bool SatelliteStreamer::is_active() const noexcept { return true; } +bool SatelliteStreamer::is_active() const noexcept { + for (auto& connection : connections_) { + if (connection->is_active()) { + return true; + } + } + return false; +} //-------------------------------------------------------------------------------------------------- // Flush From 37cd12cae01d19ae7119acb0aefecf598aa905f3 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 14:57:35 -0800 Subject: [PATCH 05/14] Add remaining parts of shutdown logic. 
--- .../stream_recorder/satellite_connection.cpp | 16 ++++++++++++++++ .../stream_recorder/satellite_connection.h | 2 +- src/recorder/stream_recorder/stream_recorder.cpp | 4 +++- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/recorder/stream_recorder/satellite_connection.cpp b/src/recorder/stream_recorder/satellite_connection.cpp index 659a182e..4e95387e 100644 --- a/src/recorder/stream_recorder/satellite_connection.cpp +++ b/src/recorder/stream_recorder/satellite_connection.cpp @@ -79,6 +79,8 @@ bool SatelliteConnection::Flush() noexcept try { // InitiateShutdown //-------------------------------------------------------------------------------------------------- void SatelliteConnection::InitiateShutdown() noexcept { + InitiateReconnect(); + is_shutting_down_ = true; } //-------------------------------------------------------------------------------------------------- @@ -135,6 +137,11 @@ void SatelliteConnection::FreeSocket() { //-------------------------------------------------------------------------------------------------- void SatelliteConnection::HandleFailure() noexcept try { FreeSocket(); + if (is_shutting_down_) { + // if we failed during shutdown, don't try to reconnect + was_shutdown_ = true; + return; + } streamer_.event_base().OnTimeout( streamer_.recorder_options().satellite_failure_retry_period, MakeTimerCallback(), @@ -157,6 +164,10 @@ void SatelliteConnection::ScheduleReconnect() { // InitiateReconnect //-------------------------------------------------------------------------------------------------- void SatelliteConnection::InitiateReconnect() noexcept try { + if (is_shutting_down_) { + // If we're shutting down, then we've already scheduled a reconnect so do nothing. 
+ return; + } connection_stream_.Shutdown(); if (writable_) { Flush(); @@ -173,6 +184,11 @@ void SatelliteConnection::InitiateReconnect() noexcept try { //-------------------------------------------------------------------------------------------------- void SatelliteConnection::Reconnect() noexcept try { FreeSocket(); + if (is_shutting_down_) { + // break the reconnection cycle + was_shutdown_ = true; + return; + } Connect(); } catch (const std::exception& e) { streamer_.logger().Error("Reconnect failed: ", e.what()); diff --git a/src/recorder/stream_recorder/satellite_connection.h b/src/recorder/stream_recorder/satellite_connection.h index a9df401e..abbf717e 100644 --- a/src/recorder/stream_recorder/satellite_connection.h +++ b/src/recorder/stream_recorder/satellite_connection.h @@ -54,7 +54,7 @@ class SatelliteConnection : private Noncopyable { StatusLineParser status_line_parser_; Socket socket_{InvalidSocket}; bool writable_{false}; - bool shutting_down_{false}; + bool is_shutting_down_{false}; bool was_shutdown_{false}; Event read_event_; Event write_event_; diff --git a/src/recorder/stream_recorder/stream_recorder.cpp b/src/recorder/stream_recorder/stream_recorder.cpp index abf3a375..17e1769c 100644 --- a/src/recorder/stream_recorder/stream_recorder.cpp +++ b/src/recorder/stream_recorder/stream_recorder.cpp @@ -35,10 +35,12 @@ StreamRecorder::StreamRecorder(Logger& logger, //-------------------------------------------------------------------------------------------------- StreamRecorder::~StreamRecorder() noexcept { { - std::lock_guard lock_guard{flush_mutex_}; + std::lock_guard flush_lock_guard{flush_mutex_}; + std::lock_guard shutdown_lock_guard{shutdown_mutex_}; exit_ = true; } flush_condition_variable_.notify_all(); + shutdown_condition_variable_.notify_all(); } //-------------------------------------------------------------------------------------------------- From 84891482dc01dfbeccb881c5585ed6005a56cef1 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: 
Wed, 27 Nov 2019 21:36:47 -0800 Subject: [PATCH 06/14] Add test for shutdown. --- benchmark/tracer_upload_bench/utility.cpp | 2 +- .../stream_recorder/satellite_connection.cpp | 3 ++- .../stream_recorder/satellite_streamer.cpp | 3 +++ .../stream_recorder/stream_recorder.cpp | 2 +- .../stream_recorder/stream_recorder_impl.h | 1 + .../stream_recorder/stream_recorder_test.cpp | 23 +++++++++++++++++++ 6 files changed, 31 insertions(+), 3 deletions(-) diff --git a/benchmark/tracer_upload_bench/utility.cpp b/benchmark/tracer_upload_bench/utility.cpp index ab2053ce..c1da90ca 100644 --- a/benchmark/tracer_upload_bench/utility.cpp +++ b/benchmark/tracer_upload_bench/utility.cpp @@ -7,8 +7,8 @@ #include #include -#include "tracer/lightstep_tracer_factory.h" #include "tracer/json_options.h" +#include "tracer/lightstep_tracer_factory.h" #include "google/protobuf/util/json_util.h" diff --git a/src/recorder/stream_recorder/satellite_connection.cpp b/src/recorder/stream_recorder/satellite_connection.cpp index 4e95387e..3ec8316f 100644 --- a/src/recorder/stream_recorder/satellite_connection.cpp +++ b/src/recorder/stream_recorder/satellite_connection.cpp @@ -165,7 +165,8 @@ void SatelliteConnection::ScheduleReconnect() { //-------------------------------------------------------------------------------------------------- void SatelliteConnection::InitiateReconnect() noexcept try { if (is_shutting_down_) { - // If we're shutting down, then we've already scheduled a reconnect so do nothing. + // If we're shutting down, then we've already scheduled a reconnect so do + // nothing. 
return; } connection_stream_.Shutdown(); diff --git a/src/recorder/stream_recorder/satellite_streamer.cpp b/src/recorder/stream_recorder/satellite_streamer.cpp index eb97974a..b7cc7b57 100644 --- a/src/recorder/stream_recorder/satellite_streamer.cpp +++ b/src/recorder/stream_recorder/satellite_streamer.cpp @@ -66,6 +66,9 @@ void SatelliteStreamer::Flush() noexcept { // InitiateShutdown //-------------------------------------------------------------------------------------------------- void SatelliteStreamer::InitiateShutdown() noexcept { + for (auto& connection : connections_) { + connection->InitiateShutdown(); + } } //-------------------------------------------------------------------------------------------------- diff --git a/src/recorder/stream_recorder/stream_recorder.cpp b/src/recorder/stream_recorder/stream_recorder.cpp index 17e1769c..c044e061 100644 --- a/src/recorder/stream_recorder/stream_recorder.cpp +++ b/src/recorder/stream_recorder/stream_recorder.cpp @@ -145,7 +145,7 @@ void StreamRecorder::Poll() noexcept { if (shutdown_counter_.exchange(0) > 0) { stream_recorder_impl_->InitiateShutdown(); } - + if (last_is_active_ && !stream_recorder_impl_->is_active()) { { std::lock_guard lock_guard{shutdown_mutex_}; diff --git a/src/recorder/stream_recorder/stream_recorder_impl.h b/src/recorder/stream_recorder/stream_recorder_impl.h index cba40f32..bb3cd13c 100644 --- a/src/recorder/stream_recorder/stream_recorder_impl.h +++ b/src/recorder/stream_recorder/stream_recorder_impl.h @@ -30,6 +30,7 @@ class StreamRecorderImpl : private Noncopyable { void InitiateShutdown() noexcept { streamer_.InitiateShutdown(); } bool is_active() const noexcept { return streamer_.is_active(); } + private: StreamRecorder& stream_recorder_; diff --git a/test/recorder/stream_recorder/stream_recorder_test.cpp b/test/recorder/stream_recorder/stream_recorder_test.cpp index afe272d3..2458a5f5 100644 --- a/test/recorder/stream_recorder/stream_recorder_test.cpp +++ 
b/test/recorder/stream_recorder/stream_recorder_test.cpp @@ -176,4 +176,27 @@ TEST_CASE("StreamRecorder") { stop = true; generator.join(); } + + SECTION( + "If a tracer is destroyed without shutting down first, it will close " + "satellite sockets before the http sessions are finished") { + std::this_thread::sleep_for(std::chrono::milliseconds{50}); + tracer = nullptr; + std::this_thread::sleep_for(std::chrono::milliseconds{50}); + } + + SECTION("If a tracer is shut down, it waits for a response from the server") { + tracer->StartSpan("abc"); + stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50}); + stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); + REQUIRE(logger_sink->contents().find("is readable") != std::string::npos); + } + + SECTION("We can shut down a recorder twice") { + tracer->StartSpan("abc"); + stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50}); + stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); + stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); + REQUIRE(logger_sink->contents().find("is readable") != std::string::npos); + } } From 470b4d716d7e3aa7addbcd57cb5276f071b96cec Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 21:47:06 -0800 Subject: [PATCH 07/14] Add test coverage for shutdown. 
--- src/tracer/tracer_impl.cpp | 15 ++++++++++++++- .../stream_recorder/stream_recorder_test.cpp | 14 ++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/tracer/tracer_impl.cpp b/src/tracer/tracer_impl.cpp index 0ea327d1..c066f92f 100644 --- a/src/tracer/tracer_impl.cpp +++ b/src/tracer/tracer_impl.cpp @@ -134,5 +134,18 @@ bool TracerImpl::FlushWithTimeout( //------------------------------------------------------------------------------ // Close //------------------------------------------------------------------------------ -void TracerImpl::Close() noexcept { Flush(); } +void TracerImpl::Close() noexcept { + auto t1 = std::chrono::steady_clock::now(); + if (!recorder_->FlushWithTimeout(DefaultFlushTimeout)) { + return; + } + auto delta = std::chrono::steady_clock::now() - t1; + if (delta >= DefaultFlushTimeout) { + return; + } + recorder_->ShutdownWithTimeout( + std::chrono::duration_cast( + DefaultFlushTimeout) - + delta); +} } // namespace lightstep diff --git a/test/recorder/stream_recorder/stream_recorder_test.cpp b/test/recorder/stream_recorder/stream_recorder_test.cpp index 2458a5f5..7eb2003a 100644 --- a/test/recorder/stream_recorder/stream_recorder_test.cpp +++ b/test/recorder/stream_recorder/stream_recorder_test.cpp @@ -199,4 +199,18 @@ TEST_CASE("StreamRecorder") { stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); REQUIRE(logger_sink->contents().find("is readable") != std::string::npos); } + + SECTION("Close will also shut down the recorder") { + tracer->StartSpan("abc"); + tracer->Close(); + REQUIRE(logger_sink->contents().find("is readable") != std::string::npos); + } + + SECTION("Shutdown will return early if it times out") { + mock_satellite->SetRequestTimeout(); + tracer->StartSpan("abc"); + stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50}); + stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50}); + REQUIRE(logger_sink->contents().find("is readable") == std::string::npos); 
+ } } From c807662b6a6b5efcc319c3ec2cdccd69f2419933 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 22:00:18 -0800 Subject: [PATCH 08/14] Add missing documentation. --- src/recorder/stream_recorder/stream_recorder_impl.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/recorder/stream_recorder/stream_recorder_impl.h b/src/recorder/stream_recorder/stream_recorder_impl.h index bb3cd13c..235c321a 100644 --- a/src/recorder/stream_recorder/stream_recorder_impl.h +++ b/src/recorder/stream_recorder/stream_recorder_impl.h @@ -25,10 +25,20 @@ class StreamRecorderImpl : private Noncopyable { ~StreamRecorderImpl() noexcept; + /** + * A cached delta of difference between std::chrono::system_clock and + * std::chrono::steady_clock. + */ int64_t timestamp_delta() const noexcept { return timestamp_delta_; } + /** + * Schedule the recorder to cleanly close all satellite connections. + */ void InitiateShutdown() noexcept { streamer_.InitiateShutdown(); } + /** + * @return true if any satellite connections are active. + */ bool is_active() const noexcept { return streamer_.is_active(); } private: From 410c3e08cfdcbffc08d4ca08ebf8b0923dac6bbe Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 22:14:21 -0800 Subject: [PATCH 09/14] Fix locking. 
--- src/recorder/stream_recorder/stream_recorder.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/recorder/stream_recorder/stream_recorder.cpp b/src/recorder/stream_recorder/stream_recorder.cpp index c044e061..5bb9cc31 100644 --- a/src/recorder/stream_recorder/stream_recorder.cpp +++ b/src/recorder/stream_recorder/stream_recorder.cpp @@ -89,7 +89,7 @@ bool StreamRecorder::FlushWithTimeout( //-------------------------------------------------------------------------------------------------- bool StreamRecorder::ShutdownWithTimeout( std::chrono::system_clock::duration timeout) noexcept try { - std::unique_lock lock{flush_mutex_}; + std::unique_lock lock{shutdown_mutex_}; ++shutdown_counter_; shutdown_condition_variable_.wait_for( lock, timeout, [this] { return exit_ || !last_is_active_; }); From c0fc789be3c5cdc366e3ccdc06847c0f0b676c53 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 22:27:55 -0800 Subject: [PATCH 10/14] Fix error on windows. --- src/tracer/tracer_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tracer/tracer_impl.cpp b/src/tracer/tracer_impl.cpp index c066f92f..f373f6ac 100644 --- a/src/tracer/tracer_impl.cpp +++ b/src/tracer/tracer_impl.cpp @@ -144,7 +144,7 @@ void TracerImpl::Close() noexcept { return; } recorder_->ShutdownWithTimeout( - std::chrono::duration_cast( + std::chrono::duration_cast( DefaultFlushTimeout) - delta); } From feae0c5f3fe5bfb0b181aa25535a4e313eb45834 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 22:42:13 -0800 Subject: [PATCH 11/14] Silence tsan warning. 
--- src/recorder/stream_recorder/stream_recorder.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/recorder/stream_recorder/stream_recorder.h b/src/recorder/stream_recorder/stream_recorder.h index 0481d0c0..473bf543 100644 --- a/src/recorder/stream_recorder/stream_recorder.h +++ b/src/recorder/stream_recorder/stream_recorder.h @@ -132,7 +132,7 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { std::mutex shutdown_mutex_; std::condition_variable shutdown_condition_variable_; std::atomic shutdown_counter_{0}; - bool last_is_active_{true}; + std::atomic last_is_active_{true}; std::unique_ptr stream_recorder_impl_; }; From 04584770ebc3bc8178ba7280da3e1fd77bbc4a7f Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 22:48:27 -0800 Subject: [PATCH 12/14] Add more commenting. --- src/recorder/stream_recorder/stream_recorder.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/recorder/stream_recorder/stream_recorder.h b/src/recorder/stream_recorder/stream_recorder.h index 473bf543..f9150ffb 100644 --- a/src/recorder/stream_recorder/stream_recorder.h +++ b/src/recorder/stream_recorder/stream_recorder.h @@ -132,6 +132,9 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { std::mutex shutdown_mutex_; std::condition_variable shutdown_condition_variable_; std::atomic shutdown_counter_{0}; + + // Used by polling to track when the satellite connections become inactive so + // that any threads waiting on shutdown can be notified. std::atomic last_is_active_{true}; std::unique_ptr stream_recorder_impl_; From 14d7fc399ca50ec7748308fad22231ea3cd98c8f Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 23:02:22 -0800 Subject: [PATCH 13/14] Rework to avoid tsan false positive. 
--- src/recorder/stream_recorder/stream_recorder.cpp | 6 +++--- src/recorder/stream_recorder/stream_recorder.h | 10 ++++++++-- src/recorder/stream_recorder/stream_recorder_impl.cpp | 2 +- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/recorder/stream_recorder/stream_recorder.cpp b/src/recorder/stream_recorder/stream_recorder.cpp index 5bb9cc31..c4e39c88 100644 --- a/src/recorder/stream_recorder/stream_recorder.cpp +++ b/src/recorder/stream_recorder/stream_recorder.cpp @@ -132,7 +132,7 @@ void StreamRecorder::OnForkedChild() noexcept { //-------------------------------------------------------------------------------------------------- // Poll //-------------------------------------------------------------------------------------------------- -void StreamRecorder::Poll() noexcept { +void StreamRecorder::Poll(StreamRecorderImpl& stream_recorder_impl) noexcept { auto num_spans_consumed = span_buffer_.consumption_count(); if (num_spans_consumed > num_spans_consumed_) { { @@ -143,10 +143,10 @@ void StreamRecorder::Poll() noexcept { } if (shutdown_counter_.exchange(0) > 0) { - stream_recorder_impl_->InitiateShutdown(); + stream_recorder_impl.InitiateShutdown(); } - if (last_is_active_ && !stream_recorder_impl_->is_active()) { + if (last_is_active_ && !stream_recorder_impl.is_active()) { { std::lock_guard lock_guard{shutdown_mutex_}; last_is_active_ = false; diff --git a/src/recorder/stream_recorder/stream_recorder.h b/src/recorder/stream_recorder/stream_recorder.h index f9150ffb..19cfd6ea 100644 --- a/src/recorder/stream_recorder/stream_recorder.h +++ b/src/recorder/stream_recorder/stream_recorder.h @@ -35,8 +35,14 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { /** * Checks whether any threads blocked on flush calls can be resumed. + * @param stream_recorder_impl a reference to the stream recorder + * implementation that invoked poll. 
+ * + * Note: stream_recorder_impl is redundant since it can also be accessed + * through the member variable stream_recorder_impl_, but it's structured this + * way to avoid false positives from TSAN. */ - void Poll() noexcept; + void Poll(StreamRecorderImpl& stream_recorder_impl) noexcept; /** * Gets the pending flush count and resets the counter. @@ -135,7 +141,7 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable { // Used by polling to track when the satellite connections become inactive so // that any threads waiting on shutdown can be notified. - std::atomic last_is_active_{true}; + bool last_is_active_{true}; std::unique_ptr stream_recorder_impl_; }; diff --git a/src/recorder/stream_recorder/stream_recorder_impl.cpp b/src/recorder/stream_recorder/stream_recorder_impl.cpp index 6ec2bc65..34ebdb4b 100644 --- a/src/recorder/stream_recorder/stream_recorder_impl.cpp +++ b/src/recorder/stream_recorder/stream_recorder_impl.cpp @@ -78,7 +78,7 @@ void StreamRecorderImpl::Poll() noexcept { Flush(); } - stream_recorder_.Poll(); + stream_recorder_.Poll(*this); } //-------------------------------------------------------------------------------------------------- From a69a0ff5480d3d00f51941ea6af4ded2d6fd8a1a Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 27 Nov 2019 23:39:38 -0800 Subject: [PATCH 14/14] Fix windows build. --- src/tracer/tracer_impl.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/tracer/tracer_impl.cpp b/src/tracer/tracer_impl.cpp index f373f6ac..e977b680 100644 --- a/src/tracer/tracer_impl.cpp +++ b/src/tracer/tracer_impl.cpp @@ -143,9 +143,11 @@ void TracerImpl::Close() noexcept { if (delta >= DefaultFlushTimeout) { return; } - recorder_->ShutdownWithTimeout( + auto timeout = std::chrono::duration_cast( DefaultFlushTimeout) - - delta); + delta; + recorder_->ShutdownWithTimeout( + std::chrono::duration_cast(timeout)); } } // namespace lightstep