From 6c9e18d876391bed052087afc819c58af3590129 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Thu, 13 Nov 2025 20:20:39 +0000 Subject: [PATCH 1/3] events/loop: dynamically increase queue size Update the event control loop implementation to dynamically grow the event queue when it reaches capacity. This change allows the loop to handle a larger number of events without blocking Enqueue, removing issues of circular dependencies in certain scenarios. Queues are now split into segments, each with the given fixed size. When the current segments are full, a new segment is allocated and linked to the end of the queue. This allows the queue to grow as needed. Uses `sync.Pool` to reduce allocations for segments. Signed-off-by: joshvanl --- events/loop/loop.go | 108 +++++++++++++++----- events/loop/loop_test.go | 206 +++++++++++++++++++++++++++++++++++++++ events/loop/segment.go | 55 +++++++++++ 3 files changed, 345 insertions(+), 24 deletions(-) create mode 100644 events/loop/loop_test.go create mode 100644 events/loop/segment.go diff --git a/events/loop/loop.go b/events/loop/loop.go index bbc3b9c8..1ce1aebb 100644 --- a/events/loop/loop.go +++ b/events/loop/loop.go @@ -30,20 +30,26 @@ type Interface[T any] interface { } type loop[T any] struct { - queue chan T + // linked list of channel segments + head *queueSegment[T] + tail *queueSegment[T] + + // capacity of each segment channel + segSize uint64 + handler Handler[T] closed bool closeCh chan struct{} - lock sync.RWMutex + + lock sync.RWMutex + + segPool sync.Pool } func New[T any](h Handler[T], size uint64) Interface[T] { - return &loop[T]{ - queue: make(chan T, size), - handler: h, - closeCh: make(chan struct{}), - } + l := new(loop[T]) + return l.Reset(h, size) } func Empty[T any]() Interface[T] { @@ -53,41 +59,89 @@ func Empty[T any]() Interface[T] { func (l *loop[T]) Run(ctx context.Context) error { defer close(l.closeCh) - for { - req, ok := <-l.queue - if !ok { - return nil + current := l.head + for current != nil { + // Drain this segment in order. The channel will be closed either: + // - when we "roll over" to a new segment, or + // - when Close() is called for the final segment. + for req := range current.ch { + if err := l.handler.Handle(ctx, req); err != nil { + return err + } } - if err := l.handler.Handle(ctx, req); err != nil { - return err - } + // Move to the next segment, and return this one to the pool. + next := current.next + l.putSegment(current) + current = next } + + return nil } func (l *loop[T]) Enqueue(req T) { - l.lock.RLock() - defer l.lock.RUnlock() + l.lock.Lock() + defer l.lock.Unlock() if l.closed { return } + // Ensure we have at least one segment. + if l.tail == nil { + seg := l.getSegment() + l.head = seg + l.tail = seg + } + + // First try to send to the current tail segment without blocking. select { - case l.queue <- req: - case <-l.closeCh: + case l.tail.ch <- req: + return + default: + // Tail is full: create a new segment, link it, close the old tail, + // and send into the new tail. + newSeg := l.getSegment() + l.tail.next = newSeg + close(l.tail.ch) + l.tail = newSeg + l.tail.ch <- req } } func (l *loop[T]) Close(req T) { l.lock.Lock() + if l.closed { + // Already closed; just unlock and wait for Run to finish. + l.lock.Unlock() + <-l.closeCh + return + } l.closed = true + + // Ensure at least one segment exists. + if l.tail == nil { + seg := l.getSegment() + l.head = seg + l.tail = seg + } + + // Enqueue the final request; if the tail is full, roll over as in Enqueue. select { - case l.queue <- req: - case <-l.closeCh: + case l.tail.ch <- req: + default: + newSeg := l.getSegment() + l.tail.next = newSeg + close(l.tail.ch) + l.tail = newSeg + l.tail.ch <- req } - close(l.queue) + + // No more items will be enqueued; close the tail to signal completion. + close(l.tail.ch) l.lock.Unlock() + + // Wait for Run to finish draining everything. <-l.closeCh } @@ -102,10 +156,16 @@ func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] { l.closed = false l.closeCh = make(chan struct{}) l.handler = h + l.segSize = size + + // Initialize pool for this instantiation of T. + l.segPool.New = func() any { + return new(queueSegment[T]) + } - // TODO: @joshvanl: use a ring buffer so that we don't need to reallocate and - // improve performance. - l.queue = make(chan T, size) + seg := l.getSegment() + l.head = seg + l.tail = seg return l } diff --git a/events/loop/loop_test.go b/events/loop/loop_test.go new file mode 100644 index 00000000..accd4dc8 --- /dev/null +++ b/events/loop/loop_test.go @@ -0,0 +1,206 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// testHandler is a simple handler that records all values it sees. +type testHandler[T any] struct { + mu sync.Mutex + seen []T + err error + delay time.Duration +} + +func (h *testHandler[T]) Handle(ctx context.Context, v T) error { + if h.delay > 0 { + select { + case <-time.After(h.delay): + case <-ctx.Done(): + return ctx.Err() + } + } + + h.mu.Lock() + defer h.mu.Unlock() + h.seen = append(h.seen, v) + return h.err +} + +func (h *testHandler[T]) Values() []T { + h.mu.Lock() + defer h.mu.Unlock() + out := make([]T, len(h.seen)) + copy(out, h.seen) + return out +} + +func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + h := &testHandler[int]{} + const segmentSize = 4 + + l := New[int](h, segmentSize) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := l.Run(ctx) + require.NoError(t, err, "Run should finish without error") + }() + + // Enqueue more items than a single segment to force multiple channels. + const n = 25 + for i := 0; i < n; i++ { + l.Enqueue(i) + } + + // Close with a sentinel value so we can verify it is the last element. + const final = 999 + l.Close(final) + + wg.Wait() + + got := h.Values() + require.Len(t, got, n+1, "handler should see all enqueued items plus final close item") + + // First n values should be 0..n-1 in order. + for i := 0; i < n; i++ { + assert.Equal(t, i, got[i], "item at index %d out of order", i) + } + + // Last one is the final close value. + assert.Equal(t, final, got[len(got)-1], "last item should be the Close() value") +} + +func TestLoop_CloseTwiceIsSafe(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + h := &testHandler[int]{} + l := New[int](h, 2) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := l.Run(ctx) + require.NoError(t, err) + }() + + l.Enqueue(1) + l.Close(2) + + // Second close should not panic or deadlock. + done := make(chan struct{}) + go func() { + l.Close(3) + close(done) + }() + + select { + case <-done: + case <-time.After(time.Second): + require.Fail(t, "second Close should not block") + } + + wg.Wait() + + got := h.Values() + // First close enqueues 2, second close should be ignored. + assert.Contains(t, got, 1) + assert.Contains(t, got, 2) + assert.NotContains(t, got, 3) +} + +func TestLoop_Reset(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + h1 := &testHandler[int]{} + l := New[int](h1, 2) + + var wg1 sync.WaitGroup + wg1.Add(1) + go func() { + defer wg1.Done() + err := l.Run(ctx) + require.NoError(t, err) + }() + + l.Enqueue(1) + l.Close(2) + wg1.Wait() + + assert.ElementsMatch(t, []int{1, 2}, h1.Values()) + + // Reset to a new handler and buffer size. + h2 := &testHandler[int]{} + l = l.Reset(h2, 8) + + require.NotNil(t, l) + + var wg2 sync.WaitGroup + wg2.Add(1) + go func() { + defer wg2.Done() + err := l.Run(ctx) + require.NoError(t, err) + }() + + l.Enqueue(10) + l.Enqueue(11) + l.Close(12) + wg2.Wait() + + assert.ElementsMatch(t, []int{10, 11, 12}, h2.Values()) +} + +func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + h := &testHandler[int]{} + l := New[int](h, 2) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := l.Run(ctx) + require.NoError(t, err) + }() + + l.Enqueue(1) + l.Close(2) + + // This enqueue should be ignored. + l.Enqueue(3) + + wg.Wait() + + got := h.Values() + assert.Equal(t, []int{1, 2}, got) +} diff --git a/events/loop/segment.go b/events/loop/segment.go new file mode 100644 index 00000000..c8d8c5d0 --- /dev/null +++ b/events/loop/segment.go @@ -0,0 +1,55 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +// queueSegment is a segment in a linked list of buffered channels. +// We always write to the tail segment. When the tail segment is full, +// we create a new tail segment and close the old one so that Run can +// move on once it has drained it. +type queueSegment[T any] struct { + ch chan T + next *queueSegment[T] +} + +// getSegment gets a queueSegment from the pool or allocates a new one. +// It always initializes a fresh channel of the configured size. +func (l *loop[T]) getSegment() *queueSegment[T] { + // Ensure segPool.New is initialized. + if l.segPool.New == nil { + l.segPool.New = func() any { + return new(queueSegment[T]) + } + } + + seg := l.segPool.Get().(*queueSegment[T]) + seg.next = nil + + segSize := l.segSize + if segSize == 0 { + segSize = 1 + } + seg.ch = make(chan T, segSize) + + return seg +} + +// putSegment returns a segment to the pool after clearing references. +func (l *loop[T]) putSegment(seg *queueSegment[T]) { + if seg == nil { + return + } + seg.ch = nil + seg.next = nil + l.segPool.Put(seg) +} From b5f2ffebf7cd067ffa5543dd7a40a1de498e5465 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Thu, 13 Nov 2025 20:34:11 +0000 Subject: [PATCH 2/3] lint Signed-off-by: joshvanl --- events/loop/loop.go | 7 ++----- events/loop/loop_test.go | 39 +++++++++++++++++++++++++++------------ events/loop/segment.go | 1 - 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/events/loop/loop.go b/events/loop/loop.go index 1ce1aebb..db988304 100644 --- a/events/loop/loop.go +++ b/events/loop/loop.go @@ -30,11 +30,9 @@ type Interface[T any] interface { } type loop[T any] struct { - // linked list of channel segments head *queueSegment[T] tail *queueSegment[T] - // capacity of each segment channel segSize uint64 handler Handler[T] @@ -87,7 +85,6 @@ func (l *loop[T]) Enqueue(req T) { return } - // Ensure we have at least one segment. if l.tail == nil { seg := l.getSegment() l.head = seg @@ -99,8 +96,8 @@ func (l *loop[T]) Enqueue(req T) { case l.tail.ch <- req: return default: - // Tail is full: create a new segment, link it, close the old tail, - // and send into the new tail. + // Tail is full: create a new segment, link it, close the old tail, and + // send into the new tail. newSeg := l.getSegment() l.tail.next = newSeg close(l.tail.ch) diff --git a/events/loop/loop_test.go b/events/loop/loop_test.go index accd4dc8..ade3f336 100644 --- a/events/loop/loop_test.go +++ b/events/loop/loop_test.go @@ -65,15 +65,18 @@ func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) { var wg sync.WaitGroup wg.Add(1) + errCh := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh) + }) go func() { defer wg.Done() - err := l.Run(ctx) - require.NoError(t, err, "Run should finish without error") + errCh <- l.Run(ctx) }() // Enqueue more items than a single segment to force multiple channels. const n = 25 - for i := 0; i < n; i++ { + for i := range n { l.Enqueue(i) } @@ -87,7 +90,7 @@ func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) { require.Len(t, got, n+1, "handler should see all enqueued items plus final close item") // First n values should be 0..n-1 in order. - for i := 0; i < n; i++ { + for i := range n { assert.Equal(t, i, got[i], "item at index %d out of order", i) } @@ -104,10 +107,13 @@ func TestLoop_CloseTwiceIsSafe(t *testing.T) { var wg sync.WaitGroup wg.Add(1) + errCh := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh) + }) go func() { defer wg.Done() - err := l.Run(ctx) - require.NoError(t, err) + errCh <- l.Run(ctx) }() l.Enqueue(1) @@ -144,10 +150,13 @@ func TestLoop_Reset(t *testing.T) { var wg1 sync.WaitGroup wg1.Add(1) + errCh := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh) + }) go func() { defer wg1.Done() - err := l.Run(ctx) - require.NoError(t, err) + errCh <- l.Run(ctx) }() l.Enqueue(1) @@ -164,10 +173,13 @@ func TestLoop_Reset(t *testing.T) { var wg2 sync.WaitGroup wg2.Add(1) + errCh2 := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh2) + }) go func() { defer wg2.Done() - err := l.Run(ctx) - require.NoError(t, err) + errCh2 <- l.Run(ctx) }() l.Enqueue(10) @@ -187,10 +199,13 @@ func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) { var wg sync.WaitGroup wg.Add(1) + errCh := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh) + }) go func() { defer wg.Done() - err := l.Run(ctx) - require.NoError(t, err) + errCh <- l.Run(ctx) }() l.Enqueue(1) diff --git a/events/loop/segment.go b/events/loop/segment.go index c8d8c5d0..011ea99b 100644 --- a/events/loop/segment.go +++ b/events/loop/segment.go @@ -25,7 +25,6 @@ type queueSegment[T any] struct { // getSegment gets a queueSegment from the pool or allocates a new one. // It always initializes a fresh channel of the configured size. func (l *loop[T]) getSegment() *queueSegment[T] { - // Ensure segPool.New is initialized. if l.segPool.New == nil { l.segPool.New = func() any { return new(queueSegment[T]) From 85137a98f690347ece201371802786306ff2e3ee Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 14 Nov 2025 09:48:46 +0000 Subject: [PATCH 3/3] Use `sync.Mutex` instead of `sync.RWMutex` for loop mutexes Signed-off-by: joshvanl --- events/loop/loop.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/events/loop/loop.go b/events/loop/loop.go index db988304..cdc2e07b 100644 --- a/events/loop/loop.go +++ b/events/loop/loop.go @@ -40,7 +40,7 @@ type loop[T any] struct { closed bool closeCh chan struct{} - lock sync.RWMutex + lock sync.Mutex segPool sync.Pool }