diff --git a/events/loop/loop.go b/events/loop/loop.go index bbc3b9c8..cdc2e07b 100644 --- a/events/loop/loop.go +++ b/events/loop/loop.go @@ -30,20 +30,24 @@ type Interface[T any] interface { } type loop[T any] struct { - queue chan T + head *queueSegment[T] + tail *queueSegment[T] + + segSize uint64 + handler Handler[T] closed bool closeCh chan struct{} - lock sync.RWMutex + + lock sync.Mutex + + segPool sync.Pool } func New[T any](h Handler[T], size uint64) Interface[T] { - return &loop[T]{ - queue: make(chan T, size), - handler: h, - closeCh: make(chan struct{}), - } + l := new(loop[T]) + return l.Reset(h, size) } func Empty[T any]() Interface[T] { @@ -53,41 +57,88 @@ func Empty[T any]() Interface[T] { func (l *loop[T]) Run(ctx context.Context) error { defer close(l.closeCh) - for { - req, ok := <-l.queue - if !ok { - return nil + current := l.head + for current != nil { + // Drain this segment in order. The channel will be closed either: + // - when we "roll over" to a new segment, or + // - when Close() is called for the final segment. + for req := range current.ch { + if err := l.handler.Handle(ctx, req); err != nil { + return err + } } - if err := l.handler.Handle(ctx, req); err != nil { - return err - } + // Move to the next segment, and return this one to the pool. + next := current.next + l.putSegment(current) + current = next } + + return nil } func (l *loop[T]) Enqueue(req T) { - l.lock.RLock() - defer l.lock.RUnlock() + l.lock.Lock() + defer l.lock.Unlock() if l.closed { return } + if l.tail == nil { + seg := l.getSegment() + l.head = seg + l.tail = seg + } + + // First try to send to the current tail segment without blocking. select { - case l.queue <- req: - case <-l.closeCh: + case l.tail.ch <- req: + return + default: + // Tail is full: create a new segment, link it, close the old tail, and + // send into the new tail. + newSeg := l.getSegment() + l.tail.next = newSeg + close(l.tail.ch) + l.tail = newSeg + l.tail.ch <- req } } func (l *loop[T]) Close(req T) { l.lock.Lock() + if l.closed { + // Already closed; just unlock and wait for Run to finish. + l.lock.Unlock() + <-l.closeCh + return + } l.closed = true + + // Ensure at least one segment exists. + if l.tail == nil { + seg := l.getSegment() + l.head = seg + l.tail = seg + } + + // Enqueue the final request; if the tail is full, roll over as in Enqueue. select { - case l.queue <- req: - case <-l.closeCh: + case l.tail.ch <- req: + default: + newSeg := l.getSegment() + l.tail.next = newSeg + close(l.tail.ch) + l.tail = newSeg + l.tail.ch <- req } - close(l.queue) + + // No more items will be enqueued; close the tail to signal completion. + close(l.tail.ch) l.lock.Unlock() + + // Wait for Run to finish draining everything. <-l.closeCh } @@ -102,10 +153,16 @@ func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] { l.closed = false l.closeCh = make(chan struct{}) l.handler = h + l.segSize = size + + // Initialize pool for this instantiation of T. + l.segPool.New = func() any { + return new(queueSegment[T]) + } - // TODO: @joshvanl: use a ring buffer so that we don't need to reallocate and - // improve performance. - l.queue = make(chan T, size) + seg := l.getSegment() + l.head = seg + l.tail = seg return l } diff --git a/events/loop/loop_test.go b/events/loop/loop_test.go new file mode 100644 index 00000000..ade3f336 --- /dev/null +++ b/events/loop/loop_test.go @@ -0,0 +1,221 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// testHandler is a simple handler that records all values it sees. +type testHandler[T any] struct { + mu sync.Mutex + seen []T + err error + delay time.Duration +} + +func (h *testHandler[T]) Handle(ctx context.Context, v T) error { + if h.delay > 0 { + select { + case <-time.After(h.delay): + case <-ctx.Done(): + return ctx.Err() + } + } + + h.mu.Lock() + defer h.mu.Unlock() + h.seen = append(h.seen, v) + return h.err +} + +func (h *testHandler[T]) Values() []T { + h.mu.Lock() + defer h.mu.Unlock() + out := make([]T, len(h.seen)) + copy(out, h.seen) + return out +} + +func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + h := &testHandler[int]{} + const segmentSize = 4 + + l := New[int](h, segmentSize) + + var wg sync.WaitGroup + wg.Add(1) + errCh := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh) + }) + go func() { + defer wg.Done() + errCh <- l.Run(ctx) + }() + + // Enqueue more items than a single segment to force multiple channels. + const n = 25 + for i := range n { + l.Enqueue(i) + } + + // Close with a sentinel value so we can verify it is the last element. + const final = 999 + l.Close(final) + + wg.Wait() + + got := h.Values() + require.Len(t, got, n+1, "handler should see all enqueued items plus final close item") + + // First n values should be 0..n-1 in order. + for i := range n { + assert.Equal(t, i, got[i], "item at index %d out of order", i) + } + + // Last one is the final close value. + assert.Equal(t, final, got[len(got)-1], "last item should be the Close() value") +} + +func TestLoop_CloseTwiceIsSafe(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + h := &testHandler[int]{} + l := New[int](h, 2) + + var wg sync.WaitGroup + wg.Add(1) + errCh := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh) + }) + go func() { + defer wg.Done() + errCh <- l.Run(ctx) + }() + + l.Enqueue(1) + l.Close(2) + + // Second close should not panic or deadlock. + done := make(chan struct{}) + go func() { + l.Close(3) + close(done) + }() + + select { + case <-done: + case <-time.After(time.Second): + require.Fail(t, "second Close should not block") + } + + wg.Wait() + + got := h.Values() + // First close enqueues 2, second close should be ignored. + assert.Contains(t, got, 1) + assert.Contains(t, got, 2) + assert.NotContains(t, got, 3) +} + +func TestLoop_Reset(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + h1 := &testHandler[int]{} + l := New[int](h1, 2) + + var wg1 sync.WaitGroup + wg1.Add(1) + errCh := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh) + }) + go func() { + defer wg1.Done() + errCh <- l.Run(ctx) + }() + + l.Enqueue(1) + l.Close(2) + wg1.Wait() + + assert.ElementsMatch(t, []int{1, 2}, h1.Values()) + + // Reset to a new handler and buffer size. + h2 := &testHandler[int]{} + l = l.Reset(h2, 8) + + require.NotNil(t, l) + + var wg2 sync.WaitGroup + wg2.Add(1) + errCh2 := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh2) + }) + go func() { + defer wg2.Done() + errCh2 <- l.Run(ctx) + }() + + l.Enqueue(10) + l.Enqueue(11) + l.Close(12) + wg2.Wait() + + assert.ElementsMatch(t, []int{10, 11, 12}, h2.Values()) +} + +func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + h := &testHandler[int]{} + l := New[int](h, 2) + + var wg sync.WaitGroup + wg.Add(1) + errCh := make(chan error, 1) + t.Cleanup(func() { + require.NoError(t, <-errCh) + }) + go func() { + defer wg.Done() + errCh <- l.Run(ctx) + }() + + l.Enqueue(1) + l.Close(2) + + // This enqueue should be ignored. + l.Enqueue(3) + + wg.Wait() + + got := h.Values() + assert.Equal(t, []int{1, 2}, got) +} diff --git a/events/loop/segment.go b/events/loop/segment.go new file mode 100644 index 00000000..011ea99b --- /dev/null +++ b/events/loop/segment.go @@ -0,0 +1,54 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +// queueSegment is a segment in a linked list of buffered channels. +// We always write to the tail segment. When the tail segment is full, +// we create a new tail segment and close the old one so that Run can +// move on once it has drained it. +type queueSegment[T any] struct { + ch chan T + next *queueSegment[T] +} + +// getSegment gets a queueSegment from the pool or allocates a new one. +// It always initializes a fresh channel of the configured size. +func (l *loop[T]) getSegment() *queueSegment[T] { + if l.segPool.New == nil { + l.segPool.New = func() any { + return new(queueSegment[T]) + } + } + + seg := l.segPool.Get().(*queueSegment[T]) + seg.next = nil + + segSize := l.segSize + if segSize == 0 { + segSize = 1 + } + seg.ch = make(chan T, segSize) + + return seg +} + +// putSegment returns a segment to the pool after clearing references. +func (l *loop[T]) putSegment(seg *queueSegment[T]) { + if seg == nil { + return + } + seg.ch = nil + seg.next = nil + l.segPool.Put(seg) +}