From 097a07d1bf281c5decb904db1b13c01d9de35351 Mon Sep 17 00:00:00 2001 From: ggivo Date: Fri, 24 Jan 2025 18:36:17 +0200 Subject: [PATCH 1/4] Fix SimpleBatcher Concurrency Issue (#2196) ### Problem: - **Flush Operation Conflict:** - When one thread (T1) performs a flush, it may read and dispatch batched commands before resetting the `flushing` flag. - If another thread (T2) adds a command and forced flush at this moment. Command might be added to the queue but does not trigger a flush. - As a result, the command remains in the queue until the next flush request, causing a delay in dispatching. - **Flag Reset Between Iterations:** - During a default flush operation, if multiple batches are processed, the `flushing` flag is reset between iterations. - This allows another thread to take over, potentially causing the initial thread to return `BatchTasks.EMPTY` instead of properly processed commands. 1. T1 -> batch(command, CommandBatching.flush() 2. T1 -> flushing.compareAndSet(false, true) == true 3. T1 -> flush()->doFlush() 4. T2 -> batch(command, CommandBatching.flush() 5. T2 -> flushing.compareAndSet(false, true) == false #already flushing will skip doFlush and command remain not dispatched 6. T1 -> batch() completes 7. T2 -> batch() completes ### Fix: If force flush is requested while flushing, perform additional flush iteration after ongoing completes --- .../lettuce/core/dynamic/SimpleBatcher.java | 64 +++++++++++++------ .../core/dynamic/SimpleBatcherUnitTests.java | 41 ++++++++++++ 2 files changed, 85 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java b/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java index a2ecf48834..693b1ae95a 100644 --- a/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java +++ b/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java @@ -40,6 +40,7 @@ * * @author Mark Paluch * @author Lucio Paiva + * @author Ivo Gaydajiev */ class SimpleBatcher implements Batcher { @@ -51,6 +52,11 @@ class SimpleBatcher implements Batcher { private final AtomicBoolean flushing = new AtomicBoolean(); + //foreFlushRequested indicates that a flush was requested while there is already a flush in progress + //This flag is used to ensure we will flush again after the current flush is done + //to ensure that any commands added while dispatching the current flush are also dispatched + private final AtomicBoolean forceFlushRequested = new AtomicBoolean(); + public SimpleBatcher(StatefulConnection connection, int batchSize) { LettuceAssert.isTrue(batchSize == -1 || batchSize > 1, "Batch size must be greater zero or -1"); @@ -95,37 +101,55 @@ protected BatchTasks flush(boolean forcedFlush) { List> commands = newDrainTarget(); - while (flushing.compareAndSet(false, true)) { + while (true) { + if (flushing.compareAndSet(false, true)) { + try { - try { + int consume = -1; - int consume = -1; + if (!forcedFlush) { + long queuedItems = queue.size(); + if (queuedItems >= batchSize) { + consume = batchSize; + defaultFlush = true; + } + } - if (!forcedFlush) { - long queuedItems = queue.size(); - if (queuedItems >= batchSize) { - consume = batchSize; - defaultFlush = true; + List> batch = doFlush(forcedFlush, defaultFlush, consume); + if (batch != null) { + commands.addAll(batch); } - } - List> batch = doFlush(forcedFlush, defaultFlush, consume); - if (batch != null) { - commands.addAll(batch); - } + if (defaultFlush && !queue.isEmpty() && queue.size() > batchSize) { + continue; + } + + if (forceFlushRequested.compareAndSet(true, false)) { + continue; + } - if (defaultFlush && !queue.isEmpty() && queue.size() > batchSize) { - continue; + return new BatchTasks(commands); + + } finally { + flushing.set(false); } - return new BatchTasks(commands); + } else { + // Another thread is already flushing + if (forcedFlush) { + forceFlushRequested.set(true); + } - } finally { - flushing.set(false); + if (commands.isEmpty()) { + return BatchTasks.EMPTY; + } else { + // Could happen if default flush is started in t1 and in case + // there are multiple default batches to process another thread + // acquires flushing flag in-between t1 releases flushing flag & try to acquire it again + return new BatchTasks(commands); + } } } - - return BatchTasks.EMPTY; } private List> doFlush(boolean forcedFlush, boolean defaultFlush, int consume) { diff --git a/src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java b/src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java index 592bfe29ba..a7c5ff90da 100644 --- a/src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java +++ b/src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java @@ -5,6 +5,7 @@ import static org.mockito.Mockito.*; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -21,6 +22,7 @@ /** * @author Mark Paluch + * @author Ivo Gaydajiev */ @Tag(UNIT_TEST) @ExtendWith(MockitoExtension.class) @@ -127,6 +129,45 @@ void shouldBatchWithBatchControlFlush() { verify(connection).dispatch(Arrays.asList(c1, c2)); } + @Test + void shouldDispatchCommandsQueuedDuringOngoingFlush() throws InterruptedException { + RedisCommand c1 = createCommand(); + RedisCommand c2 = createCommand(); + + CountDownLatch batchFlushLatch1 = new CountDownLatch(1); + CountDownLatch batchFlushLatch2 = new CountDownLatch(1); + + when(connection.dispatch((RedisCommand) any())).thenAnswer(invocation -> { + batchFlushLatch1.countDown(); + batchFlushLatch2.await(); + + return null; + }); + + SimpleBatcher batcher = new SimpleBatcher(connection, 4); + + Thread batchThread1 = new Thread(()->{ + batcher.batch(c1, CommandBatching.flush()); + }); + batchThread1.start(); + + Thread batchThread2 = new Thread(()->{ + try { + batchFlushLatch1.await(); + } catch (InterruptedException ignored) { + } + batcher.batch(c2, CommandBatching.flush()); + batchFlushLatch2.countDown(); + }); + batchThread2.start(); + + batchThread1.join(); + batchThread2.join(); + verify(connection, times(1)).dispatch(c1); + verify(connection, times(1)).dispatch(c2); + } + + private static RedisCommand createCommand() { return new AsyncCommand<>(new Command<>(CommandType.COMMAND, null, null)); } From 1991b97993b7e785232e0a82dfcba47603fce709 Mon Sep 17 00:00:00 2001 From: ggivo Date: Fri, 31 Jan 2025 16:33:19 +0200 Subject: [PATCH 2/4] format --- src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java | 8 ++++---- .../io/lettuce/core/dynamic/SimpleBatcherUnitTests.java | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java b/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java index 693b1ae95a..c2dabbf5f4 100644 --- a/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java +++ b/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java @@ -52,10 +52,10 @@ class SimpleBatcher implements Batcher { private final AtomicBoolean flushing = new AtomicBoolean(); - //foreFlushRequested indicates that a flush was requested while there is already a flush in progress - //This flag is used to ensure we will flush again after the current flush is done - //to ensure that any commands added while dispatching the current flush are also dispatched - private final AtomicBoolean forceFlushRequested = new AtomicBoolean(); + // foreFlushRequested indicates that a flush was requested while there is already a flush in progress + // This flag is used to ensure we will flush again after the current flush is done + // to ensure that any commands added while dispatching the current flush are also dispatched + private final AtomicBoolean forceFlushRequested = new AtomicBoolean(); public SimpleBatcher(StatefulConnection connection, int batchSize) { diff --git a/src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java b/src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java index a7c5ff90da..4b8861dfc5 100644 --- a/src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java +++ b/src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java @@ -146,12 +146,12 @@ void shouldDispatchCommandsQueuedDuringOngoingFlush() throws InterruptedExceptio SimpleBatcher batcher = new SimpleBatcher(connection, 4); - Thread batchThread1 = new Thread(()->{ + Thread batchThread1 = new Thread(() -> { batcher.batch(c1, CommandBatching.flush()); }); batchThread1.start(); - Thread batchThread2 = new Thread(()->{ + Thread batchThread2 = new Thread(() -> { try { batchFlushLatch1.await(); } catch (InterruptedException ignored) { @@ -167,7 +167,6 @@ void shouldDispatchCommandsQueuedDuringOngoingFlush() throws InterruptedExceptio verify(connection, times(1)).dispatch(c2); } - private static RedisCommand createCommand() { return new AsyncCommand<>(new Command<>(CommandType.COMMAND, null, null)); } From dfcf7d45bf4c8e7d0e9e6269b5ddfe31a50026b6 Mon Sep 17 00:00:00 2001 From: ggivo Date: Tue, 18 Feb 2025 19:48:46 +0200 Subject: [PATCH 3/4] Update src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java Co-authored-by: Tihomir Krasimirov Mateev --- src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java b/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java index c2dabbf5f4..eafe25929a 100644 --- a/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java +++ b/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java @@ -52,7 +52,7 @@ class SimpleBatcher implements Batcher { private final AtomicBoolean flushing = new AtomicBoolean(); - // foreFlushRequested indicates that a flush was requested while there is already a flush in progress + // forceFlushRequested indicates that a flush was requested while there is already a flush in progress // This flag is used to ensure we will flush again after the current flush is done // to ensure that any commands added while dispatching the current flush are also dispatched private final AtomicBoolean forceFlushRequested = new AtomicBoolean(); From 92a5c29734890e97d9978c162b75c5765d0fc91f Mon Sep 17 00:00:00 2001 From: Tihomir Krasimirov Mateev Date: Tue, 18 Feb 2025 20:34:36 +0100 Subject: [PATCH 4/4] Update src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java Co-authored-by: ggivo --- src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java b/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java index eafe25929a..53e23fad6b 100644 --- a/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java +++ b/src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java @@ -143,9 +143,10 @@ protected BatchTasks flush(boolean forcedFlush) { if (commands.isEmpty()) { return BatchTasks.EMPTY; } else { - // Could happen if default flush is started in t1 and in case - // there are multiple default batches to process another thread - // acquires flushing flag in-between t1 releases flushing flag & try to acquire it again + // Scenario: A default flush is started in Thread T1. + // If multiple default batches need processing, T1 will release `flushing` and try to reacquire it. + // However, in the brief moment when T1 releases `flushing`, another thread (T2) might acquire it. + // This lead to a state where T2 has taken over processing from T1 and T1 should return commands processed return new BatchTasks(commands); } }