Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 109 additions & 8 deletions src/nsolid/async_ts_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,26 @@
#include "../../deps/nsuv/include/nsuv-inl.h"
#include "asserts-cpp/asserts.h"

#include <memory>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>
#include <tuple>
#include <type_traits>

namespace node {
namespace nsolid {

/**
 * Tuning knobs for AsyncTSQueue batched notification.
 *
 * Batching is in effect only when both fields are non-zero (see the
 * AsyncTSQueue constructor, which derives batching_enabled_ from them).
 */
struct AsyncTSQueueOptions {
  // Notify the consumer as soon as the queue holds at least this many items.
  uint64_t min_size = 0;
  // Upper bound, in milliseconds, before pending items are flushed anyway.
  uint64_t max_time = 0;
};


/**
* AsyncTSQueue is a templated class that provides an asynchronous thread-safe queue.
* It abstracts away the common functionality found all around N|Solid
Expand All @@ -35,10 +47,24 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
* @param callback The callback to process items
* @return A shared pointer to the initialized AsyncTSQueue
*/
// Factory method for options-based batching
template<typename Cb, typename... Args>
static SharedAsyncTSQueue create(uv_loop_t* loop,
const AsyncTSQueueOptions& opts,
Cb&& cb,
Args&&... args) {
SharedAsyncTSQueue queue(new AsyncTSQueue<T>(
loop, opts, std::forward<Cb>(cb), std::forward<Args>(args)...));
queue->initialize();
return queue;
}

// Factory method for legacy behavior
template<typename Cb, typename... Args>
static SharedAsyncTSQueue create(uv_loop_t* loop, Cb&& cb, Args&&... args) {
const AsyncTSQueueOptions opts;
SharedAsyncTSQueue queue(new AsyncTSQueue<T>(
loop, std::forward<Cb>(cb), std::forward<Args>(args)...));
loop, opts, std::forward<Cb>(cb), std::forward<Args>(args)...));
queue->initialize();
return queue;
}
Expand All @@ -48,6 +74,11 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
*/
  ~AsyncTSQueue() {
    // close_and_delete() hands the handle to libuv for closing; the pointer
    // is nulled here so this object never touches it again.
    async_handle_->close_and_delete();
    async_handle_ = nullptr;
    // The timer is only allocated when batching is enabled (see initialize()),
    // so it may legitimately be null here.
    if (timer_) {
      timer_->close_and_delete();
      timer_ = nullptr;
    }
  }

/**
Expand All @@ -69,12 +100,43 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
  // Enqueue one item and notify the loop thread according to the configured
  // batching policy.
  //
  // @param item Item to enqueue (perfect-forwarded into the queue)
  // @return The queue size observed right after the insertion
  template<typename U>
  size_t enqueue_impl(U&& item) {
    size_t size = queue_.enqueue(std::forward<U>(item));
    if (batching_enabled_) {
      if (size == 1) {
        // First item of a fresh batch: mark the timer as armed and wake the
        // loop. The timer itself is started from async_callback on the loop
        // thread (see start_timer()); exchange() makes sure only one waker
        // wins the empty->non-empty race.
        if (!timer_armed_.exchange(true, std::memory_order_release)) {
          trigger_async();
        }
      } else if (size >= opts_.min_size) {
        // Minimum batch size reached: disarm the timer so that the items are
        // consumed immediately in the async callback instead of waiting for
        // the max_time deadline.
        timer_armed_.store(false, std::memory_order_release);
        trigger_async();
      }
    } else {
      // Legacy behavior: signal the loop only on the empty->non-empty
      // transition; the async callback drains the whole queue.
      if (size == 1) {
        ASSERT_EQ(0, async_handle_->send());
      }
    }
    return size;
  }

  // Timer management.
  //
  // Starts a one-shot timer (repeat interval 0) that fires after
  // opts_.max_time ms and wakes the async handle so the pending batch gets
  // processed. A weak_ptr is bound into the callback, so if the queue is
  // destroyed before the timer fires the callback is simply a no-op.
  // Must run on the loop thread; async_callback is the only caller here.
  void start_timer() {
    ASSERT_NOT_NULL(timer_);
    // '+' forces the capture-less lambda to decay to a plain function
    // pointer, which is the callback type nsuv expects.
    ASSERT_EQ(0, timer_->start(+[](nsuv::ns_timer*, WeakAsyncTSQueue queue_wp) {
      SharedAsyncTSQueue queue = queue_wp.lock();
      if (queue == nullptr) {
        return;
      }

      queue->trigger_async();
    }, opts_.max_time, 0, this->weak_from_this()));
  }
Comment thread
santigimeno marked this conversation as resolved.
Comment thread
santigimeno marked this conversation as resolved.

Comment thread
santigimeno marked this conversation as resolved.
  // Signal the async handle so the loop thread runs async_callback.
  void trigger_async() {
    ASSERT_EQ(0, async_handle_->send());
  }

/**
* Process all items in the queue
*
Expand Down Expand Up @@ -107,8 +169,21 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
* callback logic at compile time.
*/
template<typename Cb, typename... Args>
AsyncTSQueue(uv_loop_t* loop, Cb&& cb, Args&&... args)
: loop_(loop), async_handle_(new nsuv::ns_async()) {
AsyncTSQueue(uv_loop_t* loop,
const AsyncTSQueueOptions& opts,
Cb&& cb,
Args&&... args)
: loop_(loop),
async_handle_(new nsuv::ns_async()),
opts_(opts),
batching_enabled_(opts.min_size > 0 && opts.max_time > 0),
timer_(nullptr) {
setup_callback(std::forward<Cb>(cb), std::forward<Args>(args)...);
}

// Setup callback and timer logic
template<typename Cb, typename... Args>
void setup_callback(Cb&& cb, Args&&... args) {
// Create a bound callback function
auto bound_cb = [cb = std::forward<Cb>(cb),
...args = std::forward<Args>(args)](auto&& first) mutable {
Expand Down Expand Up @@ -151,22 +226,48 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
// Initialize the async handle with a weak pointer to this object
WeakAsyncTSQueue weak_this = this->weak_from_this();
ASSERT_EQ(0, async_handle_->init(loop_, async_callback, weak_this));
if (batching_enabled_) {
timer_ = new nsuv::ns_timer();
ASSERT_EQ(0, timer_->init(loop_));
}
}

// Static callback function for nsuv
/**
* Note on batching behavior: This implementation ensures process() is called
* exactly once per batch. If new items are enqueued during process(), the timer
* is not automatically re-armed. The timer will only be re-armed when the queue
* becomes empty and then receives a new item (size == 1 path in enqueue_impl).
* This differs from level-triggered queues and may introduce a delay in processing
* new items if they arrive during an ongoing batch processing.
*/
static void async_callback(nsuv::ns_async* handle,
WeakAsyncTSQueue queue_wp) {
WeakAsyncTSQueue queue_wp) {
SharedAsyncTSQueue queue = queue_wp.lock();
if (queue == nullptr) {
return;
}
if (queue->batching_enabled_) {
// Only start timer if it was armed and queue is not empty
if (queue->timer_armed_.exchange(false, std::memory_order_acquire) &&
!queue->queue_.empty()) {
queue->start_timer();
return;
}
}

queue->process();
}

  uv_loop_t* loop_;                   // Event loop the handles run on (not owned)
  nsuv::ns_async* async_handle_;      // Loop-wakeup handle; closed in the dtor
  TSQueue<T> queue_;                  // Underlying thread-safe item queue
  ProcessCallback process_callback_;  // Bound callback used to drain the queue

  // Batching options and timer
  AsyncTSQueueOptions opts_;              // Copy of the options given at creation
  const bool batching_enabled_ = false;   // True iff min_size > 0 && max_time > 0
  nsuv::ns_timer* timer_;                 // Allocated only when batching is enabled
  std::atomic<bool> timer_armed_{false};  // Set on first item; cleared when batch is due
};

} // namespace nsolid
Expand Down
Loading
Loading