diff --git a/src/nsolid/async_ts_queue.h b/src/nsolid/async_ts_queue.h index 108b11aa12..9f922c434c 100644 --- a/src/nsolid/async_ts_queue.h +++ b/src/nsolid/async_ts_queue.h @@ -7,14 +7,26 @@ #include "../../deps/nsuv/include/nsuv-inl.h" #include "asserts-cpp/asserts.h" -#include +#include +#include #include +#include +#include #include #include namespace node { namespace nsolid { +/** + * Options for AsyncTSQueue batching notification + */ +struct AsyncTSQueueOptions { + uint64_t min_size = 0; // Minimum queue size to trigger notification + uint64_t max_time = 0; // Maximum time (ms) before notification +}; + + /** * AsyncTSQueue is a templated class that provides an asynchronous thread-safe queue. * It abstracts away the common functionality found all around N|Solid @@ -35,10 +47,24 @@ class AsyncTSQueue : public std::enable_shared_from_this> { * @param callback The callback to process items * @return A shared pointer to the initialized AsyncTSQueue */ + // Factory method for options-based batching + template + static SharedAsyncTSQueue create(uv_loop_t* loop, + const AsyncTSQueueOptions& opts, + Cb&& cb, + Args&&... args) { + SharedAsyncTSQueue queue(new AsyncTSQueue( + loop, opts, std::forward(cb), std::forward(args)...)); + queue->initialize(); + return queue; + } + + // Factory method for legacy behavior template static SharedAsyncTSQueue create(uv_loop_t* loop, Cb&& cb, Args&&... 
args) { + const AsyncTSQueueOptions opts; SharedAsyncTSQueue queue(new AsyncTSQueue( - loop, std::forward(cb), std::forward(args)...)); + loop, opts, std::forward(cb), std::forward(args)...)); queue->initialize(); return queue; } @@ -48,6 +74,11 @@ class AsyncTSQueue : public std::enable_shared_from_this> { */ ~AsyncTSQueue() { async_handle_->close_and_delete(); + async_handle_ = nullptr; + if (timer_) { + timer_->close_and_delete(); + timer_ = nullptr; + } } /** @@ -69,12 +100,43 @@ class AsyncTSQueue : public std::enable_shared_from_this> { template size_t enqueue_impl(U&& item) { size_t size = queue_.enqueue(std::forward(item)); - if (size == 1) { - ASSERT_EQ(0, async_handle_->send()); + if (batching_enabled_) { + if (size == 1) { + // Arm the timer for the async callback + if (!timer_armed_.exchange(true, std::memory_order_release)) { + trigger_async(); + } + } else if (size >= opts_.min_size) { + // Make sure we don't arm the timer if min size is reached, so we're + // sure items are consumed in the async callback. + timer_armed_.store(false, std::memory_order_release); + trigger_async(); + } + } else { + if (size == 1) { + ASSERT_EQ(0, async_handle_->send()); + } } return size; } + // Timer management + void start_timer() { + ASSERT_NOT_NULL(timer_); + ASSERT_EQ(0, timer_->start(+[](nsuv::ns_timer*, WeakAsyncTSQueue queue_wp) { + SharedAsyncTSQueue queue = queue_wp.lock(); + if (queue == nullptr) { + return; + } + + queue->trigger_async(); + }, opts_.max_time, 0, this->weak_from_this())); + } + + void trigger_async() { + ASSERT_EQ(0, async_handle_->send()); + } + /** * Process all items in the queue * * @@ -107,8 +169,21 @@ class AsyncTSQueue : public std::enable_shared_from_this> { * callback logic at compile time. */ template - AsyncTSQueue(uv_loop_t* loop, Cb&& cb, Args&&... args) - : loop_(loop), async_handle_(new nsuv::ns_async()) { + AsyncTSQueue(uv_loop_t* loop, + const AsyncTSQueueOptions& opts, + Cb&& cb, + Args&&... 
args) + : loop_(loop), + async_handle_(new nsuv::ns_async()), + opts_(opts), + batching_enabled_(opts.min_size > 0 && opts.max_time > 0), + timer_(nullptr) { + setup_callback(std::forward(cb), std::forward(args)...); + } + + // Setup callback and timer logic + template + void setup_callback(Cb&& cb, Args&&... args) { // Create a bound callback function auto bound_cb = [cb = std::forward(cb), ...args = std::forward(args)](auto&& first) mutable { @@ -151,15 +226,35 @@ class AsyncTSQueue : public std::enable_shared_from_this> { // Initialize the async handle with a weak pointer to this object WeakAsyncTSQueue weak_this = this->weak_from_this(); ASSERT_EQ(0, async_handle_->init(loop_, async_callback, weak_this)); + if (batching_enabled_) { + timer_ = new nsuv::ns_timer(); + ASSERT_EQ(0, timer_->init(loop_)); + } } - // Static callback function for nsuv + /** + * Note on batching behavior: This implementation ensures process() is called + * exactly once per batch. If new items are enqueued during process(), the timer + * is not automatically re-armed. The timer will only be re-armed when the queue + * becomes empty and then receives a new item (size == 1 path in enqueue_impl). + * This differs from level-triggered queues and may introduce a delay in processing + * new items if they arrive during an ongoing batch processing. 
+ */ static void async_callback(nsuv::ns_async* handle, - WeakAsyncTSQueue queue_wp) { + WeakAsyncTSQueue queue_wp) { SharedAsyncTSQueue queue = queue_wp.lock(); if (queue == nullptr) { return; } + if (queue->batching_enabled_) { + // Only start timer if it was armed and queue is not empty + if (queue->timer_armed_.exchange(false, std::memory_order_acquire) && + !queue->queue_.empty()) { + queue->start_timer(); + return; + } + } + queue->process(); } @@ -167,6 +262,12 @@ class AsyncTSQueue : public std::enable_shared_from_this> { nsuv::ns_async* async_handle_; TSQueue queue_; ProcessCallback process_callback_; + + // Batching options and timer + AsyncTSQueueOptions opts_; + const bool batching_enabled_ = false; + nsuv::ns_timer* timer_; + std::atomic timer_armed_{false}; }; } // namespace nsolid diff --git a/test/cctest/test_nsolid_async_ts_queue.cc b/test/cctest/test_nsolid_async_ts_queue.cc index 5fc68cc559..6b45519c67 100644 --- a/test/cctest/test_nsolid_async_ts_queue.cc +++ b/test/cctest/test_nsolid_async_ts_queue.cc @@ -6,6 +6,17 @@ #include #include #include +// NOLINTNEXTLINE(build/c++11) +#include +// NOLINTNEXTLINE(build/c++11) +#include +// NOLINTNEXTLINE(build/c++11) +#include +// NOLINTNEXTLINE(build/c++11) +#include + +// NOLINTNEXTLINE(build/namespaces) +using namespace std::chrono_literals; using node::nsolid::AsyncTSQueue; using node::nsolid::TSQueue; @@ -14,31 +25,41 @@ using node::nsolid::TSQueue; class AsyncTSQueueTest : public ::testing::Test { protected: void SetUp() override { - loop_ = uv_default_loop(); - } + uv_loop_init(&loop_); + loop_thread_ = std::thread([&] { + uv_async_init(&loop_, &stop_handle_, [](uv_async_t* handle) { + uv_close(reinterpret_cast(handle), nullptr); + }); - void TearDown() override { - // Run the event loop to process any pending events - uv_run(loop_, UV_RUN_NOWAIT); + uv_run(&loop_, UV_RUN_DEFAULT); + }); } - // Helper function to run the event loop until all events are processed - void ProcessEvents() { - 
uv_run(loop_, UV_RUN_NOWAIT); + void TearDown() override { + // Stop the loop thread + uv_async_send(&stop_handle_); + loop_thread_.join(); + ASSERT_EQ(0, uv_loop_close(&loop_)); } - uv_loop_t* loop_; + uv_loop_t loop_; + std::thread loop_thread_; + uv_async_t stop_handle_; }; // Test basic queue operations TEST_F(AsyncTSQueueTest, BasicOperations) { std::vector processed_items; + std::condition_variable cv; + std::mutex mtx; // Create a queue with a callback that stores processed items auto queue = AsyncTSQueue::create( - loop_, - [&processed_items](int&& item) { + &loop_, + [&processed_items, &cv, &mtx](int&& item) { + std::lock_guard lock(mtx); processed_items.push_back(item); + cv.notify_one(); }); // Enqueue items @@ -46,8 +67,11 @@ TEST_F(AsyncTSQueueTest, BasicOperations) { queue->enqueue(2); queue->enqueue(3); - // Process the events (this will trigger the callback) - ProcessEvents(); + // Wait for all items to be processed + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return processed_items.size() == 3; }); + } // Verify all items were processed in the correct order EXPECT_EQ(processed_items.size(), 3u); @@ -59,12 +83,16 @@ TEST_F(AsyncTSQueueTest, BasicOperations) { // Test enqueuing with different argument types TEST_F(AsyncTSQueueTest, EnqueueDifferentArgTypes) { std::vector processed_items; + std::condition_variable cv; + std::mutex mtx; // Create a queue with a callback that stores processed items auto queue = AsyncTSQueue::create( - loop_, - [&processed_items](std::string&& item) { + &loop_, + [&processed_items, &cv, &mtx](std::string&& item) { + std::lock_guard lock(mtx); processed_items.push_back(item); + cv.notify_one(); }); // Test copy enqueue @@ -74,8 +102,11 @@ TEST_F(AsyncTSQueueTest, EnqueueDifferentArgTypes) { // Test move enqueue queue->enqueue(std::string("test2")); - // Process the events - ProcessEvents(); + // Wait for all items to be processed + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return 
processed_items.size() == 2; }); + } // Verify items were processed correctly EXPECT_EQ(processed_items.size(), 2u); @@ -83,25 +114,32 @@ TEST_F(AsyncTSQueueTest, EnqueueDifferentArgTypes) { EXPECT_EQ(processed_items[1], "test2"); } -// Test callback with additional arguments using reference captures +// // Test callback with additional arguments using reference captures TEST_F(AsyncTSQueueTest, CallbackWithReferenceCapture) { std::vector> processed_items; std::string prefix = "Item: "; + std::condition_variable cv; + std::mutex mtx; // Create a queue with a callback that takes additional arguments // Using a reference capture to ensure processed_items is properly updated auto queue = AsyncTSQueue::create( - loop_, - [&processed_items, &prefix](int&& item) { + &loop_, + [&processed_items, &prefix, &cv, &mtx](int&& item) { + std::lock_guard lock(mtx); processed_items.emplace_back(item, prefix + std::to_string(item)); + cv.notify_one(); }); // Enqueue items queue->enqueue(10); queue->enqueue(20); - // Process the events - ProcessEvents(); + // Wait for all items to be processed + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return processed_items.size() == 2; }); + } // Verify items were processed with the additional arguments EXPECT_EQ(processed_items.size(), 2u); @@ -111,21 +149,25 @@ TEST_F(AsyncTSQueueTest, CallbackWithReferenceCapture) { EXPECT_EQ(processed_items[1].second, "Item: 20"); } -// Test callback with additional arguments using the Args&&... forwarding +// // Test callback with additional arguments using the Args&&... 
forwarding TEST_F(AsyncTSQueueTest, CallbackWithForwardedArgs) { std::vector> processed_items; std::string prefix = "Item: "; + std::condition_variable cv; + std::mutex mtx; // Create a callback function that takes the item and additional arguments - auto callback = [](int&& item, + auto callback = [&](int&& item, std::vector>& items, const std::string& prefix) { + std::lock_guard lock(mtx); items.emplace_back(item, prefix + std::to_string(item)); + cv.notify_one(); }; // Create a queue with a callback and forward additional arguments auto queue = AsyncTSQueue::create( - loop_, + &loop_, callback, // The callback function std::ref(processed_items), // Reference to vector std::cref(prefix)); // Const reference to string @@ -134,8 +176,11 @@ TEST_F(AsyncTSQueueTest, CallbackWithForwardedArgs) { queue->enqueue(30); queue->enqueue(40); - // Process the events - ProcessEvents(); + // Wait for all items to be processed + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return processed_items.size() == 2; }); + } // Verify items were processed with the forwarded arguments EXPECT_EQ(processed_items.size(), 2u); @@ -145,7 +190,7 @@ TEST_F(AsyncTSQueueTest, CallbackWithForwardedArgs) { EXPECT_EQ(processed_items[1].second, "Item: 40"); } -// Test forwarding different argument types (value, ref, const ref) +// // Test forwarding different argument types (value, ref, const ref) TEST_F(AsyncTSQueueTest, ForwardingDifferentArgTypes) { struct Result { int value; @@ -156,22 +201,26 @@ TEST_F(AsyncTSQueueTest, ForwardingDifferentArgTypes) { std::vector results; std::string prefix = "Value: "; double multiplier = 2.5; + std::condition_variable cv; + std::mutex mtx; // Callback that uses all three types of arguments - auto callback = [](int&& item, - std::vector& results, // Reference - const std::string& prefix, // Const reference - double multiplier) { // Value + auto callback = [&](int&& item, + std::vector& results, // Reference + const std::string& prefix, // Const reference 
+ double multiplier) { // Value + std::lock_guard lock(mtx); results.push_back({ item, prefix + std::to_string(item), item * multiplier }); + cv.notify_one(); }; // Create queue with the callback and various argument types auto queue = AsyncTSQueue::create( - loop_, + &loop_, callback, std::ref(results), // Reference std::cref(prefix), // Const reference @@ -181,8 +230,11 @@ TEST_F(AsyncTSQueueTest, ForwardingDifferentArgTypes) { queue->enqueue(5); queue->enqueue(10); - // Process the events - ProcessEvents(); + // Wait for all items to be processed + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return results.size() == 2; }); + } // Verify all argument types were correctly forwarded EXPECT_EQ(results.size(), 2u); @@ -198,23 +250,30 @@ TEST_F(AsyncTSQueueTest, ForwardingDifferentArgTypes) { EXPECT_DOUBLE_EQ(results[1].factor, 25.0); // 10 * 2.5 } -// Test multiple enqueue operations +// // Test multiple enqueue operations TEST_F(AsyncTSQueueTest, MultipleEnqueueOperations) { std::vector processed_items; + std::condition_variable cv; + std::mutex mtx; // Create a queue auto queue = AsyncTSQueue::create( - loop_, - [&processed_items](int&& item) { + &loop_, + [&processed_items, &cv, &mtx](int&& item) { + std::lock_guard lock(mtx); processed_items.push_back(item); + cv.notify_one(); }); // First batch of items queue->enqueue(1); queue->enqueue(2); - // Process events - ProcessEvents(); + // Wait for first batch to be processed + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return processed_items.size() == 2; }); + } // Verify first batch was processed EXPECT_EQ(processed_items.size(), 2u); @@ -225,8 +284,11 @@ TEST_F(AsyncTSQueueTest, MultipleEnqueueOperations) { queue->enqueue(3); queue->enqueue(4); - // Process events again - ProcessEvents(); + // Wait for second batch to be processed + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return processed_items.size() == 4; }); + } // Verify second batch was also processed 
EXPECT_EQ(processed_items.size(), 4u); @@ -234,60 +296,81 @@ TEST_F(AsyncTSQueueTest, MultipleEnqueueOperations) { EXPECT_EQ(processed_items[3], 4); } -// Test batch callback with std::vector&& +// // Test batch callback with std::vector&& TEST_F(AsyncTSQueueTest, BatchCallbackRvalueVector) { std::vector batch_processed; int call_count = 0; + std::condition_variable cv; + std::mutex mtx; auto queue = AsyncTSQueue::create( - loop_, - [&batch_processed, &call_count](std::vector&& batch) { + &loop_, + [&batch_processed, &call_count, &cv, &mtx](std::vector&& batch) { + std::lock_guard lock(mtx); ++call_count; batch_processed = std::move(batch); + cv.notify_one(); }); queue->enqueue(10); queue->enqueue(20); queue->enqueue(30); - ProcessEvents(); - EXPECT_EQ(call_count, 1); + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return call_count == 1; }); + } + ASSERT_EQ(batch_processed.size(), 3u); EXPECT_EQ(batch_processed[0], 10); EXPECT_EQ(batch_processed[1], 20); EXPECT_EQ(batch_processed[2], 30); } -// Test batch callback with extra argument +// // Test batch callback with extra argument TEST_F(AsyncTSQueueTest, BatchCallbackWithExtraArg) { std::vector batch_processed; std::string context = "CTX"; + std::condition_variable cv; + std::mutex mtx; auto queue = AsyncTSQueue::create( - loop_, - [&batch_processed](std::vector&& batch, - const std::string& ctx) { + &loop_, + [&batch_processed, &cv, &mtx](std::vector&& batch, + const std::string& ctx) { + std::lock_guard lock(mtx); for (auto& item : batch) batch_processed.push_back(ctx + ":" + item); + cv.notify_one(); }, std::cref(context)); queue->enqueue("a"); queue->enqueue("b"); queue->enqueue("c"); - ProcessEvents(); + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return batch_processed.size() == 3; }); + } ASSERT_EQ(batch_processed.size(), 3u); EXPECT_EQ(batch_processed[0], "CTX:a"); EXPECT_EQ(batch_processed[1], "CTX:b"); EXPECT_EQ(batch_processed[2], "CTX:c"); } -// Test batch callback with const 
std::vector& +// // Test batch callback with const std::vector& TEST_F(AsyncTSQueueTest, BatchCallbackConstVector) { std::vector batch_processed; + std::condition_variable cv; + std::mutex mtx; auto queue = AsyncTSQueue::create( - loop_, - [&batch_processed](const std::vector& batch) { + &loop_, + [&batch_processed, &cv, &mtx](const std::vector& batch) { + std::lock_guard lock(mtx); batch_processed = batch; + cv.notify_one(); }); queue->enqueue(5); queue->enqueue(7); queue->enqueue(9); - ProcessEvents(); + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return batch_processed.size() == 3; }); + } ASSERT_EQ(batch_processed.size(), 3u); EXPECT_EQ(batch_processed[0], 5); EXPECT_EQ(batch_processed[1], 7); @@ -306,20 +389,26 @@ struct TestData { TEST_F(AsyncTSQueueTest, ComplexDataType) { std::vector processed_items; + std::condition_variable cv; + std::mutex mtx; // Create a queue for the complex data type auto queue = AsyncTSQueue::create( - loop_, - [&processed_items](TestData&& item) { + &loop_, + [&processed_items, &cv, &mtx](TestData&& item) { + std::lock_guard lock(mtx); processed_items.push_back(std::move(item)); + cv.notify_one(); }); // Enqueue complex items queue->enqueue(TestData{1, "one"}); queue->enqueue(TestData{2, "two"}); - // Process events - ProcessEvents(); + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return processed_items.size() == 2; }); + } // Verify complex items were processed correctly EXPECT_EQ(processed_items.size(), 2u); @@ -327,31 +416,26 @@ TEST_F(AsyncTSQueueTest, ComplexDataType) { EXPECT_EQ(processed_items[1], (TestData{2, "two"})); } -// Test that the get_loop method returns the correct loop -TEST_F(AsyncTSQueueTest, GetLoopMethod) { - auto queue = AsyncTSQueue::create( - loop_, - [](int&&) {}); - - // Process events - ProcessEvents(); -} - -// Test that arguments are properly forwarded to the callback +// // Test that arguments are properly forwarded to the callback TEST_F(AsyncTSQueueTest, ArgumentForwarding) { 
std::vector> processed_items; std::string prefix = "Item: "; + std::condition_variable cv; + std::mutex mtx; // Create a callback function that takes the item and additional arguments - auto callback = [](int item, - std::vector>& items, - const std::string& prefix) { + auto callback = [&cv, &mtx, &processed_items]( + int item, + std::vector>& items, + const std::string& prefix) { + std::lock_guard lock(mtx); items.emplace_back(item, prefix + std::to_string(item)); + cv.notify_one(); }; // Create a queue with a callback and forward additional arguments auto queue = AsyncTSQueue::create( - loop_, + &loop_, callback, // The callback function std::ref(processed_items), // Reference to vector std::cref(prefix)); // Const reference to string @@ -360,8 +444,11 @@ TEST_F(AsyncTSQueueTest, ArgumentForwarding) { queue->enqueue(50); queue->enqueue(60); - // Process the events - ProcessEvents(); + // Wait for all items to be processed + { + std::unique_lock lock(mtx); + cv.wait(lock, [&] { return processed_items.size() == 2; }); + } // Verify items were processed with the forwarded arguments EXPECT_EQ(processed_items.size(), 2u); @@ -370,3 +457,189 @@ TEST_F(AsyncTSQueueTest, ArgumentForwarding) { EXPECT_EQ(processed_items[1].first, 60); EXPECT_EQ(processed_items[1].second, "Item: 60"); } + +TEST_F(AsyncTSQueueTest, BatchingByMinSize) { + std::vector processed; + std::mutex mtx; + std::condition_variable cv; + int batch_count = 0; + // min_size=3, max_time=1s + const node::nsolid::AsyncTSQueueOptions opts{3, 1000}; + auto queue = AsyncTSQueue::create(&loop_, + opts, + [&](std::vector&& batch) { + std::lock_guard lk(mtx); + processed.insert(processed.end(), batch.begin(), batch.end()); + batch_count++; + cv.notify_one(); + }); + queue->enqueue(1); + queue->enqueue(2); + // Should not trigger yet + { + std::unique_lock lock(mtx); + cv.wait_for(lock, 100ms, [&] { return processed.size() == 2; }); + } + + EXPECT_EQ(processed.size(), 0u); + + queue->enqueue(3); + // Should trigger 
batch + { + std::unique_lock lk(mtx); + cv.wait_for(lk, 2000ms, [&] { return processed.size() == 3; }); + } + + EXPECT_EQ(processed.size(), 3u); + EXPECT_EQ(processed, (std::vector{1, 2, 3})); + EXPECT_EQ(batch_count, 1); +} + +TEST_F(AsyncTSQueueTest, BatchingByMaxTime) { + std::vector processed; + std::mutex mtx; + std::condition_variable cv; + // min_size=5, max_time=50ms + const node::nsolid::AsyncTSQueueOptions opts{5, 50}; + + auto queue = AsyncTSQueue::create(&loop_, + opts, + [&](std::vector&& batch) { + std::lock_guard lk(mtx); + processed.insert(processed.end(), batch.begin(), batch.end()); + cv.notify_one(); + }); + + queue->enqueue(1); + queue->enqueue(2); + + { + std::unique_lock lk(mtx); + cv.wait_for(lk, 200ms, [&] { return processed.size() == 2; }); + } + + EXPECT_EQ(processed, (std::vector{1, 2})); +} + +TEST_F(AsyncTSQueueTest, BatchingBothTriggers) { + std::vector processed; + std::mutex mtx; + std::condition_variable cv; + int batch_count = 0; + // min_size=3, max_time=100ms + const node::nsolid::AsyncTSQueueOptions opts{3, 100}; + auto queue = AsyncTSQueue::create(&loop_, + opts, + [&](std::vector&& batch) { + std::lock_guard lk(mtx); + processed.insert(processed.end(), batch.begin(), batch.end()); + batch_count++; + cv.notify_one(); + }); + + // Enqueue slowly, timer should trigger + queue->enqueue(1); + std::this_thread::sleep_for(300ms); + queue->enqueue(2); + std::this_thread::sleep_for(300ms); + queue->enqueue(3); + + { + std::unique_lock lk(mtx); + cv.wait_for(lk, 1000ms, [&] { return processed.size() == 3; }); + } + + EXPECT_EQ(processed.size(), 3u); + EXPECT_EQ(processed, (std::vector{1, 2, 3})); + EXPECT_EQ(batch_count, 3); + + // Now enqueue burst, should trigger by min_size + processed.clear(); + batch_count = 0; + auto queue2 = AsyncTSQueue::create(&loop_, + opts, + [&](std::vector&& batch) { + std::lock_guard lk(mtx); + processed.insert(processed.end(), batch.begin(), batch.end()); + batch_count++; + cv.notify_one(); + }); + + 
queue2->enqueue(4); + queue2->enqueue(5); + queue2->enqueue(6); + { + std::unique_lock lk(mtx); + cv.wait_for(lk, 500ms, [&] { return processed.size() == 3; }); + } + + EXPECT_EQ(processed.size(), 3u); + EXPECT_EQ(processed, (std::vector{4, 5, 6})); + EXPECT_EQ(batch_count, 1); +} + + +TEST_F(AsyncTSQueueTest, NoUnboundedTimerWakeupWhenEmpty) { + std::vector processed; + std::mutex mtx; + std::condition_variable cv; + int call_count = 0; + // min_size=2, max_time=30ms + const node::nsolid::AsyncTSQueueOptions opts{2, 30}; + auto queue = AsyncTSQueue::create(&loop_, + opts, + [&](std::vector&& batch) { + std::lock_guard lk(mtx); + processed.insert(processed.end(), batch.begin(), batch.end()); + call_count++; + cv.notify_one(); + }); + + queue->enqueue(1); + { + std::unique_lock lk(mtx); + cv.wait_for(lk, 500ms, [&] { return processed.size() == 1; }); + } + EXPECT_EQ(call_count, 1); + + // Wait another timer interval, should not call again + // (re-arm timer and run loop again to check for spurious wakeups) + processed.clear(); + call_count = 0; + + { + std::unique_lock lk(mtx); + cv.wait_for(lk, 500ms, [&] { return call_count > 0; }); + } + + EXPECT_EQ(processed.size(), 0); + EXPECT_EQ(call_count, 0); +} + + +TEST_F(AsyncTSQueueTest, ThreadSafetyBatching) { + std::vector processed; + std::mutex mtx; + std::condition_variable cv; + const node::nsolid::AsyncTSQueueOptions opts{10, 500}; + auto queue = AsyncTSQueue::create(&loop_, + opts, + [&](std::vector&& batch) { + std::lock_guard lk(mtx); + processed.insert(processed.end(), batch.begin(), batch.end()); + cv.notify_one(); + }); + std::vector threads; + for (int i = 0; i < 10; ++i) { + threads.emplace_back([&, i] { queue->enqueue(i); }); + } + for (auto& t : threads) t.join(); + // Wait for batch + { + std::unique_lock lk(mtx); + cv.wait_for(lk, 1000ms, [&] { return processed.size() == 10; }); + } + EXPECT_EQ(processed.size(), 10u); + std::sort(processed.begin(), processed.end()); + for (int i = 0; i < 10; ++i) 
EXPECT_EQ(processed[i], i); +}