From ea283a9ad1d981bd73081c792fd1dbd78483d5ac Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 21 Nov 2025 17:08:25 +0000 Subject: [PATCH] events/loop: Factory & reduce lock contention Update the loop implementation to use an optimistic RLock for enqueueing, falling back to a full Lock only when the queue segment is full. This reduces lock contention. Uses a generic typed Factory to allow for the loop to be sync.Pool cached by the consumer, on demand. The loop does not cache itself as the loop may continue to be used after being closed. Signed-off-by: joshvanl --- events/loop/factory.go | 61 +++++++++++++++++++++++++++ events/loop/loop.go | 90 ++++++++++++++-------------------------- events/loop/loop_test.go | 15 ++++--- events/loop/segment.go | 12 ++---- 4 files changed, 105 insertions(+), 73 deletions(-) create mode 100644 events/loop/factory.go diff --git a/events/loop/factory.go b/events/loop/factory.go new file mode 100644 index 0000000..dcdb488 --- /dev/null +++ b/events/loop/factory.go @@ -0,0 +1,61 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +import "sync" + +type Factory[T any] struct { + size uint64 + + segPool sync.Pool + loopPool sync.Pool +} + +func New[T any](size uint64) *Factory[T] { + f := &Factory[T]{ + size: size, + segPool: sync.Pool{ + New: func() any { + return new(queueSegment[T]) + }, + }, + } + + f.loopPool = sync.Pool{ + New: func() any { + return &loop[T]{ + factory: f, + } + }, + } + + return f +} + +func (f *Factory[T]) NewLoop(h Handler[T]) Interface[T] { + l := f.loopPool.Get().(*loop[T]) + + seg := l.getSegment() + l.head = seg + l.tail = seg + l.closeCh = make(chan struct{}) + l.handler = h + l.closed.Store(false) + + return l +} + +func (f *Factory[T]) CacheLoop(l Interface[T]) { + f.loopPool.Put(l) +} diff --git a/events/loop/loop.go b/events/loop/loop.go index cdc2e07..989ed02 100644 --- a/events/loop/loop.go +++ b/events/loop/loop.go @@ -16,6 +16,7 @@ package loop import ( "context" "sync" + "sync/atomic" ) type Handler[T any] interface { @@ -26,32 +27,21 @@ type Interface[T any] interface { Run(ctx context.Context) error Enqueue(t T) Close(t T) - Reset(h Handler[T], size uint64) Interface[T] } type loop[T any] struct { + factory *Factory[T] + head *queueSegment[T] tail *queueSegment[T] - segSize uint64 - handler Handler[T] - closed bool - closeCh chan struct{} - - lock sync.Mutex - - segPool sync.Pool -} + closed atomic.Bool -func New[T any](h Handler[T], size uint64) Interface[T] { - l := new(loop[T]) - return l.Reset(h, size) -} + closeCh chan struct{} -func Empty[T any]() Interface[T] { - return new(loop[T]) + lock sync.RWMutex } func (l *loop[T]) Run(ctx context.Context) error { @@ -78,23 +68,37 @@ func (l *loop[T]) Run(ctx context.Context) error { } func (l *loop[T]) Enqueue(req T) { - l.lock.Lock() - defer l.lock.Unlock() + l.lock.RLock() - if l.closed { + if l.closed.Load() { + l.lock.RUnlock() return } - if l.tail == nil { - seg := l.getSegment() - l.head = seg - l.tail = seg - } - // First try to send to the current tail segment without blocking. select { case l.tail.ch <- req: + l.lock.RUnlock() return + default: + l.lock.RUnlock() + } + + // Tail is full; need to acquire write lock to roll over. If no longer full + // (lost race, another goroutine rolled over first), don't expand. + + l.lock.Lock() + defer l.lock.Unlock() + + if l.closed.Load() { + // Closed while we were waiting for the lock. + return + } + + // Try again to send to the tail; if successful, another goroutine must + // have rolled over for us. + select { + case l.tail.ch <- req: default: // Tail is full: create a new segment, link it, close the old tail, and // send into the new tail. @@ -108,20 +112,13 @@ func (l *loop[T]) Enqueue(req T) { func (l *loop[T]) Close(req T) { l.lock.Lock() - if l.closed { + if l.closed.Load() { // Already closed; just unlock and wait for Run to finish. l.lock.Unlock() <-l.closeCh return } - l.closed = true - - // Ensure at least one segment exists. - if l.tail == nil { - seg := l.getSegment() - l.head = seg - l.tail = seg - } + l.closed.Store(true) // Enqueue the final request; if the tail is full, roll over as in Enqueue. select { @@ -141,28 +138,3 @@ func (l *loop[T]) Close(req T) { // Wait for Run to finish draining everything. <-l.closeCh } - -func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] { - if l == nil { - return New[T](h, size) - } - - l.lock.Lock() - defer l.lock.Unlock() - - l.closed = false - l.closeCh = make(chan struct{}) - l.handler = h - l.segSize = size - - // Initialize pool for this instantiation of T. - l.segPool.New = func() any { - return new(queueSegment[T]) - } - - seg := l.getSegment() - l.head = seg - l.tail = seg - - return l -} diff --git a/events/loop/loop_test.go b/events/loop/loop_test.go index ade3f33..c911d87 100644 --- a/events/loop/loop_test.go +++ b/events/loop/loop_test.go @@ -61,7 +61,8 @@ func TestLoop_EnqueueAndRunOrder_Unbounded(t *testing.T) { h := &testHandler[int]{} const segmentSize = 4 - l := New[int](h, segmentSize) + f := New[int](segmentSize) + l := f.NewLoop(h) var wg sync.WaitGroup wg.Add(1) @@ -103,7 +104,8 @@ func TestLoop_CloseTwiceIsSafe(t *testing.T) { defer cancel() h := &testHandler[int]{} - l := New[int](h, 2) + f := New[int](2) + l := f.NewLoop(h) var wg sync.WaitGroup wg.Add(1) @@ -146,7 +148,8 @@ func TestLoop_Reset(t *testing.T) { defer cancel() h1 := &testHandler[int]{} - l := New[int](h1, 2) + f := New[int](2) + l := f.NewLoop(h1) var wg1 sync.WaitGroup wg1.Add(1) @@ -167,7 +170,8 @@ func TestLoop_Reset(t *testing.T) { // Reset to a new handler and buffer size. h2 := &testHandler[int]{} - l = l.Reset(h2, 8) + f = New[int](8) + l = f.NewLoop(h2) require.NotNil(t, l) @@ -195,7 +199,8 @@ func TestLoop_EnqueueAfterCloseIsDropped(t *testing.T) { defer cancel() h := &testHandler[int]{} - l := New[int](h, 2) + f := New[int](2) + l := f.NewLoop(h) var wg sync.WaitGroup wg.Add(1) diff --git a/events/loop/segment.go b/events/loop/segment.go index 011ea99..4b03525 100644 --- a/events/loop/segment.go +++ b/events/loop/segment.go @@ -25,16 +25,10 @@ type queueSegment[T any] struct { // getSegment gets a queueSegment from the pool or allocates a new one. // It always initializes a fresh channel of the configured size. func (l *loop[T]) getSegment() *queueSegment[T] { - if l.segPool.New == nil { - l.segPool.New = func() any { - return new(queueSegment[T]) - } - } - - seg := l.segPool.Get().(*queueSegment[T]) + seg := l.factory.segPool.Get().(*queueSegment[T]) seg.next = nil - segSize := l.segSize + segSize := l.factory.size if segSize == 0 { segSize = 1 } @@ -50,5 +44,5 @@ func (l *loop[T]) putSegment(seg *queueSegment[T]) { } seg.ch = nil seg.next = nil - l.segPool.Put(seg) + l.factory.segPool.Put(seg) }