Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 81 additions & 24 deletions events/loop/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,24 @@ type Interface[T any] interface {
}

type loop[T any] struct {
queue chan T
head *queueSegment[T]
tail *queueSegment[T]

segSize uint64

handler Handler[T]

closed bool
closeCh chan struct{}
lock sync.RWMutex

lock sync.Mutex

segPool sync.Pool
}

func New[T any](h Handler[T], size uint64) Interface[T] {
return &loop[T]{
queue: make(chan T, size),
handler: h,
closeCh: make(chan struct{}),
}
l := new(loop[T])
return l.Reset(h, size)
}

func Empty[T any]() Interface[T] {
Expand All @@ -53,41 +57,88 @@ func Empty[T any]() Interface[T] {
func (l *loop[T]) Run(ctx context.Context) error {
defer close(l.closeCh)

for {
req, ok := <-l.queue
if !ok {
return nil
current := l.head
for current != nil {
// Drain this segment in order. The channel will be closed either:
// - when we "roll over" to a new segment, or
// - when Close() is called for the final segment.
for req := range current.ch {
if err := l.handler.Handle(ctx, req); err != nil {
return err
}
}

if err := l.handler.Handle(ctx, req); err != nil {
return err
}
// Move to the next segment, and return this one to the pool.
next := current.next
l.putSegment(current)
current = next
}

return nil
}

func (l *loop[T]) Enqueue(req T) {
l.lock.RLock()
defer l.lock.RUnlock()
l.lock.Lock()
defer l.lock.Unlock()

if l.closed {
return
}

if l.tail == nil {
seg := l.getSegment()
l.head = seg
l.tail = seg
}

// First try to send to the current tail segment without blocking.
select {
case l.queue <- req:
case <-l.closeCh:
case l.tail.ch <- req:
return
default:
// Tail is full: create a new segment, link it, close the old tail, and
// send into the new tail.
newSeg := l.getSegment()
l.tail.next = newSeg
close(l.tail.ch)
l.tail = newSeg
l.tail.ch <- req
}
}

func (l *loop[T]) Close(req T) {
l.lock.Lock()
if l.closed {
// Already closed; just unlock and wait for Run to finish.
l.lock.Unlock()
<-l.closeCh
return
}
l.closed = true

// Ensure at least one segment exists.
if l.tail == nil {
seg := l.getSegment()
l.head = seg
l.tail = seg
}

// Enqueue the final request; if the tail is full, roll over as in Enqueue.
select {
case l.queue <- req:
case <-l.closeCh:
case l.tail.ch <- req:
default:
newSeg := l.getSegment()
l.tail.next = newSeg
close(l.tail.ch)
l.tail = newSeg
l.tail.ch <- req
}
close(l.queue)

// No more items will be enqueued; close the tail to signal completion.
close(l.tail.ch)
l.lock.Unlock()

// Wait for Run to finish draining everything.
<-l.closeCh
}

Expand All @@ -102,10 +153,16 @@ func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] {
l.closed = false
l.closeCh = make(chan struct{})
l.handler = h
l.segSize = size

// Initialize pool for this instantiation of T.
l.segPool.New = func() any {
return new(queueSegment[T])
}

// TODO: @joshvanl: use a ring buffer so that we don't need to reallocate and
// improve performance.
l.queue = make(chan T, size)
seg := l.getSegment()
l.head = seg
l.tail = seg

return l
}
221 changes: 221 additions & 0 deletions events/loop/loop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package loop

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// testHandler is a Handler implementation for tests. It records every value
// passed to Handle (optionally sleeping first) and returns a configurable
// error from each Handle call.
type testHandler[T any] struct {
	mu    sync.Mutex
	seen  []T
	err   error
	delay time.Duration
}

// Handle waits for the configured delay (aborting early if ctx is
// cancelled), then records v and returns the handler's configured error.
func (th *testHandler[T]) Handle(ctx context.Context, v T) error {
	if th.delay > 0 {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(th.delay):
		}
	}

	th.mu.Lock()
	defer th.mu.Unlock()
	th.seen = append(th.seen, v)
	return th.err
}

// Values returns a snapshot copy of everything Handle has recorded so far.
func (th *testHandler[T]) Values() []T {
	th.mu.Lock()
	defer th.mu.Unlock()
	return append([]T(nil), th.seen...)
}

// TestLoop_EnqueueAndRunOrder_Unbounded enqueues more items than a single
// segment holds, forcing the loop to roll over internally, and verifies the
// handler observes everything in strict FIFO order with the Close() value
// delivered last.
func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) {
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	const segmentSize = 4
	handler := &testHandler[int]{}
	el := New[int](handler, segmentSize)

	runErr := make(chan error, 1)
	t.Cleanup(func() {
		require.NoError(t, <-runErr)
	})

	var running sync.WaitGroup
	running.Add(1)
	go func() {
		defer running.Done()
		runErr <- el.Run(ctx)
	}()

	// Push enough items to span several segments.
	const total = 25
	for i := 0; i < total; i++ {
		el.Enqueue(i)
	}

	// Close with a sentinel so we can check it arrives at the very end.
	const sentinel = 999
	el.Close(sentinel)
	running.Wait()

	got := handler.Values()
	require.Len(t, got, total+1, "handler should see all enqueued items plus final close item")

	// The first `total` values must be 0..total-1, in order.
	for i := 0; i < total; i++ {
		assert.Equal(t, i, got[i], "item at index %d out of order", i)
	}

	// The sentinel from Close is the final element.
	assert.Equal(t, sentinel, got[len(got)-1], "last item should be the Close() value")
}

// TestLoop_CloseTwiceIsSafe verifies that a second Close neither panics nor
// blocks, and that the value passed to the second Close is dropped.
func TestLoop_CloseTwiceIsSafe(t *testing.T) {
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	handler := &testHandler[int]{}
	el := New[int](handler, 2)

	runErr := make(chan error, 1)
	t.Cleanup(func() {
		require.NoError(t, <-runErr)
	})

	var running sync.WaitGroup
	running.Add(1)
	go func() {
		defer running.Done()
		runErr <- el.Run(ctx)
	}()

	el.Enqueue(1)
	el.Close(2)

	// A repeated Close must return promptly rather than panic or deadlock.
	done := make(chan struct{})
	go func() {
		defer close(done)
		el.Close(3)
	}()

	select {
	case <-time.After(time.Second):
		require.Fail(t, "second Close should not block")
	case <-done:
	}

	running.Wait()

	// Only the first Close's value is delivered; the repeat is ignored.
	got := handler.Values()
	assert.Contains(t, got, 1)
	assert.Contains(t, got, 2)
	assert.NotContains(t, got, 3)
}

// TestLoop_Reset drains one full run of the loop, resets it with a fresh
// handler and buffer size, and verifies a second run delivers only the
// post-reset values to the new handler.
func TestLoop_Reset(t *testing.T) {
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	first := &testHandler[int]{}
	el := New[int](first, 2)

	firstErr := make(chan error, 1)
	t.Cleanup(func() {
		require.NoError(t, <-firstErr)
	})
	var firstRun sync.WaitGroup
	firstRun.Add(1)
	go func() {
		defer firstRun.Done()
		firstErr <- el.Run(ctx)
	}()

	el.Enqueue(1)
	el.Close(2)
	firstRun.Wait()

	assert.ElementsMatch(t, []int{1, 2}, first.Values())

	// Reset to a new handler and buffer size.
	second := &testHandler[int]{}
	el = el.Reset(second, 8)
	require.NotNil(t, el)

	secondErr := make(chan error, 1)
	t.Cleanup(func() {
		require.NoError(t, <-secondErr)
	})
	var secondRun sync.WaitGroup
	secondRun.Add(1)
	go func() {
		defer secondRun.Done()
		secondErr <- el.Run(ctx)
	}()

	el.Enqueue(10)
	el.Enqueue(11)
	el.Close(12)
	secondRun.Wait()

	assert.ElementsMatch(t, []int{10, 11, 12}, second.Values())
}

// TestLoop_EnqueueAfterCloseIsDropped verifies that an Enqueue issued after
// Close is silently discarded and never reaches the handler.
func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) {
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	handler := &testHandler[int]{}
	el := New[int](handler, 2)

	runErr := make(chan error, 1)
	t.Cleanup(func() {
		require.NoError(t, <-runErr)
	})
	var running sync.WaitGroup
	running.Add(1)
	go func() {
		defer running.Done()
		runErr <- el.Run(ctx)
	}()

	el.Enqueue(1)
	el.Close(2)

	// The loop is closed: this value must be dropped.
	el.Enqueue(3)

	running.Wait()

	assert.Equal(t, []int{1, 2}, handler.Values())
}
Loading