diff --git a/events/loop/factory.go b/events/loop/factory.go new file mode 100644 index 0000000..dcdb488 --- /dev/null +++ b/events/loop/factory.go @@ -0,0 +1,61 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +import "sync" + +type Factory[T any] struct { + size uint64 + + segPool sync.Pool + loopPool sync.Pool +} + +func New[T any](size uint64) *Factory[T] { + f := &Factory[T]{ + size: size, + segPool: sync.Pool{ + New: func() any { + return new(queueSegment[T]) + }, + }, + } + + f.loopPool = sync.Pool{ + New: func() any { + return &loop[T]{ + factory: f, + } + }, + } + + return f +} + +func (f *Factory[T]) NewLoop(h Handler[T]) Interface[T] { + l := f.loopPool.Get().(*loop[T]) + + seg := l.getSegment() + l.head = seg + l.tail = seg + l.closeCh = make(chan struct{}) + l.handler = h + l.closed.Store(false) + + return l +} + +func (f *Factory[T]) CacheLoop(l Interface[T]) { + f.loopPool.Put(l) +} diff --git a/events/loop/loop.go b/events/loop/loop.go index cdc2e07..989ed02 100644 --- a/events/loop/loop.go +++ b/events/loop/loop.go @@ -16,6 +16,7 @@ package loop import ( "context" "sync" + "sync/atomic" ) type Handler[T any] interface { @@ -26,32 +27,21 @@ type Interface[T any] interface { Run(ctx context.Context) error Enqueue(t T) Close(t T) - Reset(h Handler[T], size uint64) Interface[T] } type loop[T any] struct { + factory *Factory[T] + head *queueSegment[T] tail *queueSegment[T] - segSize uint64 - handler Handler[T] - closed bool - closeCh chan struct{} - - lock sync.Mutex - - segPool sync.Pool -} + closed atomic.Bool -func New[T any](h Handler[T], size uint64) Interface[T] { - l := new(loop[T]) - return l.Reset(h, size) -} + closeCh chan struct{} -func Empty[T any]() Interface[T] { - return new(loop[T]) + lock sync.RWMutex } func (l *loop[T]) Run(ctx context.Context) error { @@ -78,23 +68,37 @@ func (l *loop[T]) Run(ctx context.Context) error { } func (l *loop[T]) Enqueue(req T) { - l.lock.Lock() - defer l.lock.Unlock() + l.lock.RLock() - if l.closed { + if l.closed.Load() { + l.lock.RUnlock() return } - if l.tail == nil { - seg := l.getSegment() - l.head = seg - l.tail = seg - } - // First try to send to the current tail segment without blocking. select { case l.tail.ch <- req: + l.lock.RUnlock() return + default: + l.lock.RUnlock() + } + + // Tail is full; need to acquire write lock to roll over. If no longer full + // (lost race, another goroutine rolled over first), don't expand. + + l.lock.Lock() + defer l.lock.Unlock() + + if l.closed.Load() { + // Closed while we were waiting for the lock. + return + } + + // Try again to send to the tail; if successful, another goroutine must + // have rolled over for us. + select { + case l.tail.ch <- req: default: // Tail is full: create a new segment, link it, close the old tail, and // send into the new tail. @@ -108,20 +112,13 @@ func (l *loop[T]) Enqueue(req T) { func (l *loop[T]) Close(req T) { l.lock.Lock() - if l.closed { + if l.closed.Load() { // Already closed; just unlock and wait for Run to finish. l.lock.Unlock() <-l.closeCh return } - l.closed = true - - // Ensure at least one segment exists. - if l.tail == nil { - seg := l.getSegment() - l.head = seg - l.tail = seg - } + l.closed.Store(true) // Enqueue the final request; if the tail is full, roll over as in Enqueue. select { @@ -141,28 +138,3 @@ func (l *loop[T]) Close(req T) { // Wait for Run to finish draining everything. <-l.closeCh } - -func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] { - if l == nil { - return New[T](h, size) - } - - l.lock.Lock() - defer l.lock.Unlock() - - l.closed = false - l.closeCh = make(chan struct{}) - l.handler = h - l.segSize = size - - // Initialize pool for this instantiation of T. - l.segPool.New = func() any { - return new(queueSegment[T]) - } - - seg := l.getSegment() - l.head = seg - l.tail = seg - - return l -} diff --git a/events/loop/loop_test.go b/events/loop/loop_test.go index ade3f33..c911d87 100644 --- a/events/loop/loop_test.go +++ b/events/loop/loop_test.go @@ -61,7 +61,8 @@ func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) { h := &testHandler[int]{} const segmentSize = 4 - l := New[int](h, segmentSize) + f := New[int](segmentSize) + l := f.NewLoop(h) var wg sync.WaitGroup wg.Add(1) @@ -103,7 +104,8 @@ func TestLoop_CloseTwiceIsSafe(t *testing.T) { defer cancel() h := &testHandler[int]{} - l := New[int](h, 2) + f := New[int](2) + l := f.NewLoop(h) var wg sync.WaitGroup wg.Add(1) @@ -146,7 +148,8 @@ func TestLoop_Reset(t *testing.T) { defer cancel() h1 := &testHandler[int]{} - l := New[int](h1, 2) + f := New[int](2) + l := f.NewLoop(h1) var wg1 sync.WaitGroup wg1.Add(1) @@ -167,7 +170,8 @@ func TestLoop_Reset(t *testing.T) { // Reset to a new handler and buffer size. h2 := &testHandler[int]{} - l = l.Reset(h2, 8) + f = New[int](8) + l = f.NewLoop(h2) require.NotNil(t, l) @@ -195,7 +199,8 @@ func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) { defer cancel() h := &testHandler[int]{} - l := New[int](h, 2) + f := New[int](2) + l := f.NewLoop(h) var wg sync.WaitGroup wg.Add(1) diff --git a/events/loop/segment.go b/events/loop/segment.go index 011ea99..4b03525 100644 --- a/events/loop/segment.go +++ b/events/loop/segment.go @@ -25,16 +25,10 @@ type queueSegment[T any] struct { // getSegment gets a queueSegment from the pool or allocates a new one. // It always initializes a fresh channel of the configured size. func (l *loop[T]) getSegment() *queueSegment[T] { - if l.segPool.New == nil { - l.segPool.New = func() any { - return new(queueSegment[T]) - } - } - - seg := l.segPool.Get().(*queueSegment[T]) + seg := l.factory.segPool.Get().(*queueSegment[T]) seg.next = nil - segSize := l.segSize + segSize := l.factory.size if segSize == 0 { segSize = 1 } @@ -50,5 +44,5 @@ func (l *loop[T]) putSegment(seg *queueSegment[T]) { } seg.ch = nil seg.next = nil - l.segPool.Put(seg) + l.factory.segPool.Put(seg) }