From 9cc7a5dd2dd26bf1d54314d893b0530598903f87 Mon Sep 17 00:00:00 2001 From: Santiago Gimeno Date: Thu, 29 May 2025 15:06:26 +0200 Subject: [PATCH] src: add batched support callback in AsyncTSQueue Enable AsyncTSQueue to accept both single-element (T&&, const T&) and batch (std::vector&&, const std::vector&) callbacks, using if constexpr with type traits for compile-time dispatch. Ensure correct argument forwarding and update tests to cover all supported callback signatures. --- src/nsolid/async_ts_queue.h | 82 +++++++++++++++-------- test/cctest/test_nsolid_async_ts_queue.cc | 60 +++++++++++++++++ 2 files changed, 113 insertions(+), 29 deletions(-) diff --git a/src/nsolid/async_ts_queue.h b/src/nsolid/async_ts_queue.h index 969b8a7b76..61fda545e9 100644 --- a/src/nsolid/async_ts_queue.h +++ b/src/nsolid/async_ts_queue.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace node { namespace nsolid { @@ -26,7 +27,6 @@ class AsyncTSQueue : public std::enable_shared_from_this> { public: using SharedAsyncTSQueue = std::shared_ptr>; using WeakAsyncTSQueue = std::weak_ptr>; - using ProcessCallback = std::function; /** * Factory method to create and initialize an AsyncTSQueue @@ -37,11 +37,8 @@ class AsyncTSQueue : public std::enable_shared_from_this> { */ template static SharedAsyncTSQueue create(uv_loop_t* loop, Cb&& cb, Args&&... args) { - // Create a shared_ptr with the private constructor SharedAsyncTSQueue queue(new AsyncTSQueue( loop, std::forward(cb), std::forward(args)...)); - - // Initialize the queue and return it queue->initialize(); return queue; } @@ -79,33 +76,70 @@ class AsyncTSQueue : public std::enable_shared_from_this> { /** * Process all items in the queue + * + * Calls the appropriate callback based on the callback type (single or batch) + * determined at compile time using if constexpr. */ void process() { - process_single_items(); + process_callback_(); } private: + // Callback support for both single-item and batch processing + using ProcessCallback = std::function; + // --- Type traits for Callback Type Detection --- + template + using is_batch_callback = std::disjunction< + std::is_invocable&&, Extra...>, + std::is_invocable&, Extra...> + >; + template + using is_single_callback = std::disjunction< + std::is_invocable, + std::is_invocable + >; + /** * Constructor for AsyncTSQueue * - * @param loop The UV loop to use for async notifications - * @param callback The callback to process items + * Uses if constexpr with type traits to select between single-item and batch + * callback logic at compile time. */ template AsyncTSQueue(uv_loop_t* loop, Cb&& cb, Args&&... args) - : loop_(loop), - async_handle_(new nsuv::ns_async()) { - // Create a lambda that captures the callback and arguments by value - // and forwards them when called - process_callback_ = [cb = std::forward(cb), - args_tuple = std::make_tuple( - std::forward(args)...)] - (T&& item) mutable { - // Apply the callback with the item and stored arguments - std::apply([&cb, &item](auto&&... args) { - cb(std::forward(item), std::forward(args)...); - }, args_tuple); + : loop_(loop), async_handle_(new nsuv::ns_async()) { + // Create a bound callback function + auto bound_cb = [cb = std::forward(cb), + ...args = std::forward(args)](auto&& first) mutable { + std::invoke(cb, std::forward(first), args...); }; + if constexpr (is_batch_callback::value) { + // Batch callback: process all items at once + process_callback_ = [this, bound_cb = bound_cb]() mutable { + T item; + size_t size = queue_.dequeue(item); + if (size > 0) { + std::vector batch; + batch.reserve(size + 1); + batch.push_back(std::move(item)); + while (queue_.dequeue(item)) { + batch.push_back(std::move(item)); + } + bound_cb(std::move(batch)); + } + }; + } else if constexpr (is_single_callback::value) { + process_callback_ = [this, bound_cb = bound_cb]() mutable { + T item; + while (queue_.dequeue(item)) { + bound_cb(std::move(item)); + } + }; + } else { + static_assert(is_batch_callback::value || + is_single_callback::value, + "AsyncTSQueue callback signature not supported"); + } } /** @@ -128,16 +162,6 @@ class AsyncTSQueue : public std::enable_shared_from_this> { queue->process(); } - /** - * Process items one by one using the process_callback_ - */ - void process_single_items() { - T item; - while (queue_.dequeue(item)) { - process_callback_(std::move(item)); - } - } - uv_loop_t* loop_; nsuv::ns_async* async_handle_; TSQueue queue_; diff --git a/test/cctest/test_nsolid_async_ts_queue.cc b/test/cctest/test_nsolid_async_ts_queue.cc index 8110f97289..5fc68cc559 100644 --- a/test/cctest/test_nsolid_async_ts_queue.cc +++ b/test/cctest/test_nsolid_async_ts_queue.cc @@ -234,6 +234,66 @@ TEST_F(AsyncTSQueueTest, MultipleEnqueueOperations) { EXPECT_EQ(processed_items[3], 4); } +// Test batch callback with std::vector&& +TEST_F(AsyncTSQueueTest, BatchCallbackRvalueVector) { + std::vector batch_processed; + int call_count = 0; + auto queue = AsyncTSQueue::create( + loop_, + [&batch_processed, &call_count](std::vector&& batch) { + ++call_count; + batch_processed = std::move(batch); + }); + queue->enqueue(10); + queue->enqueue(20); + queue->enqueue(30); + ProcessEvents(); + EXPECT_EQ(call_count, 1); + ASSERT_EQ(batch_processed.size(), 3u); + EXPECT_EQ(batch_processed[0], 10); + EXPECT_EQ(batch_processed[1], 20); + EXPECT_EQ(batch_processed[2], 30); +} + +// Test batch callback with extra argument +TEST_F(AsyncTSQueueTest, BatchCallbackWithExtraArg) { + std::vector batch_processed; + std::string context = "CTX"; + auto queue = AsyncTSQueue::create( + loop_, + [&batch_processed](std::vector&& batch, + const std::string& ctx) { + for (auto& item : batch) batch_processed.push_back(ctx + ":" + item); + }, + std::cref(context)); + queue->enqueue("a"); + queue->enqueue("b"); + queue->enqueue("c"); + ProcessEvents(); + ASSERT_EQ(batch_processed.size(), 3u); + EXPECT_EQ(batch_processed[0], "CTX:a"); + EXPECT_EQ(batch_processed[1], "CTX:b"); + EXPECT_EQ(batch_processed[2], "CTX:c"); +} + +// Test batch callback with const std::vector& +TEST_F(AsyncTSQueueTest, BatchCallbackConstVector) { + std::vector batch_processed; + auto queue = AsyncTSQueue::create( + loop_, + [&batch_processed](const std::vector& batch) { + batch_processed = batch; + }); + queue->enqueue(5); + queue->enqueue(7); + queue->enqueue(9); + ProcessEvents(); + ASSERT_EQ(batch_processed.size(), 3u); + EXPECT_EQ(batch_processed[0], 5); + EXPECT_EQ(batch_processed[1], 7); + EXPECT_EQ(batch_processed[2], 9); +} + // Test with a complex data type struct TestData { int id;