From 7bfbe9bc6fcdb482a9a61cd71f4879b17ea2f8ef Mon Sep 17 00:00:00 2001 From: majialong Date: Tue, 21 Oct 2025 23:26:22 +0800 Subject: [PATCH 1/3] MINOR: Fix time comparison with appendLingerMs in CoordinatorRuntime#maybeFlushCurrentBatch --- .../kafka/coordinator/common/runtime/CoordinatorRuntime.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 965f8074f8014..552e7e72effd8 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -833,7 +833,7 @@ private void flushCurrentBatch() { */ private void maybeFlushCurrentBatch(long currentTimeMs) { if (currentBatch != null) { - if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) { + if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) { flushCurrentBatch(); } } From 90c3f59ed458f83e517eb5b82e28e40d1ce1b655 Mon Sep 17 00:00:00 2001 From: majialong Date: Fri, 24 Oct 2025 01:16:30 +0800 Subject: [PATCH 2/3] Add unit test. --- .../runtime/CoordinatorRuntimeTest.java | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index dfbbdf048bc20..89f753a75e72f 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -4779,6 +4779,7 @@ public void testCompleteTransactionEventCompletesOnlyOnce() throws Exception { assertTrue(write1.isCompletedExceptionally()); verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L); } + @Test public void testCoordinatorExecutor() { Duration writeTimeout = Duration.ofMillis(1000); @@ -4866,6 +4867,83 @@ public void testCoordinatorExecutor() { assertTrue(write1.isDone()); } + @Test + public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception { + // Provides the runtime clock; we will advance it. + MockTimer clockTimer = new MockTimer(); + // Used for scheduling timer tasks; we won't advance it to avoid a timer-triggered batch flush. + MockTimer schedulerTimer = new MockTimer(); + + MockPartitionWriter writer = new MockPartitionWriter(); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(clockTimer.time()) + .withTimer(schedulerTimer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(ACTIVE, ctx.state); + assertNull(ctx.currentBatch); + + // Write #1. + CompletableFuture write1 = runtime.scheduleWriteOperation( + "write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of("record1"), "response1") + ); + assertFalse(write1.isDone()); + assertNotNull(ctx.currentBatch); + assertEquals(0, writer.entries(TP).size()); + + // Verify that the linger timeout task is created; there will also be a default write timeout task. + assertEquals(2, schedulerTimer.size()); + + // Advance past the linger time. + clockTimer.advanceClock(11); + + // At this point, there are still two scheduled tasks; the linger task has not fired + // because we did not advance the schedulerTimer. + assertEquals(2, schedulerTimer.size()); + + // Write #2. + CompletableFuture write2 = runtime.scheduleWriteOperation( + "write#2", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of("record2"), "response2") + ); + + // The batch should have been flushed. + assertEquals(1, writer.entries(TP).size()); + + // Because flushing the batch cancels the linger task, there should now be two write timeout tasks. + assertEquals(2, schedulerTimer.size()); + + // Verify batch contains both two records + MemoryRecords batch = writer.entries(TP).get(0); + RecordBatch recordBatch = batch.firstBatch(); + assertEquals(2, recordBatch.countOrNull()); + + // Commit and verify that writes are completed. + writer.commit(TP); + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + // Now that all scheduled tasks have been cancelled, the scheduler queue should be empty. + assertEquals(0, schedulerTimer.size()); + } + private static , U> ArgumentMatcher> coordinatorMatcher( CoordinatorRuntime runtime, TopicPartition tp From 69963c6107690de91502e460fb9ac7007f1ec96e Mon Sep 17 00:00:00 2001 From: majialong Date: Fri, 24 Oct 2025 22:45:42 +0800 Subject: [PATCH 3/3] Adjust code style. --- .../runtime/CoordinatorRuntimeTest.java | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index 89f753a75e72f..de1aa21f3f116 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -4877,20 +4877,20 @@ public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception MockPartitionWriter writer = new MockPartitionWriter(); CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() - .withTime(clockTimer.time()) - .withTimer(schedulerTimer) - .withDefaultWriteTimeOut(Duration.ofMillis(20)) - .withLoader(new MockCoordinatorLoader()) - .withEventProcessor(new DirectEventProcessor()) - .withPartitionWriter(writer) - .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) - .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) - .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) - .withSerializer(new StringSerializer()) - .withAppendLingerMs(10) - .withExecutorService(mock(ExecutorService.class)) - .build(); + new CoordinatorRuntime.Builder() + .withTime(clockTimer.time()) + .withTimer(schedulerTimer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .withExecutorService(mock(ExecutorService.class)) + .build(); // Schedule the loading. runtime.scheduleLoadOperation(TP, 10); @@ -4901,9 +4901,8 @@ public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception assertNull(ctx.currentBatch); // Write #1. - CompletableFuture write1 = runtime.scheduleWriteOperation( - "write#1", TP, Duration.ofMillis(20), - state -> new CoordinatorResult<>(List.of("record1"), "response1") + CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of("record1"), "response1") ); assertFalse(write1.isDone()); assertNotNull(ctx.currentBatch); @@ -4920,9 +4919,8 @@ public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception assertEquals(2, schedulerTimer.size()); // Write #2. - CompletableFuture write2 = runtime.scheduleWriteOperation( - "write#2", TP, Duration.ofMillis(20), - state -> new CoordinatorResult<>(List.of("record2"), "response2") + CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of("record2"), "response2") ); // The batch should have been flushed.