From 8c954e391f16c53070e9474d6e2077f3177578b6 Mon Sep 17 00:00:00 2001 From: Everett Badeaux Date: Thu, 15 May 2025 14:22:56 -0500 Subject: [PATCH 1/2] Buffer Fix --- spectator/publisher.cc | 83 +++++++++++++++++++++++++++---------- spectator/publisher.h | 21 +++++++++- spectator/publisher_test.cc | 2 +- 3 files changed, 82 insertions(+), 24 deletions(-) diff --git a/spectator/publisher.cc b/spectator/publisher.cc index ba25bcb..fd34c7c 100644 --- a/spectator/publisher.cc +++ b/spectator/publisher.cc @@ -14,7 +14,8 @@ SpectatordPublisher::SpectatordPublisher(absl::string_view endpoint, local_socket_(io_context_), bytes_to_buffer_(bytes_to_buffer) { buffer_.reserve(bytes_to_buffer_ + 1024); if (absl::StartsWith(endpoint, "unix:")) { - setup_unix_domain(endpoint.substr(5)); + this->unixDomainPath_ = std::string(endpoint.substr(5)); + setup_unix_domain(); } else if (absl::StartsWith(endpoint, "udp:")) { auto pos = 4; // if the user used udp://foo:1234 instead of udp:foo:1234 @@ -49,29 +50,67 @@ void SpectatordPublisher::local_reconnect(absl::string_view path) { } } -void SpectatordPublisher::setup_unix_domain(absl::string_view path) { - local_reconnect(path); - // get a copy of the file path - std::string local_path{path}; - sender_ = [local_path, this](std::string_view msg) { - buffer_.append(msg); - if (buffer_.length() >= bytes_to_buffer_) { - for (auto i = 0; i < 3; ++i) { - try { - auto sent_bytes = local_socket_.send(asio::buffer(buffer_)); - logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length()); - break; - } catch (std::exception& e) { - local_reconnect(local_path); - logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i, - e.what()); + +bool SpectatordPublisher::try_to_send(const std::string& buffer) { + for (auto i = 0; i < 3; ++i) { + try { + auto sent_bytes = local_socket_.send(asio::buffer(buffer)); + logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, + buffer.length()); + return true; + } catch (std::exception& e) { + local_reconnect(this->unixDomainPath_); + logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer, i, e.what()); + } + } + return false; +} + +void SpectatordPublisher::taskThreadFunction() try { + while (shutdown_.load() == false) { + std::string message{}; + { + std::unique_lock lock(mtx_); + cv_sender_.wait(lock, [this] { return buffer_.size() > bytes_to_buffer_ || shutdown_.load();}); + if (shutdown_.load() == true) { + return; + } + message = std::move(buffer_); + buffer_ = std::string(); + } + cv_receiver_.notify_one(); + try_to_send(message); + } +} catch (const std::exception& e) { + logger_->error("Fatal error in message processing thread: {}", e.what()); +} + +void SpectatordPublisher::setup_unix_domain(){ + // Reset connection to the unix domain socket + local_reconnect(this->unixDomainPath_); + if (bytes_to_buffer_ == 0) { + sender_ = [this](std::string_view msg) { + try_to_send(std::string(msg)); + }; + return; + } + else{ + sender_ = [this](std::string_view msg) { + unsigned int currentBufferSize = buffer_.size(); + { + std::unique_lock lock(mtx_); + cv_receiver_.wait(lock, [this] { return buffer_.size() <= bytes_to_buffer_ || shutdown_.load(); }); + if (shutdown_.load()) { + return; } + buffer_.append(msg.data(), msg.size()); + buffer_.append(1, NEW_LINE); + currentBufferSize = buffer_.size(); } - buffer_.clear(); - } else { - buffer_.push_back(NEW_LINE); - } - }; + currentBufferSize > bytes_to_buffer_ ? cv_sender_.notify_one() : cv_receiver_.notify_one(); + }; + this->sendingThread_ = std::thread(&SpectatordPublisher::taskThreadFunction, this); + } } inline asio::ip::udp::endpoint resolve_host_port( diff --git a/spectator/publisher.h b/spectator/publisher.h index 0548137..56e37d7 100644 --- a/spectator/publisher.h +++ b/spectator/publisher.h @@ -15,15 +15,27 @@ class SpectatordPublisher { std::shared_ptr logger = DefaultLogger()); SpectatordPublisher(const SpectatordPublisher&) = delete; + ~SpectatordPublisher() { + shutdown_.store(true); + cv_receiver_.notify_all(); + cv_sender_.notify_all(); + if (sendingThread_.joinable()) { + sendingThread_.join(); + } + } + void send(std::string_view measurement) { sender_(measurement); }; + void taskThreadFunction(); + bool try_to_send(const std::string& buffer); + protected: using sender_fun = std::function; sender_fun sender_; private: void setup_nop_sender(); - void setup_unix_domain(absl::string_view path); + void setup_unix_domain(); void setup_udp(absl::string_view host_port); void local_reconnect(absl::string_view path); void udp_reconnect(const asio::ip::udp::endpoint& endpoint); @@ -34,6 +46,13 @@ class SpectatordPublisher { asio::local::datagram_protocol::socket local_socket_; std::string buffer_; uint32_t bytes_to_buffer_; + + std::thread sendingThread_; + std::mutex mtx_; + std::condition_variable cv_receiver_; + std::condition_variable cv_sender_; + std::string unixDomainPath_; + std::atomic shutdown_{false}; }; } // namespace spectator diff --git a/spectator/publisher_test.cc b/spectator/publisher_test.cc index 84f817b..015dbdf 100644 --- a/spectator/publisher_test.cc +++ b/spectator/publisher_test.cc @@ -77,7 +77,7 @@ TEST(Publisher, UnixBuffer) { c.Increment(); std::this_thread::sleep_for(std::chrono::milliseconds(50)); msgs = server.GetMessages(); - std::vector expected{"c:counter:1\nc:counter:1\nc:counter:1"}; + std::vector expected{"c:counter:1\nc:counter:1\nc:counter:1\n"}; EXPECT_EQ(msgs, expected); server.Stop(); unlink(path.c_str()); From 9ea59fb617c0ae77764976a110f76a44a8c03a20 Mon Sep 17 00:00:00 2001 From: Netflix Privileged User Date: Fri, 16 May 2025 20:13:13 +0000 Subject: [PATCH 2/2] Add multithread test --- spectator/publisher.cc | 5 ++- spectator/publisher_test.cc | 73 +++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/spectator/publisher.cc b/spectator/publisher.cc index fd34c7c..9658d58 100644 --- a/spectator/publisher.cc +++ b/spectator/publisher.cc @@ -68,7 +68,7 @@ bool SpectatordPublisher::try_to_send(const std::string& buffer) { void SpectatordPublisher::taskThreadFunction() try { while (shutdown_.load() == false) { - std::string message{}; + std::string message {}; { std::unique_lock lock(mtx_); cv_sender_.wait(lock, [this] { return buffer_.size() > bytes_to_buffer_ || shutdown_.load();}); @@ -77,6 +77,7 @@ void SpectatordPublisher::taskThreadFunction() try { } message = std::move(buffer_); buffer_ = std::string(); + buffer_.reserve(bytes_to_buffer_); } cv_receiver_.notify_one(); try_to_send(message); @@ -94,7 +95,7 @@ void SpectatordPublisher::setup_unix_domain(){ }; return; } - else{ + else { sender_ = [this](std::string_view msg) { unsigned int currentBufferSize = buffer_.size(); { diff --git a/spectator/publisher_test.cc b/spectator/publisher_test.cc index 015dbdf..0358687 100644 --- a/spectator/publisher_test.cc +++ b/spectator/publisher_test.cc @@ -5,6 +5,7 @@ #include "test_server.h" #include #include +#include namespace { @@ -90,4 +91,76 @@ TEST(Publisher, Nop) { c.Add(2); } +TEST(Publisher, MultiThreadedCounters) { + auto logger = spectator::DefaultLogger(); + const auto* dir = first_not_null(std::getenv("TMPDIR"), "/tmp"); + auto path = fmt::format("{}/testserver.{}", dir, getpid()); + TestUnixServer server{path}; + server.Start(); + logger->info("Unix Server started on path {}", path); + + // Create publisher with a small buffer size to ensure flushing + SpectatordPublisher publisher{fmt::format("unix:{}", path), 50}; + + // Number of threads and counters to create + const int numThreads = 4; + const int countersPerThread = 3; + const int incrementsPerCounter = 5; + + // Function for worker threads + auto worker = [&](int threadId) { + // Create several counters per thread with unique names + for (int i = 0; i < countersPerThread; i++) { + std::string counterName = fmt::format("counter.thread{}.{}", threadId, i); + Counter counter(std::make_shared(counterName, Tags{}), &publisher); + + // Increment each counter multiple times + for (int j = 0; j < incrementsPerCounter; j++) { + counter.Increment(); + } + } + }; + + // Start worker threads + std::vector threads; + for (int i = 0; i < numThreads; i++) { + threads.emplace_back(worker, i); + } + + // Wait for all threads to complete + for (auto& t : threads) { + t.join(); + } + + // Give some time for messages to be sent + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Check messages + auto msgs = server.GetMessages(); + EXPECT_FALSE(msgs.empty()); + + // Verify total number of increments + int expectedIncrements = numThreads * countersPerThread * incrementsPerCounter; + int actualIncrements = 0; + + // Verify every string in msgs follows the form counter.thread. + std::regex counter_regex(R"(c:counter\.thread\d+\.\d+:1)"); + for (const auto& msg : msgs) { + std::stringstream ss(msg); + std::string line; + while (std::getline(ss, line)) { + if (!line.empty()) { + EXPECT_TRUE(std::regex_match(line, counter_regex)) + << "Unexpected counter format: " << line; + actualIncrements++; + } + } + } + + EXPECT_EQ(actualIncrements, expectedIncrements); + + server.Stop(); + unlink(path.c_str()); +} + } // namespace