Skip to content
Merged
2 changes: 1 addition & 1 deletion benchmark/tracer_upload_bench/utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include <sstream>
#include <stdexcept>

#include "tracer/lightstep_tracer_factory.h"
#include "tracer/json_options.h"
#include "tracer/lightstep_tracer_factory.h"

#include "google/protobuf/util/json_util.h"

Expand Down
10 changes: 10 additions & 0 deletions src/recorder/recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ class Recorder {
return true;
}

/**
* Block until the recorder cleanly shuts down its connections to satellites.
*
* This default implementation does nothing: it ignores the timeout and
* reports success immediately. Recorders that keep satellite connections open
* override it.
*
* @param timeout the maximum amount of time to block.
* @return true if the shutdown was completed.
*/
virtual bool ShutdownWithTimeout(
std::chrono::system_clock::duration /*timeout*/) noexcept {
return true;
}

/**
* Compute a timestamp delta that can be used to convert between system and
* steady timestamps.
Expand Down
23 changes: 23 additions & 0 deletions src/recorder/stream_recorder/satellite_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ bool SatelliteConnection::Flush() noexcept try {
return false;
}

//--------------------------------------------------------------------------------------------------
// InitiateShutdown
//--------------------------------------------------------------------------------------------------
void SatelliteConnection::InitiateShutdown() noexcept {
// Reuse the reconnect sequence to wind the current connection down. When
// Reconnect() later runs with is_shutting_down_ set, it frees the socket and
// marks the connection as shut down instead of connecting again.
InitiateReconnect();
// Must be set only after the call above: InitiateReconnect() returns
// immediately when is_shutting_down_ is already true. That same guard makes a
// repeated InitiateShutdown() call a no-op.
is_shutting_down_ = true;
}

//--------------------------------------------------------------------------------------------------
// ready
//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -129,6 +137,11 @@ void SatelliteConnection::FreeSocket() {
//--------------------------------------------------------------------------------------------------
void SatelliteConnection::HandleFailure() noexcept try {
FreeSocket();
if (is_shutting_down_) {
// if we failed during shutdown, don't try to reconnect
was_shutdown_ = true;
return;
}
streamer_.event_base().OnTimeout(
streamer_.recorder_options().satellite_failure_retry_period,
MakeTimerCallback<SatelliteConnection, &SatelliteConnection::Connect>(),
Expand All @@ -151,6 +164,11 @@ void SatelliteConnection::ScheduleReconnect() {
// InitiateReconnect
//--------------------------------------------------------------------------------------------------
void SatelliteConnection::InitiateReconnect() noexcept try {
if (is_shutting_down_) {
// If we're shutting down, then we've already scheduled a reconnect so do
// nothing.
return;
}
connection_stream_.Shutdown();
if (writable_) {
Flush();
Expand All @@ -167,6 +185,11 @@ void SatelliteConnection::InitiateReconnect() noexcept try {
//--------------------------------------------------------------------------------------------------
void SatelliteConnection::Reconnect() noexcept try {
FreeSocket();
if (is_shutting_down_) {
// break the reconnection cycle
was_shutdown_ = true;
return;
}
Connect();
} catch (const std::exception& e) {
streamer_.logger().Error("Reconnect failed: ", e.what());
Expand Down
12 changes: 12 additions & 0 deletions src/recorder/stream_recorder/satellite_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,30 @@ class SatelliteConnection : private Noncopyable {
*/
bool Flush() noexcept;

/**
* Initiate a clean shut down of the satellite connection.
*/
void InitiateShutdown() noexcept;

/**
* @return true if the satellite connection is available for streaming spans.
*/
bool ready() const noexcept;

/**
* @return true until this connection has finished shutting down (that is,
* until was_shutdown_ has been set); false afterwards.
*
* Note: "active" here means "not yet shut down". The value does not reflect
* whether a socket is currently connected.
*/
bool is_active() const noexcept { return !was_shutdown_; }

private:
SatelliteStreamer& streamer_;
HostHeader host_header_;
ConnectionStream connection_stream_;
StatusLineParser status_line_parser_;
Socket socket_{InvalidSocket};
bool writable_{false};
bool is_shutting_down_{false};
bool was_shutdown_{false};
Event read_event_;
Event write_event_;
Event reconnect_timer_;
Expand Down
21 changes: 21 additions & 0 deletions src/recorder/stream_recorder/satellite_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ SatelliteStreamer::SatelliteStreamer(
endpoint_manager_.Start();
}

//--------------------------------------------------------------------------------------------------
// is_active
//--------------------------------------------------------------------------------------------------
bool SatelliteStreamer::is_active() const noexcept {
for (auto& connection : connections_) {
if (connection->is_active()) {
return true;
}
}
return false;
}

//--------------------------------------------------------------------------------------------------
// Flush
//--------------------------------------------------------------------------------------------------
Expand All @@ -50,6 +62,15 @@ void SatelliteStreamer::Flush() noexcept {
});
}

//--------------------------------------------------------------------------------------------------
// InitiateShutdown
//--------------------------------------------------------------------------------------------------
void SatelliteStreamer::InitiateShutdown() noexcept {
for (auto& connection : connections_) {
connection->InitiateShutdown();
}
}

//--------------------------------------------------------------------------------------------------
// OnEndpointManagerReady
//--------------------------------------------------------------------------------------------------
Expand Down
10 changes: 10 additions & 0 deletions src/recorder/stream_recorder/satellite_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,21 @@ class SatelliteStreamer : private Noncopyable {
return endpoint_manager_;
}

/**
* @return true if any satellite connections have open sockets.
*/
bool is_active() const noexcept;

/**
* Flush data to satellites if connections are available.
*/
void Flush() noexcept;

/**
* Cleanly shut down satellite connections.
*/
void InitiateShutdown() noexcept;

private:
Logger& logger_;
EventBase& event_base_;
Expand Down
33 changes: 31 additions & 2 deletions src/recorder/stream_recorder/stream_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ StreamRecorder::StreamRecorder(Logger& logger,
//--------------------------------------------------------------------------------------------------
StreamRecorder::~StreamRecorder() noexcept {
  {
    // exit_ is examined by threads waiting on either condition variable, each
    // under its own mutex, so hold both while setting it. Each mutex is
    // acquired exactly once: locking a std::mutex the calling thread already
    // owns is undefined behaviour.
    std::lock_guard<std::mutex> flush_lock_guard{flush_mutex_};
    std::lock_guard<std::mutex> shutdown_lock_guard{shutdown_mutex_};
    exit_ = true;
  }
  // Notify after releasing the locks so woken waiters can reacquire their
  // mutex immediately and observe exit_.
  flush_condition_variable_.notify_all();
  shutdown_condition_variable_.notify_all();
}

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -82,6 +84,21 @@ bool StreamRecorder::FlushWithTimeout(
return false;
}

//--------------------------------------------------------------------------------------------------
// ShutdownWithTimeout
//--------------------------------------------------------------------------------------------------
bool StreamRecorder::ShutdownWithTimeout(
std::chrono::system_clock::duration timeout) noexcept try {
std::unique_lock<std::mutex> lock{shutdown_mutex_};
++shutdown_counter_;
shutdown_condition_variable_.wait_for(
lock, timeout, [this] { return exit_ || !last_is_active_; });
return !last_is_active_;
} catch (const std::exception& e) {
logger_.Error("StreamRecorder::FlushWithTimeout failed: ", e.what());
return false;
}

//--------------------------------------------------------------------------------------------------
// PrepareForFork
//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -115,7 +132,7 @@ void StreamRecorder::OnForkedChild() noexcept {
//--------------------------------------------------------------------------------------------------
// Poll
//--------------------------------------------------------------------------------------------------
void StreamRecorder::Poll() noexcept {
void StreamRecorder::Poll(StreamRecorderImpl& stream_recorder_impl) noexcept {
auto num_spans_consumed = span_buffer_.consumption_count();
if (num_spans_consumed > num_spans_consumed_) {
{
Expand All @@ -124,6 +141,18 @@ void StreamRecorder::Poll() noexcept {
}
flush_condition_variable_.notify_all();
}

if (shutdown_counter_.exchange(0) > 0) {
stream_recorder_impl.InitiateShutdown();
}

if (last_is_active_ && !stream_recorder_impl.is_active()) {
{
std::lock_guard<std::mutex> lock_guard{shutdown_mutex_};
last_is_active_ = false;
}
shutdown_condition_variable_.notify_all();
}
}

//--------------------------------------------------------------------------------------------------
Expand Down
19 changes: 18 additions & 1 deletion src/recorder/stream_recorder/stream_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable {

/**
* Checks whether any threads blocked on flush calls can be resumed.
* @param stream_recorder_impl a reference to the stream recorder
* implementation that invoked poll.
*
* Note: stream_recorder_impl is redundant since it can also be accessed
* through the member variable stream_recorder_impl_, but it's structured this
* way to avoid false positives from TSAN.
*/
void Poll() noexcept;
void Poll(StreamRecorderImpl& stream_recorder_impl) noexcept;

/**
* Gets the pending flush count and resets the counter.
Expand Down Expand Up @@ -88,6 +94,9 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable {
bool FlushWithTimeout(
std::chrono::system_clock::duration timeout) noexcept override;

bool ShutdownWithTimeout(
std::chrono::system_clock::duration timeout) noexcept override;

int64_t ComputeSystemSteadyTimestampDelta() const noexcept override {
return stream_recorder_impl_->timestamp_delta();
}
Expand Down Expand Up @@ -126,6 +135,14 @@ class StreamRecorder : public ForkAwareRecorder, private Noncopyable {
std::atomic<int> pending_flush_counter_{0};
int64_t num_spans_consumed_{0};

std::mutex shutdown_mutex_;
std::condition_variable shutdown_condition_variable_;
std::atomic<int> shutdown_counter_{0};

// Used by polling to track when the satellite connections become inactive so
// that any threads waiting on shutdown can be notified.
bool last_is_active_{true};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a bit of trouble following what this variable does. Could you add a comment? Maybe add a comment on ShutdownWithTimeout to explain why it can fail even without a timeout.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment


std::unique_ptr<StreamRecorderImpl> stream_recorder_impl_;
};
} // namespace lightstep
2 changes: 1 addition & 1 deletion src/recorder/stream_recorder/stream_recorder_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void StreamRecorderImpl::Poll() noexcept {
Flush();
}

stream_recorder_.Poll();
stream_recorder_.Poll(*this);
}

//--------------------------------------------------------------------------------------------------
Expand Down
14 changes: 14 additions & 0 deletions src/recorder/stream_recorder/stream_recorder_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,22 @@ class StreamRecorderImpl : private Noncopyable {

~StreamRecorderImpl() noexcept;

/**
* @return the cached difference between std::chrono::system_clock and
* std::chrono::steady_clock timestamps.
*/
int64_t timestamp_delta() const noexcept { return timestamp_delta_; }

/**
* Schedule the recorder to cleanly close all satellite connections.
*/
void InitiateShutdown() noexcept { streamer_.InitiateShutdown(); }

/**
* @return true if any satellite connections are active.
*/
bool is_active() const noexcept { return streamer_.is_active(); }

private:
StreamRecorder& stream_recorder_;

Expand Down
17 changes: 16 additions & 1 deletion src/tracer/tracer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,20 @@ bool TracerImpl::FlushWithTimeout(
//------------------------------------------------------------------------------
// Close
//------------------------------------------------------------------------------
void TracerImpl::Close() noexcept { Flush(); }
void TracerImpl::Close() noexcept {
  // Flushing and shutting down share a single budget of DefaultFlushTimeout:
  // the shutdown only gets whatever time the flush left over.
  const auto start = std::chrono::steady_clock::now();
  const bool flushed = recorder_->FlushWithTimeout(DefaultFlushTimeout);
  if (!flushed) {
    // The flush did not complete, so no shutdown is attempted.
    return;
  }

  const auto elapsed = std::chrono::steady_clock::now() - start;
  if (elapsed < DefaultFlushTimeout) {
    const auto remaining =
        std::chrono::duration_cast<std::chrono::steady_clock::duration>(
            DefaultFlushTimeout) -
        elapsed;
    recorder_->ShutdownWithTimeout(
        std::chrono::duration_cast<std::chrono::system_clock::duration>(
            remaining));
  }
}
} // namespace lightstep
37 changes: 37 additions & 0 deletions test/recorder/stream_recorder/stream_recorder_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,41 @@ TEST_CASE("StreamRecorder") {
stop = true;
generator.join();
}

// Destroys the tracer without calling Close()/ShutdownWithTimeout() first.
// The section contains no REQUIRE: it only exercises the destruction path.
// NOTE(review): the sleeps presumably give the recorder time to connect
// before destruction and time to tear down afterwards -- confirm.
SECTION(
"If a tracer is destroyed without shutting down first, it will close "
"satellite sockets before the http sessions are finished") {
std::this_thread::sleep_for(std::chrono::milliseconds{50});
tracer = nullptr;
std::this_thread::sleep_for(std::chrono::milliseconds{50});
}

// Flushes one span, then shuts the recorder down and expects the log to
// contain "is readable".
// NOTE(review): "is readable" is presumably logged when the satellite's
// response arrives on the socket -- confirm against the connection logging.
SECTION("If a tracer is shut down, it waits for a response from the server") {
tracer->StartSpan("abc");
stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50});
stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50});
REQUIRE(logger_sink->contents().find("is readable") != std::string::npos);
}

// Same as the single-shutdown case, but ShutdownWithTimeout is called a second
// time to check that a repeated shutdown request is harmless. The return
// values of both calls are intentionally not inspected; only the log is.
SECTION("We can shut down a recorder twice") {
tracer->StartSpan("abc");
stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50});
stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50});
stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50});
REQUIRE(logger_sink->contents().find("is readable") != std::string::npos);
}

// Goes through Tracer::Close() instead of calling the recorder directly and
// expects the same "is readable" log entry as an explicit flush + shutdown.
SECTION("Close will also shut down the recorder") {
tracer->StartSpan("abc");
tracer->Close();
REQUIRE(logger_sink->contents().find("is readable") != std::string::npos);
}

// Negative case: after SetRequestTimeout() the log must NOT contain
// "is readable" once the 50ms shutdown wait is over.
// NOTE(review): SetRequestTimeout() presumably makes the mock satellite
// withhold its response -- confirm against the mock's implementation.
SECTION("Shutdown will return early if it times out") {
mock_satellite->SetRequestTimeout();
tracer->StartSpan("abc");
stream_recorder->FlushWithTimeout(std::chrono::milliseconds{50});
stream_recorder->ShutdownWithTimeout(std::chrono::milliseconds{50});
REQUIRE(logger_sink->contents().find("is readable") == std::string::npos);
}
}