From 78121abdcab71f0a12b4a5874d1181577e7ad744 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Thu, 31 Jul 2025 15:07:22 -0700 Subject: [PATCH 01/20] add helper functions and result for tests --- .../internals/RecordAccumulatorTest.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 750440d2595a5..b87284747f395 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1597,6 +1597,55 @@ private static class BatchDrainedResult { } } + private static class SplitAndReenqueueResult { + final int numSplitBatches; + final int originalRecordCount; + final int originalBatchSize; + final ProducerBatch splitBatch; + + SplitAndReenqueueResult(int numSplitBatches, int originalRecordCount, int originalBatchSize, ProducerBatch splitBatch) { + this.numSplitBatches = numSplitBatches; + this.originalRecordCount = originalRecordCount; + this.originalBatchSize = originalBatchSize; + this.splitBatch = splitBatch; + } + } + + private SplitAndReenqueueResult performSplitAndReenqueueCycle(RecordAccumulator accum, ProducerBatch batch, int retryAttempt) { + long now = time.milliseconds(); + + // Enqueue the batch for processing + accum.reenqueue(batch, now); + time.sleep(121L); // Wait for retry backoff + + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); + assertFalse(result.readyNodes.isEmpty(), "Batch should be ready for retry attempt " + retryAttempt); + + Map> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals(1, drained.size(), "Only node1 should be drained in retry attempt " + retryAttempt); + assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained in retry attempt " + retryAttempt); + + ProducerBatch drainedBatch = drained.get(node1.id()).get(0); + assertEquals(1, drainedBatch.recordCount, "Drained batch should have exactly one record in retry attempt " + retryAttempt); + + int originalRecordCount = drainedBatch.recordCount; + int originalBatchSize = drainedBatch.estimatedSizeInBytes(); + + int numSplitBatches = accum.splitAndReenqueue(drainedBatch); + + // wait for split batch to become ready + time.sleep(101L); + result = accum.ready(metadataCache, time.milliseconds()); + drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertFalse(drained.isEmpty(), "Split batch should be ready for draining in retry attempt " + retryAttempt); + assertFalse(drained.get(node1.id()).isEmpty(), "Node1 should have the split batch in retry attempt " + retryAttempt); + + ProducerBatch splitBatch = drained.get(node1.id()).get(0); + assertEquals(1, splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt); + + return new SplitAndReenqueueResult(numSplitBatches, originalRecordCount, originalBatchSize, splitBatch); + } + /** * Return the offset delta. */ From 9f97d8ebc1db11b67683d507009c6bf2ddc3597d Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Thu, 31 Jul 2025 15:08:17 -0700 Subject: [PATCH 02/20] test: added 3 passing tests --- .../internals/RecordAccumulatorTest.java | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index b87284747f395..c04dd59e09ea1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1066,6 +1066,146 @@ public void testSplitAndReenqueue() throws ExecutionException, InterruptedExcept assertEquals(1, future2.get().offset()); } + // This test confirms us that splitting a single large record + // creates an unsplittable batch (does not really split it) + // that will continue to fail with MESSAGE_TOO_LARGE, + // causing infinite retry loops + @Test + public void testSplitAndReenqueueWithSingleLargeRecord() throws ExecutionException, InterruptedException { + long now = time.milliseconds(); + int smallBatchSize = 1024; + RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10); + + // create a single record that is much larger than the batch size limit + // we are trying to mimic by send a record larger than broker's message.max.bytes + byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB + + // Create a buffer with enough space for the large record + ByteBuffer buffer = ByteBuffer.allocate(8192); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); + ProducerBatch batch = new ProducerBatch(tp1, builder, now, true); + + final AtomicInteger acked = new AtomicInteger(0); + Callback cb = (metadata, exception) -> acked.incrementAndGet(); + + // create a large batch but only with one single record + Future future = batch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now); + assertNotNull(future, "Should be able to append the large record to batch"); + assertEquals(1, batch.recordCount, "Batch should contain exactly one record"); + batch.close(); + + // try to split and reenqueue a single large record + SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, batch, 0); + + // The below asserts tests that the single large record + // results in exactly one "split" batch + assertEquals(1, result.numSplitBatches, "Single large record should result in exactly one split batch"); + assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record"); + assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record"); + assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed batch size limit"); + + // the "split" batch is still oversized and contains the same record + assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize, + "Split batch is still oversized - it cannot be split further and will cause an error, will retry infinitely"); + } + + // This test retries for infinite times (controlled for 5 times for testing) + // because the record can never be split further + @Test + public void testRetrySplitAndReenqueueBehaviourWithSingleLargeRecord() throws ExecutionException, InterruptedException { + long now = time.milliseconds(); + int smallBatchSize = 1024; + RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10); + + // create a single record that is much larger than the batch size limit + // we are trying to mimic by send a record larger than broker's message.max.bytes + byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB + + // Create a buffer with enough space for the large record + ByteBuffer buffer = ByteBuffer.allocate(8192); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); + ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now, true); + + final AtomicInteger acked = new AtomicInteger(0); + Callback cb = (metadata, exception) -> acked.incrementAndGet(); + + Future future = originalBatch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now); + assertNotNull(future, "Should be able to append the large record to batch"); + assertEquals(1, originalBatch.recordCount, "Original batch should contain exactly one record"); + originalBatch.close(); + + // controlled test case, retry behavior across multiple cycles + // 5 cycles for testing but mimics infinite retries in reality + final int maxRetryCycles = 5; + + ProducerBatch currentBatch = originalBatch; + List results = new ArrayList<>(); + + for (int retryAttempt = 0; retryAttempt < maxRetryCycles; retryAttempt++) { + SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, currentBatch, retryAttempt); + results.add(result); + + // Verify that each retry produces exactly 1 "split" batch (cannot be split further) + assertEquals(1, result.numSplitBatches, "Single record should result in exactly one split batch in retry attempt " + retryAttempt); + assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record in retry attempt " + retryAttempt); + assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed size limit in retry attempt " + retryAttempt); + assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt); + + // The split batch is still oversized and will fail with MESSAGE_TOO_LARGE again + assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize, + "Split batch in retry " + retryAttempt + " is still oversized and will fail MESSAGE_TOO_LARGE again"); + + // the new batch must be the split batch + currentBatch = result.splitBatch; + } + + // making sure that all the retry attempts were tracked + assertEquals(maxRetryCycles, results.size(), "Should have tracked all retry attempts"); + + // consistency across all retry cycles - each produces exactly 1 unsplittable batch + for (int i = 0; i < maxRetryCycles; i++) { + SplitAndReenqueueResult result = results.get(i); + assertEquals(1, result.numSplitBatches, "Retry attempt " + i + " should produce exactly 1 split batch"); + assertEquals(1, result.originalRecordCount, "Retry attempt " + i + " should have exactly 1 record"); + assertTrue(result.originalBatchSize > smallBatchSize, "Retry attempt " + i + " batch should exceed size limit"); + } + } + + // here I am testing the hasRoomFor() behaviour + // It allows the first record no matter the size + // but does not allow the second record + @Test + public void testHasRoomForAllowsOversizedFirstRecordButRejectsSubsequentRecords() { + long now = time.milliseconds(); + int smallBatchSize = 1024; + + // Create a large record that exceeds batch size limit + byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB + + // Create a small buffer that cannot fit the large record + ByteBuffer buffer = ByteBuffer.allocate(smallBatchSize); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); + + // testing existing code: + // hasRoomFor() should return true for first record regardless of size + boolean hasRoomForFirst = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS); + assertTrue(hasRoomForFirst, "hasRoomFor() should return true for first record regardless of size when numRecords == 0"); + + // append the first oversized record - should succeed + builder.append(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS); + assertEquals(1, builder.numRecords(), "Should have successfully appended the first oversized record"); + + // now append another large record when numRecords > 0 + boolean hasRoomForSecond = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS); + assertFalse(hasRoomForSecond, "hasRoomFor() should return false for oversized record when numRecords > 0"); + + // Now append with a smaller record that would normally fit but + // this too should be rejected due to limited buffer space + byte[] smallValue = new byte[100]; // Small record + boolean hasRoomForSmall = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(smallValue), Record.EMPTY_HEADERS); + assertFalse(hasRoomForSmall, "hasRoomFor() should return false for any record when buffer is full from oversized first record"); + } + @Test public void testSplitBatchOffAccumulator() throws InterruptedException { long seed = System.currentTimeMillis(); From a0c23545fbee5f573e706c6026033708bc7e25ac Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Fri, 3 Oct 2025 16:11:55 -0700 Subject: [PATCH 03/20] patch to check record level futures --- .../producer/internals/ProducerBatch.java | 15 + .../producer/internals/RecordAccumulator.java | 26 +- ...nceDeliveryMessageLossIntegrationTest.java | 273 ++++++++++++++++++ 3 files changed, 308 insertions(+), 6 deletions(-) create mode 100644 streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 5619819dde72e..bc6d8e96be9d3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; +import java.util.stream.Collectors; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -237,6 +238,20 @@ public boolean completeExceptionally( return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, topLevelException, recordExceptions); } + /** + * Get all record futures for this batch. + * This is used by flush() to wait on individual records rather than the batch-level future. + * When batches are split, individual futures are chained to the new batches, + * ensuring flush() waits for all split batches to complete. + * + * @return List of FutureRecordMetadata for all records in this batch + */ + public List recordFutures() { + return thunks.stream() + .map(thunk -> thunk.future) + .collect(Collectors.toList()); + } + /** * Finalize the state of a batch. Final state, once set, is immutable. This function may be called * once or twice on a batch. It may be called twice if diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index f0c2719db9612..b300484b0c57a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; +import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.producer.Callback; @@ -1072,12 +1073,25 @@ private boolean appendsInProgress() { */ public void awaitFlushCompletion() throws InterruptedException { try { - // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush. - // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage - // collection can occur on the contents. - // The sender will remove ProducerBatch(s) from the original incomplete collection. - for (ProduceRequestResult result : this.incomplete.requestResults()) - result.await(); + // Obtain a snapshot of all record futures at the time of the flush. + // We wait on individual record futures rather than batch-level futures because + // by waiting on record futures, we ensure flush() blocks until all split + // batches complete. + // + // We first collect all futures into a list first to avoid holding references to + // ProducerBatch objects, allowing them to be garbage collected after completion. + List futures = new ArrayList<>(); + for (ProducerBatch batch : this.incomplete.copyAll()) { + futures.addAll(batch.recordFutures()); + } + + for (FutureRecordMetadata future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + log.trace("Completed future with exception during flush", e); + } + } } finally { this.flushesInProgress.decrementAndGet(); } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java new file mode 100644 index 0000000000000..7b6470b41f92c --- /dev/null +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.internals.Sender; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.test.TestUtils.consumerConfig; +import static org.apache.kafka.test.TestUtils.producerConfig; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@Timeout(600) +@Tag("integration") +public class AtLeastOnceDeliveryMessageLossIntegrationTest { + private static final Logger log = LoggerFactory.getLogger( + AtLeastOnceDeliveryMessageLossIntegrationTest.class); + + private static final int NUM_BROKERS = 1; + private static final int LARGE_RECORD_COUNT = 50000; + private static final int SMALL_RECORD_COUNT = 40000; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + @BeforeAll + public static void startCluster() throws IOException { + CLUSTER.start(); + } + + @AfterAll + public static void closeCluster() { + CLUSTER.stop(); + } + + private String applicationId; + private String inputTopic; + private String outputTopic; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + + @BeforeEach + public void setUp(final TestInfo testInfo) throws Exception { + final String testId = safeUniqueTestName(testInfo); + applicationId = "app-" + testId; + inputTopic = "input-" + testId; + outputTopic = "output-" + testId; + + cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic); + CLUSTER.createTopics(inputTopic, outputTopic); + + setupStreamsConfiguration(); + } + + @AfterEach + public void cleanUp() throws Exception { + if (kafkaStreams != null) { + kafkaStreams.close(); + } + if (streamsConfiguration != null) { + purgeLocalStreamsState(streamsConfiguration); + } + } + + // failing test + @Test + public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception { + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Sender.class)) { + produceInputData(LARGE_RECORD_COUNT); + + kafkaStreams = createStreamsApplication(); + startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT)); + + waitForProcessingAndCommit(); + + // for this bug + // first offsets are committed, then + // no messages produced in output topic, then + // repeated retries and MESSAGE_TOO_LARGE error + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("MESSAGE_TOO_LARGE") && msg.contains("splitting and retrying")), + "Should log MESSAGE_TOO_LARGE and splitting retry messages"); + + final int outputRecordCount = verifyOutputRecords(0); // should not produce records + final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(0); // should not commit offset unless records are produced + + assertEquals(0, outputRecordCount, "Output topic should not have any records"); + assertTrue(offsetsCommitted, "Consumer offsets should not be committed"); + } + } + + @Test + public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception { + produceInputData(SMALL_RECORD_COUNT); + + try (final KafkaStreams kafkaStreams = createStreamsApplication()) { + startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT)); + + waitForProcessingAndCommit(); + + //normal behavior + final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); //should produce records + final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets + + assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records"); + assertTrue(offsetsCommitted, "Consumer offsets should be committed"); + } + } + + + private void setupStreamsConfiguration() { + streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + // AT_LEAST_ONCE processing guarantee + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L); + + // Producer configuration that can trigger MESSAGE_TOO_LARGE errors + streamsConfiguration.put(ProducerConfig.LINGER_MS_CONFIG, 300000); + streamsConfiguration.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432); + } + + private void produceInputData(final int recordCount) throws Exception { + final List> inputRecords = new ArrayList<>(); + for (int i = 1; i <= recordCount; i++) { + inputRecords.add(new KeyValue<>(String.valueOf(i), "item-" + i)); + } + + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputRecords, + producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), + CLUSTER.time + ); + } + + private void waitForProcessingAndCommit() throws Exception { + // Wait slightly longer than commit interval to ensure processing and offset commits + waitForCondition( + () -> { + try (final Admin adminClient = Admin.create(mkMap( + mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) { + final TopicPartition topicPartition = new TopicPartition(inputTopic, 0); + return adminClient + .listConsumerGroupOffsets(applicationId) + .partitionsToOffsetAndMetadata() + .get() + .containsKey(topicPartition); + } catch (final Exception e) { + return false; + } + }, + 35000L, + "Waiting for consumer offsets to be committed" + ); + } + + private boolean verifyConsumerOffsetsCommitted(final int expectedOffset) throws Exception { + try (final Admin adminClient = Admin.create(mkMap( + mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) { + + final TopicPartition topicPartition = new TopicPartition(inputTopic, 0); + + final long committedOffset = adminClient + .listConsumerGroupOffsets(applicationId) + .partitionsToOffsetAndMetadata() + .get() + .get(topicPartition) + .offset(); + + log.info("Consumer group {} committed offset: {} (expected: {})", applicationId, committedOffset, expectedOffset); + return committedOffset == expectedOffset; + } + } + + private int verifyOutputRecords(final int expectedRecordCount) { + try { + final List> outputRecords = + waitUntilMinKeyValueRecordsReceived( + consumerConfig( + CLUSTER.bootstrapServers(), + applicationId + "-test-consumer-" + System.currentTimeMillis(), + StringDeserializer.class, + StringDeserializer.class + ), + outputTopic, + expectedRecordCount, + 30000L + ); + log.info("Output topic {} contains {} records", outputTopic, outputRecords.size()); + return outputRecords.size(); + } catch (final Exception e) { + log.info("Exception while reading output records: {}", e.getMessage()); + return 0; + } + } + + private KafkaStreams createStreamsApplication() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream input = builder.stream(inputTopic); + input.peek((key, value) -> { + if (Integer.parseInt(key) % 1000 == 0) { + log.debug("Processing record {}: {} -> {}", key, key, value); + } + }).to(outputTopic); + + return new KafkaStreams(builder.build(), streamsConfiguration); + } +} \ No newline at end of file From 0b3258ac01dc3e9533607d4a14e3116c5261992f Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 6 Oct 2025 18:39:16 -0700 Subject: [PATCH 04/20] remove exception --- .../AtLeastOnceDeliveryMessageLossIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java index 7b6470b41f92c..ff5e0634d8195 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java @@ -183,7 +183,7 @@ private void setupStreamsConfiguration() { streamsConfiguration.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432); } - private void produceInputData(final int recordCount) throws Exception { + private void produceInputData(final int recordCount) { final List> inputRecords = new ArrayList<>(); for (int i = 1; i <= recordCount; i++) { inputRecords.add(new KeyValue<>(String.valueOf(i), "item-" + i)); From 79ffb7b5bc7c2609b651a5666e7150260982d4fe Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 6 Oct 2025 18:41:38 -0700 Subject: [PATCH 05/20] fix typos --- .../kafka/clients/producer/internals/ProducerBatch.java | 4 ++-- .../kafka/clients/producer/internals/RecordAccumulator.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index bc6d8e96be9d3..121c6154de170 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -241,8 +241,8 @@ public boolean completeExceptionally( /** * Get all record futures for this batch. * This is used by flush() to wait on individual records rather than the batch-level future. - * When batches are split, individual futures are chained to the new batches, - * ensuring flush() waits for all split batches to complete. + * When batches are split, individual record futures are chained to the new batches, + * ensuring that flush() waits for all split batches to complete. * * @return List of FutureRecordMetadata for all records in this batch */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index b300484b0c57a..25801b0d75d0a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -1078,7 +1078,7 @@ public void awaitFlushCompletion() throws InterruptedException { // by waiting on record futures, we ensure flush() blocks until all split // batches complete. // - // We first collect all futures into a list first to avoid holding references to + // We first collect all futures into a list to avoid holding references to // ProducerBatch objects, allowing them to be garbage collected after completion. List futures = new ArrayList<>(); for (ProducerBatch batch : this.incomplete.copyAll()) { From 6428608d5f616a243cc01b7b4c866270268e4e7b Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 6 Oct 2025 18:43:22 -0700 Subject: [PATCH 06/20] remove useless tests --- .../internals/RecordAccumulatorTest.java | 154 ------------------ 1 file changed, 154 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index c04dd59e09ea1..1c6b8e6fef48e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1066,111 +1066,6 @@ public void testSplitAndReenqueue() throws ExecutionException, InterruptedExcept assertEquals(1, future2.get().offset()); } - // This test confirms us that splitting a single large record - // creates an unsplittable batch (does not really split it) - // that will continue to fail with MESSAGE_TOO_LARGE, - // causing infinite retry loops - @Test - public void testSplitAndReenqueueWithSingleLargeRecord() throws ExecutionException, InterruptedException { - long now = time.milliseconds(); - int smallBatchSize = 1024; - RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10); - - // create a single record that is much larger than the batch size limit - // we are trying to mimic by send a record larger than broker's message.max.bytes - byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB - - // Create a buffer with enough space for the large record - ByteBuffer buffer = ByteBuffer.allocate(8192); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); - ProducerBatch batch = new ProducerBatch(tp1, builder, now, true); - - final AtomicInteger acked = new AtomicInteger(0); - Callback cb = (metadata, exception) -> acked.incrementAndGet(); - - // create a large batch but only with one single record - Future future = batch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now); - assertNotNull(future, "Should be able to append the large record to batch"); - assertEquals(1, batch.recordCount, "Batch should contain exactly one record"); - batch.close(); - - // try to split and reenqueue a single large record - SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, batch, 0); - - // The below asserts tests that the single large record - // results in exactly one "split" batch - assertEquals(1, result.numSplitBatches, "Single large record should result in exactly one split batch"); - assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record"); - assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record"); - assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed batch size limit"); - - // the "split" batch is still oversized and contains the same record - assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize, - "Split batch is still oversized - it cannot be split further and will cause an error, will retry infinitely"); - } - - // This test retries for infinite times (controlled for 5 times for testing) - // because the record can never be split further - @Test - public void testRetrySplitAndReenqueueBehaviourWithSingleLargeRecord() throws ExecutionException, InterruptedException { - long now = time.milliseconds(); - int smallBatchSize = 1024; - RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10); - - // create a single record that is much larger than the batch size limit - // we are trying to mimic by send a record larger than broker's message.max.bytes - byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB - - // Create a buffer with enough space for the large record - ByteBuffer buffer = ByteBuffer.allocate(8192); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); - ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now, true); - - final AtomicInteger acked = new AtomicInteger(0); - Callback cb = (metadata, exception) -> acked.incrementAndGet(); - - Future future = originalBatch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now); - assertNotNull(future, "Should be able to append the large record to batch"); - assertEquals(1, originalBatch.recordCount, "Original batch should contain exactly one record"); - originalBatch.close(); - - // controlled test case, retry behavior across multiple cycles - // 5 cycles for testing but mimics infinite retries in reality - final int maxRetryCycles = 5; - - ProducerBatch currentBatch = originalBatch; - List results = new ArrayList<>(); - - for (int retryAttempt = 0; retryAttempt < maxRetryCycles; retryAttempt++) { - SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, currentBatch, retryAttempt); - results.add(result); - - // Verify that each retry produces exactly 1 "split" batch (cannot be split further) - assertEquals(1, result.numSplitBatches, "Single record should result in exactly one split batch in retry attempt " + retryAttempt); - assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record in retry attempt " + retryAttempt); - assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed size limit in retry attempt " + retryAttempt); - assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt); - - // The split batch is still oversized and will fail with MESSAGE_TOO_LARGE again - assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize, - "Split batch in retry " + retryAttempt + " is still oversized and will fail MESSAGE_TOO_LARGE again"); - - // the new batch must be the split batch - currentBatch = result.splitBatch; - } - - // making sure that all the retry attempts were tracked - assertEquals(maxRetryCycles, results.size(), "Should have tracked all retry attempts"); - - // consistency across all retry cycles - each produces exactly 1 unsplittable batch - for (int i = 0; i < maxRetryCycles; i++) { - SplitAndReenqueueResult result = results.get(i); - assertEquals(1, result.numSplitBatches, "Retry attempt " + i + " should produce exactly 1 split batch"); - assertEquals(1, result.originalRecordCount, "Retry attempt " + i + " should have exactly 1 record"); - assertTrue(result.originalBatchSize > smallBatchSize, "Retry attempt " + i + " batch should exceed size limit"); - } - } - // here I am testing the hasRoomFor() behaviour // It allows the first record no matter the size // but does not allow the second record @@ -1737,55 +1632,6 @@ private static class BatchDrainedResult { } } - private static class SplitAndReenqueueResult { - final int numSplitBatches; - final int originalRecordCount; - final int originalBatchSize; - final ProducerBatch splitBatch; - - SplitAndReenqueueResult(int numSplitBatches, int originalRecordCount, int originalBatchSize, ProducerBatch splitBatch) { - this.numSplitBatches = numSplitBatches; - this.originalRecordCount = originalRecordCount; - this.originalBatchSize = originalBatchSize; - this.splitBatch = splitBatch; - } - } - - private SplitAndReenqueueResult performSplitAndReenqueueCycle(RecordAccumulator accum, ProducerBatch batch, int retryAttempt) { - long now = time.milliseconds(); - - // Enqueue the batch for processing - accum.reenqueue(batch, now); - time.sleep(121L); // Wait for retry backoff - - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); - assertFalse(result.readyNodes.isEmpty(), "Batch should be ready for retry attempt " + retryAttempt); - - Map> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); - assertEquals(1, drained.size(), "Only node1 should be drained in retry attempt " + retryAttempt); - assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained in retry attempt " + retryAttempt); - - ProducerBatch drainedBatch = drained.get(node1.id()).get(0); - assertEquals(1, drainedBatch.recordCount, "Drained batch should have exactly one record in retry attempt " + retryAttempt); - - int originalRecordCount = drainedBatch.recordCount; - int originalBatchSize = drainedBatch.estimatedSizeInBytes(); - - int numSplitBatches = accum.splitAndReenqueue(drainedBatch); - - // wait for split batch to become ready - time.sleep(101L); - result = accum.ready(metadataCache, time.milliseconds()); - drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); - assertFalse(drained.isEmpty(), "Split batch should be ready for draining in retry attempt " + retryAttempt); - assertFalse(drained.get(node1.id()).isEmpty(), "Node1 should have the split batch in retry attempt " + retryAttempt); - - ProducerBatch splitBatch = drained.get(node1.id()).get(0); - assertEquals(1, splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt); - - return new SplitAndReenqueueResult(numSplitBatches, originalRecordCount, originalBatchSize, splitBatch); - } - /** * Return the offset delta. */ From 3fd455340e8d5202fec7f4b0c49403f4768d3947 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 6 Oct 2025 19:06:10 -0700 Subject: [PATCH 07/20] add perf test --- .../internals/RecordAccumulatorTest.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 1c6b8e6fef48e..e9a0cef29d8ba 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1825,4 +1825,56 @@ public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedEx // Verify all original records are accounted for (no data loss) assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting"); } + + @Test + public void testFlushPerformanceWithManyRecords() throws Exception { + int numRecords = 5000; + int batchSize = 1024; + long totalSize = 10 * 1024 * 1024; + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, + totalSize, + Compression.NONE, + Integer.MAX_VALUE); + + List> futures = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + RecordAccumulator.RecordAppendResult result = accum.append( + topic, + partition1, + 0L, + key, + value, + Record.EMPTY_HEADERS, + null, + maxBlockTimeMs, + time.milliseconds(), + cluster); + if (result.future != null) { + futures.add(result.future); + } + } + + accum.beginFlush(); + + // Need to complete all batches to mimic successful sends for awaitFlushCompletion() + List batches = new ArrayList<>(accum.getDeque(tp1)); + for (ProducerBatch batch : batches) { + batch.complete(0L, time.milliseconds()); + } + + // Measure time + long startNanos = System.nanoTime(); + accum.awaitFlushCompletion(); + long durationNanos = System.nanoTime() - startNanos; + + double durationMs = durationNanos / 1_000_000.0; + System.out.printf("flush() with %d records took: %.3f ms%n", numRecords, durationMs); + + for (ProducerBatch batch : batches) { + accum.deallocate(batch); + } + + assertFalse(accum.flushInProgress()); + } } From ac97b878a002e32366c0d0dddb751b1992cd7c46 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 8 Oct 2025 10:04:53 -0700 Subject: [PATCH 08/20] jmh test --- .../internals/RecordAccumulatorTest.java | 52 ----- .../RecordAccumulatorFlushBenchmark.java | 189 ++++++++++++++++++ 2 files changed, 189 insertions(+), 52 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index e9a0cef29d8ba..1c6b8e6fef48e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1825,56 +1825,4 @@ public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedEx // Verify all original records are accounted for (no data loss) assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting"); } - - @Test - public void testFlushPerformanceWithManyRecords() throws Exception { - int numRecords = 5000; - int batchSize = 1024; - long totalSize = 10 * 1024 * 1024; - RecordAccumulator accum = createTestRecordAccumulator( - batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, - totalSize, - Compression.NONE, - Integer.MAX_VALUE); - - List> futures = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - RecordAccumulator.RecordAppendResult result = accum.append( - topic, - partition1, - 0L, - key, - value, - Record.EMPTY_HEADERS, - null, - maxBlockTimeMs, - time.milliseconds(), - cluster); - if (result.future != null) { - futures.add(result.future); - } - } - - accum.beginFlush(); - - // Need to complete all batches to mimic successful sends for awaitFlushCompletion() - List batches = new ArrayList<>(accum.getDeque(tp1)); - for (ProducerBatch batch : batches) { - batch.complete(0L, time.milliseconds()); - } - - // Measure time - long startNanos = System.nanoTime(); - accum.awaitFlushCompletion(); - long durationNanos = System.nanoTime() - startNanos; - - double durationMs = durationNanos / 1_000_000.0; - System.out.printf("flush() with %d records took: %.3f ms%n", numRecords, durationMs); - - for (ProducerBatch batch : batches) { - accum.deallocate(batch); - } - - assertFalse(accum.flushInProgress()); - } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java new file mode 100644 index 0000000000000..451e0da2b6962 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.producer; + +import org.apache.kafka.clients.MetadataSnapshot; +import org.apache.kafka.clients.producer.internals.BufferPool; +import org.apache.kafka.clients.producer.internals.ProducerBatch; +import org.apache.kafka.clients.producer.internals.RecordAccumulator; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class RecordAccumulatorFlushBenchmark { + + private static final String TOPIC = "test"; + private static final int PARTITION = 0; + private static final int BATCH_SIZE = 1024; + private static final long TOTAL_SIZE = 10 * 1024 * 1024; + + @Param({"5000", "10000"}) + private int numRecords; + + private RecordAccumulator accum; + private Metrics metrics; + private TopicPartition tp; + private Time time; + + @Setup(Level.Invocation) + public void setup() throws InterruptedException { + tp = new TopicPartition(TOPIC, PARTITION); + time = new MockTime(); + metrics = new Metrics(time); + + Cluster cluster = createTestCluster(); + accum = createRecordAccumulator(); + + appendRecords(cluster); + prepareFlush(); + } + + @TearDown(Level.Invocation) + public void tearDown() { + deallocateBatches(); + if (metrics != null) { + metrics.close(); + } + } + + @Benchmark + public void measureFlushCompletion() throws InterruptedException { + accum.awaitFlushCompletion(); + } + + private Cluster createTestCluster() { + Node node = new Node(0, "localhost", 1111); + MetadataResponse.PartitionMetadata partMetadata = new MetadataResponse.PartitionMetadata( + Errors.NONE, + tp, + Optional.of(node.id()), + Optional.empty(), + null, + null, + null + ); + + Map nodes = Stream.of(node).collect(Collectors.toMap(Node::id, Function.identity())); + MetadataSnapshot metadataCache = new MetadataSnapshot( + null, + nodes, + Collections.singletonList(partMetadata), + Collections.emptySet(), + Collections.emptySet(), + Collections.emptySet(), + null, + Collections.emptyMap() + ); + return metadataCache.cluster(); + } + + private RecordAccumulator createRecordAccumulator() { + return new RecordAccumulator( + new LogContext(), + BATCH_SIZE + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, + Compression.NONE, + Integer.MAX_VALUE, // lingerMs + 100L, // retryBackoffMs + 1000L, // retryBackoffMaxMs + 3200, // deliveryTimeoutMs + metrics, + "producer-metrics", + time, + null, + new BufferPool(TOTAL_SIZE, BATCH_SIZE, metrics, time, "producer-metrics") + ); + } + + private void appendRecords(Cluster cluster) throws InterruptedException { + byte[] key = "key".getBytes(); + byte[] value = "value".getBytes(); + + for (int i = 0; i < numRecords; i++) { + accum.append( + TOPIC, + PARTITION, + 0L, + key, + value, + Record.EMPTY_HEADERS, + null, + 1000L, + time.milliseconds(), + cluster + ); + } + } + + private void prepareFlush() { + accum.beginFlush(); + + // Complete all batches to mimic successful sends + List batches = new ArrayList<>(accum.getDeque(tp)); + for (ProducerBatch batch : batches) { + batch.complete(0L, time.milliseconds()); + } + } + + private void deallocateBatches() { + if (accum != null && tp != null) { + List batches = new ArrayList<>(accum.getDeque(tp)); + for (ProducerBatch batch : batches) { + accum.deallocate(batch); + } + } + } +} From 97ee7085082fc4883669b7d5aaf1ff4cc147986f Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 8 Oct 2025 16:34:09 -0700 Subject: [PATCH 09/20] add unit test --- .../internals/RecordAccumulatorTest.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 1c6b8e6fef48e..70586709cf973 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1825,4 +1825,49 @@ public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedEx // Verify all original records are accounted for (no data loss) assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting"); } + + @Test + public void testSplitBatchProduceFutureWaitsForAllSplits() throws Exception { + long now = time.milliseconds(); + // small batch size to force splitting + RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, Compression.gzip().build(), 10); + + // create a big batch manually + ByteBuffer buffer = ByteBuffer.allocate(4096); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); + ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now, true); + + byte[] value = new byte[1024]; + final AtomicInteger acked = new AtomicInteger(0); + Callback cb = (metadata, exception) -> acked.incrementAndGet(); + + // append two messages so the batch is too big + Future future1 = originalBatch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now); + Future future2 = originalBatch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now); + assertNotNull(future1); + assertNotNull(future2); + originalBatch.close(); + + // reenqueue + accum.reenqueue(originalBatch, now); + time.sleep(121L); + + // drain + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); + assertFalse(result.readyNodes.isEmpty(), "The batch should be ready"); + Map> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals(1, drained.size(), "Only node1 should be drained"); + assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained"); + + ProducerBatch drainedBatch = drained.get(node1.id()).get(0); + + // split and reenqueue + int numSplitBatches = accum.splitAndReenqueue(drainedBatch); + assertTrue(numSplitBatches > 0, "Should have split into multiple batches"); + + // original batch's produceFuture should NOT be completed yet + // because the split batches haven't been completed + assertFalse(originalBatch.produceFuture.completed(), + "Original batch produceFuture should not be completed until all split batches are completed"); + } } From 0e1d2bc46791a8f0fd4ace14fe09ee09db7d4380 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 8 Oct 2025 16:35:21 -0700 Subject: [PATCH 10/20] revert previous fix --- .../producer/internals/ProducerBatch.java | 14 ---------- .../producer/internals/RecordAccumulator.java | 26 +++++-------------- 2 files changed, 6 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 121c6154de170..5291412502cfe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -238,20 +238,6 @@ public boolean completeExceptionally( return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, topLevelException, recordExceptions); } - /** - * Get all record futures for this batch. - * This is used by flush() to wait on individual records rather than the batch-level future. - * When batches are split, individual record futures are chained to the new batches, - * ensuring that flush() waits for all split batches to complete. - * - * @return List of FutureRecordMetadata for all records in this batch - */ - public List recordFutures() { - return thunks.stream() - .map(thunk -> thunk.future) - .collect(Collectors.toList()); - } - /** * Finalize the state of a batch. Final state, once set, is immutable. This function may be called * once or twice on a batch. It may be called twice if diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 25801b0d75d0a..f0c2719db9612 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.producer.internals; -import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.producer.Callback; @@ -1073,25 +1072,12 @@ private boolean appendsInProgress() { */ public void awaitFlushCompletion() throws InterruptedException { try { - // Obtain a snapshot of all record futures at the time of the flush. - // We wait on individual record futures rather than batch-level futures because - // by waiting on record futures, we ensure flush() blocks until all split - // batches complete. - // - // We first collect all futures into a list to avoid holding references to - // ProducerBatch objects, allowing them to be garbage collected after completion. - List futures = new ArrayList<>(); - for (ProducerBatch batch : this.incomplete.copyAll()) { - futures.addAll(batch.recordFutures()); - } - - for (FutureRecordMetadata future : futures) { - try { - future.get(); - } catch (ExecutionException e) { - log.trace("Completed future with exception during flush", e); - } - } + // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush. + // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage + // collection can occur on the contents. + // The sender will remove ProducerBatch(s) from the original incomplete collection. + for (ProduceRequestResult result : this.incomplete.requestResults()) + result.await(); } finally { this.flushesInProgress.decrementAndGet(); } From 0d381f58e3607dcf40dfa853963ece2608caf837 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 8 Oct 2025 16:43:52 -0700 Subject: [PATCH 11/20] add optimized fix with refactoring --- .../internals/ProduceRequestResult.java | 34 +++++++++++++++++-- .../producer/internals/ProducerBatch.java | 30 ++++++++++++++-- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java index 9077b107ab03e..39cd268a9669b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.RecordBatch; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -33,6 +35,7 @@ public class ProduceRequestResult { private final CountDownLatch latch = new CountDownLatch(1); private final TopicPartition topicPartition; + private final List dependentResults = new ArrayList<>(); private volatile Long baseOffset = null; private volatile long logAppendTime = RecordBatch.NO_TIMESTAMP; @@ -70,7 +73,23 @@ public void done() { } /** - * Await the completion of this request + * Add a dependent ProduceRequestResult that must complete before this result is considered complete. + * This is used when a batch is split into multiple batches - the original batch's result + * should not complete until all split batches have completed. + * + * @param dependentResult The dependent result to wait for + */ + public void addDependentResult(ProduceRequestResult dependentResult) { + synchronized (dependentResults) { + dependentResults.add(dependentResult); + } + } + + /** + * Await the completion of this request. + * Note: This only waits for the local latch and not dependent results. + * The dependent results are tracked for completion status via completed() method, + * but individual record futures handle chaining via FutureRecordMetadata.chain(). */ public void await() throws InterruptedException { latch.await(); @@ -129,6 +148,17 @@ public TopicPartition topicPartition() { * Has the request completed? */ public boolean completed() { - return this.latch.getCount() == 0L; + if (this.latch.getCount() != 0L) { + return false; + } + // are all the dependent results completed? + synchronized (dependentResults) { + for (ProduceRequestResult dependentResult : dependentResults) { + if (!dependentResult.completed()) { + return false; + } + } + } + return true; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 5291412502cfe..3eba70ef7d38e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -322,10 +322,16 @@ private void completeFutureAndFireCallbacks( } public Deque split(int splitBatchSize) { - Deque batches = new ArrayDeque<>(); - MemoryRecords memoryRecords = recordsBuilder.build(); + RecordBatch recordBatch = validateAndGetRecordBatch(); + Deque batches = splitRecordsIntoBatches(recordBatch, splitBatchSize); + finalizeSplitBatches(batches); + return batches; + } + private RecordBatch validateAndGetRecordBatch() { + MemoryRecords memoryRecords = recordsBuilder.build(); Iterator recordBatchIter = memoryRecords.batches().iterator(); + if (!recordBatchIter.hasNext()) throw new IllegalStateException("Cannot split an empty producer batch."); @@ -337,6 +343,11 @@ public Deque split(int splitBatchSize) { if (recordBatchIter.hasNext()) throw new IllegalArgumentException("A producer batch should only have one record batch."); + return recordBatch; + } + + private Deque splitRecordsIntoBatches(RecordBatch recordBatch, int splitBatchSize) { + Deque batches = new ArrayDeque<>(); Iterator thunkIter = thunks.iterator(); // We always allocate batch size because we are already splitting a big batch. // And we also Retain the create time of the original batch. @@ -363,9 +374,23 @@ public Deque split(int splitBatchSize) { batch.closeForRecordAppends(); } + return batches; + } + + private void finalizeSplitBatches(Deque batches) { + // Chain all split batch ProduceRequestResults to the original batch's produceFuture + // Ensures the original batch's future doesn't complete until all split batches complete + for (ProducerBatch splitBatch : batches) { + produceFuture.addDependentResult(splitBatch.produceFuture); + } + produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, index -> new RecordBatchTooLargeException()); produceFuture.done(); + assignProducerStateToBatches(batches); + } + + private void assignProducerStateToBatches(Deque batches) { if (hasSequence()) { int sequence = baseSequence(); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId(), producerEpoch()); @@ -374,7 +399,6 @@ public Deque split(int splitBatchSize) { sequence += newBatch.recordCount; } } - return batches; } private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) { From 41b77699fa01f4de574cd9cf639c98a69b0c0ddf Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Thu, 9 Oct 2025 21:04:10 -0700 Subject: [PATCH 12/20] fix typo --- .../kafka/clients/producer/internals/ProduceRequestResult.java | 2 +- .../apache/kafka/clients/producer/internals/ProducerBatch.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java index 39cd268a9669b..5d974fea0abbb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java @@ -44,7 +44,7 @@ public class ProduceRequestResult { /** * Create an instance of this class. * - * @param topicPartition The topic and partition to which this record set was sent was sent + * @param topicPartition The topic and partition to which this record set was sent */ public ProduceRequestResult(TopicPartition topicPartition) { this.topicPartition = topicPartition; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 3eba70ef7d38e..8fd9fe2571f3a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.producer.internals; -import java.util.stream.Collectors; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; From 788318ea7ddb04e4908c8259f79f6b3c26ee7959 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Sun, 12 Oct 2025 08:12:36 -0700 Subject: [PATCH 13/20] new changes --- .../internals/ProduceRequestResult.java | 64 +++++++++---- .../producer/internals/RecordAccumulator.java | 7 +- .../internals/RecordAccumulatorTest.java | 89 +++++++++++-------- 3 files changed, 109 insertions(+), 51 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java index 5d974fea0abbb..8550015ec54f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java @@ -20,8 +20,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.RecordBatch; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -87,9 +89,15 @@ public void addDependentResult(ProduceRequestResult dependentResult) { /** * Await the completion of this request. - * Note: This only waits for the local latch and not dependent results. - * The dependent results are tracked for completion status via completed() method, - * but individual record futures handle chaining via FutureRecordMetadata.chain(). + * + * This only waits for THIS request's latch and not dependent results. + * When a batch is split into multiple batches, dependent results are created and tracked + * separately, but this method does not wait for them. Individual record futures automatically + * handle waiting for their respective split batch via {@link FutureRecordMetadata#chain(FutureRecordMetadata)}, + * which redirects the future to point to the correct split batch's result. + * + * For flush() semantics that require waiting for all dependent results, use + * {@link #awaitAllDependents()}. */ public void await() throws InterruptedException { latch.await(); @@ -105,6 +113,34 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return latch.await(timeout, unit); } + /** + * Await the completion of this request and all the dependent requests. + * + * This method is used by flush() to ensure all split batches have completed before + * returning. This method waits for all dependent {@link ProduceRequestResult}s that + * were created when the batch was split. + * + * @throws InterruptedException if the thread is interrupted while waiting + */ + public void awaitAllDependents() throws InterruptedException { + Queue toWait = new ArrayDeque<>(); + toWait.add(this); + + while (!toWait.isEmpty()) { + ProduceRequestResult current = toWait.poll(); + + // first wait for THIS result's latch to be released + current.latch.await(); + + // add all dependent split batches to the queue. + // we synchronize to get a consistent snapshot, then release the lock + // before continuing but the actual waiting happens outside the lock. + synchronized (current.dependentResults) { + toWait.addAll(current.dependentResults); + } + } + } + /** * The base offset for the request (the first offset in the record set) */ @@ -146,19 +182,17 @@ public TopicPartition topicPartition() { /** * Has the request completed? + * + * This method only checks if THIS request has completed and not its dependent results. + * When a batch is split into multiple batches, the dependent split batches are tracked + * separately. Individual record futures handle waiting for their respective split + * batch via {@link FutureRecordMetadata#chain(FutureRecordMetadata)}, which updates the + * {@code nextRecordMetadata} pointer to follow the correct split batch. + * + * For flush() semantics that require waiting for all dependent results, use + * {@link #awaitAllDependents()}. */ public boolean completed() { - if (this.latch.getCount() != 0L) { - return false; - } - // are all the dependent results completed? - synchronized (dependentResults) { - for (ProduceRequestResult dependentResult : dependentResults) { - if (!dependentResult.completed()) { - return false; - } - } - } - return true; + return this.latch.getCount() == 0L; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index f0c2719db9612..d3c774cb6f5a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -1076,8 +1076,13 @@ public void awaitFlushCompletion() throws InterruptedException { // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage // collection can occur on the contents. // The sender will remove ProducerBatch(s) from the original incomplete collection. + // + // We use awaitAllDependents() here instead of await() to ensure that if any batch + // was split into multiple batches, we wait for all the split batches to complete. + // This is required to guarantee that all records sent before flush() + // must be fully complete, including records in split batches. for (ProduceRequestResult result : this.incomplete.requestResults()) - result.await(); + result.awaitAllDependents(); } finally { this.flushesInProgress.decrementAndGet(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 70586709cf973..ee50eb4c338e8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.record.DefaultRecord; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; @@ -71,7 +72,9 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1827,47 +1830,63 @@ public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedEx } @Test - public void testSplitBatchProduceFutureWaitsForAllSplits() throws Exception { - long now = time.milliseconds(); - // small batch size to force splitting - RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, Compression.gzip().build(), 10); - - // create a big batch manually - ByteBuffer buffer = ByteBuffer.allocate(4096); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); - ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now, true); + public void testProduceRequestResultawaitAllDependents() throws Exception { + ProduceRequestResult parent = new ProduceRequestResult(tp1); + + // make two dependent ProduceRequestResults -- mimicking split batches + ProduceRequestResult dependent1 = new ProduceRequestResult(tp1); + ProduceRequestResult dependent2 = new ProduceRequestResult(tp1); + + // add dependents + parent.addDependentResult(dependent1); + parent.addDependentResult(dependent2); + + parent.set(0L, RecordBatch.NO_TIMESTAMP, null); + parent.done(); + + // parent.completed() should return true (only checks latch) + assertTrue(parent.completed(), "Parent should be completed after done()"); + + // awaitAllDependents() should block because dependents are not complete + final AtomicBoolean awaitCompleted = new AtomicBoolean(false); + final AtomicReference awaitException = new AtomicReference<>(); + + // to prove awaitAllDependents() is blocking, we run it in a separate thread + Thread awaitThread = new Thread(() -> { + try { + parent.awaitAllDependents(); + awaitCompleted.set(true); + } catch (Exception e) { + awaitException.set(e); + } + }); + awaitThread.start(); + Thread.sleep(100); - byte[] value = new byte[1024]; - final AtomicInteger acked = new AtomicInteger(0); - Callback cb = (metadata, exception) -> acked.incrementAndGet(); + // verify awaitAllDependents() is blocking + assertFalse(awaitCompleted.get(), + "awaitAllDependents() should block because dependents are not complete"); - // append two messages so the batch is too big - Future future1 = originalBatch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now); - Future future2 = originalBatch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now); - assertNotNull(future1); - assertNotNull(future2); - originalBatch.close(); + // now complete the first dependent + dependent1.set(0L, RecordBatch.NO_TIMESTAMP, null); + dependent1.done(); - // reenqueue - accum.reenqueue(originalBatch, now); - time.sleep(121L); + Thread.sleep(100); - // drain - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); - assertFalse(result.readyNodes.isEmpty(), "The batch should be ready"); - Map> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); - assertEquals(1, drained.size(), "Only node1 should be drained"); - assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained"); + // this should still be blocking because dependent2 is not complete + assertFalse(awaitCompleted.get(), + "awaitAllDependents() should still block because dependent2 is not complete"); - ProducerBatch drainedBatch = drained.get(node1.id()).get(0); + // now complete the second dependent + dependent2.set(0L, RecordBatch.NO_TIMESTAMP, null); + dependent2.done(); - // split and reenqueue - int numSplitBatches = accum.splitAndReenqueue(drainedBatch); - assertTrue(numSplitBatches > 0, "Should have split into multiple batches"); + // now awaitAllDependents() should complete + awaitThread.join(5000); - // original batch's produceFuture should NOT be completed yet - // because the split batches haven't been completed - assertFalse(originalBatch.produceFuture.completed(), - "Original batch produceFuture should not be completed until all split batches are completed"); + assertNull(awaitException.get(), "awaitAllDependents() should not throw exception"); + assertTrue(awaitCompleted.get(), + "awaitAllDependents() should complete after all dependents are done"); + assertFalse(awaitThread.isAlive(), "await thread should have completed"); } } From 467cdf4ee03d56b2b98c32728c3f6a5a78a7994f Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 13 Oct 2025 16:10:19 -0700 Subject: [PATCH 14/20] rename to addDependent --- .../clients/producer/internals/ProduceRequestResult.java | 2 +- .../kafka/clients/producer/internals/ProducerBatch.java | 2 +- .../clients/producer/internals/RecordAccumulatorTest.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java index 8550015ec54f4..99ad88e44588a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java @@ -81,7 +81,7 @@ public void done() { * * @param dependentResult The dependent result to wait for */ - public void addDependentResult(ProduceRequestResult dependentResult) { + public void addDependent(ProduceRequestResult dependentResult) { synchronized (dependentResults) { dependentResults.add(dependentResult); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 8fd9fe2571f3a..c4f9c0f7f08d3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -380,7 +380,7 @@ private void finalizeSplitBatches(Deque batches) { // Chain all split batch ProduceRequestResults to the original batch's produceFuture // Ensures the original batch's future doesn't complete until all split batches complete for (ProducerBatch splitBatch : batches) { - produceFuture.addDependentResult(splitBatch.produceFuture); + produceFuture.addDependent(splitBatch.produceFuture); } produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, index -> new RecordBatchTooLargeException()); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index ee50eb4c338e8..95e7abb2c8d14 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1838,8 +1838,8 @@ public void testProduceRequestResultawaitAllDependents() throws Exception { ProduceRequestResult dependent2 = new ProduceRequestResult(tp1); // add dependents - parent.addDependentResult(dependent1); - parent.addDependentResult(dependent2); + parent.addDependent(dependent1); + parent.addDependent(dependent2); parent.set(0L, RecordBatch.NO_TIMESTAMP, null); parent.done(); From b777871f4c218f02bc9da89eeda8fe4bed6b320f Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 13 Oct 2025 16:25:20 -0700 Subject: [PATCH 15/20] fix typo in testname and change sleep time --- .../clients/producer/internals/RecordAccumulatorTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 95e7abb2c8d14..9463e4507e990 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1830,7 +1830,7 @@ public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedEx } @Test - public void testProduceRequestResultawaitAllDependents() throws Exception { + public void testProduceRequestResultAwaitAllDependents() throws Exception { ProduceRequestResult parent = new ProduceRequestResult(tp1); // make two dependent ProduceRequestResults -- mimicking split batches @@ -1861,7 +1861,7 @@ public void testProduceRequestResultawaitAllDependents() throws Exception { } }); awaitThread.start(); - Thread.sleep(100); + Thread.sleep(5); // verify awaitAllDependents() is blocking assertFalse(awaitCompleted.get(), @@ -1871,7 +1871,7 @@ public void testProduceRequestResultawaitAllDependents() throws Exception { dependent1.set(0L, RecordBatch.NO_TIMESTAMP, null); dependent1.done(); - Thread.sleep(100); + Thread.sleep(5); // this should still be blocking because dependent2 is not complete assertFalse(awaitCompleted.get(), From e6368caf80faa2e991c6a9177645c3a336c57500 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 13 Oct 2025 16:27:19 -0700 Subject: [PATCH 16/20] fixed and added more comment --- .../producer/internals/ProduceRequestResult.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java index 99ad88e44588a..59d7b56113b77 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java @@ -37,6 +37,13 @@ public class ProduceRequestResult { private final CountDownLatch latch = new CountDownLatch(1); private final TopicPartition topicPartition; + + /** + * List of dependent ProduceRequestResults created when this batch is split. + * When a batch is too large to send, it's split into multiple smaller batches. + * The original batch's ProduceRequestResult tracks all the split batches here + * so that flush() can wait for all splits to complete via awaitAllDependents(). + */ private final List dependentResults = new ArrayList<>(); private volatile Long baseOffset = null; @@ -76,8 +83,8 @@ public void done() { /** * Add a dependent ProduceRequestResult that must complete before this result is considered complete. - * This is used when a batch is split into multiple batches - the original batch's result - * should not complete until all split batches have completed. + * This is used when a batch is split into multiple batches - in some cases like flush(), the original + * batch's result should not complete until all split batches have completed. * * @param dependentResult The dependent result to wait for */ From 590f1a5daf6569bd918f5839b9da431c0ec3d774 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 15 Oct 2025 13:12:34 -0700 Subject: [PATCH 17/20] fix build error --- .../kafka/clients/producer/internals/RecordAccumulatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 9463e4507e990..d6ac75a37a0e6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -36,10 +36,10 @@ import org.apache.kafka.common.record.DefaultRecord; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; From 22fa090fd752c415ba7be27457bc7b9d6e88f87f Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 15 Oct 2025 14:03:39 -0700 Subject: [PATCH 18/20] fix build error in jmh --- .../kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java index 451e0da2b6962..605d76abbe70d 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/RecordAccumulatorFlushBenchmark.java @@ -48,6 +48,7 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -149,8 +150,8 @@ private RecordAccumulator createRecordAccumulator() { } private void appendRecords(Cluster cluster) throws InterruptedException { - byte[] key = "key".getBytes(); - byte[] value = "value".getBytes(); + byte[] key = "key".getBytes(StandardCharsets.UTF_8); + byte[] value = "value".getBytes(StandardCharsets.UTF_8); for (int i = 0; i < numRecords; i++) { accum.append( From 59dbdd0471ee3a495f32080bb60695d28bccdef7 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 15 Oct 2025 18:08:55 -0700 Subject: [PATCH 19/20] remove test --- ...nceDeliveryMessageLossIntegrationTest.java | 273 ------------------ 1 file changed, 273 deletions(-) delete mode 100644 streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java deleted file mode 100644 index ff5e0634d8195..0000000000000 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.integration; - -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.internals.Sender; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.LogCaptureAppender; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.test.TestUtils; - -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; -import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; -import static org.apache.kafka.test.TestUtils.consumerConfig; -import static org.apache.kafka.test.TestUtils.producerConfig; -import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - - -@Timeout(600) -@Tag("integration") -public class AtLeastOnceDeliveryMessageLossIntegrationTest { - private static final Logger log = LoggerFactory.getLogger( - AtLeastOnceDeliveryMessageLossIntegrationTest.class); - - private static final int NUM_BROKERS = 1; - private static final int LARGE_RECORD_COUNT = 50000; - private static final int SMALL_RECORD_COUNT = 40000; - - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - - @BeforeAll - public static void startCluster() throws IOException { - CLUSTER.start(); - } - - @AfterAll - public static void closeCluster() { - CLUSTER.stop(); - } - - private String applicationId; - private String inputTopic; - private String outputTopic; - private Properties streamsConfiguration; - private KafkaStreams kafkaStreams; - - @BeforeEach - public void setUp(final TestInfo testInfo) throws Exception { - final String testId = safeUniqueTestName(testInfo); - applicationId = "app-" + testId; - inputTopic = "input-" + testId; - outputTopic = "output-" + testId; - - cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic); - CLUSTER.createTopics(inputTopic, outputTopic); - - setupStreamsConfiguration(); - } - - @AfterEach - public void cleanUp() throws Exception { - if (kafkaStreams != null) { - kafkaStreams.close(); - } - if (streamsConfiguration != null) { - purgeLocalStreamsState(streamsConfiguration); - } - } - - // failing test - @Test - public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception { - - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Sender.class)) { - produceInputData(LARGE_RECORD_COUNT); - - kafkaStreams = createStreamsApplication(); - startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT)); - - waitForProcessingAndCommit(); - - // for this bug - // first offsets are committed, then - // no messages produced in output topic, then - // repeated retries and MESSAGE_TOO_LARGE error - - assertTrue(appender.getMessages().stream() - .anyMatch(msg -> msg.contains("MESSAGE_TOO_LARGE") && msg.contains("splitting and retrying")), - "Should log MESSAGE_TOO_LARGE and splitting retry messages"); - - final int outputRecordCount = verifyOutputRecords(0); // should not produce records - final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(0); // should not commit offset unless records are produced - - assertEquals(0, outputRecordCount, "Output topic should not have any records"); - assertTrue(offsetsCommitted, "Consumer offsets should not be committed"); - } - } - - @Test - public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception { - produceInputData(SMALL_RECORD_COUNT); - - try (final KafkaStreams kafkaStreams = createStreamsApplication()) { - startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT)); - - waitForProcessingAndCommit(); - - //normal behavior - final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); //should produce records - final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets - - assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records"); - assertTrue(offsetsCommitted, "Consumer offsets should be committed"); - } - } - - - private void setupStreamsConfiguration() { - streamsConfiguration = new Properties(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - - // AT_LEAST_ONCE processing guarantee - streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L); - - // Producer configuration that can trigger MESSAGE_TOO_LARGE errors - streamsConfiguration.put(ProducerConfig.LINGER_MS_CONFIG, 300000); - streamsConfiguration.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432); - } - - private void produceInputData(final int recordCount) { - final List> inputRecords = new ArrayList<>(); - for (int i = 1; i <= recordCount; i++) { - inputRecords.add(new KeyValue<>(String.valueOf(i), "item-" + i)); - } - - IntegrationTestUtils.produceKeyValuesSynchronously( - inputTopic, - inputRecords, - producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), - CLUSTER.time - ); - } - - private void waitForProcessingAndCommit() throws Exception { - // Wait slightly longer than commit interval to ensure processing and offset commits - waitForCondition( - () -> { - try (final Admin adminClient = Admin.create(mkMap( - mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) { - final TopicPartition topicPartition = new TopicPartition(inputTopic, 0); - return adminClient - .listConsumerGroupOffsets(applicationId) - .partitionsToOffsetAndMetadata() - .get() - .containsKey(topicPartition); - } catch (final Exception e) { - return false; - } - }, - 35000L, - "Waiting for consumer offsets to be committed" - ); - } - - private boolean verifyConsumerOffsetsCommitted(final int expectedOffset) throws Exception { - try (final Admin adminClient = Admin.create(mkMap( - mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) { - - final TopicPartition topicPartition = new TopicPartition(inputTopic, 0); - - final long committedOffset = adminClient - .listConsumerGroupOffsets(applicationId) - .partitionsToOffsetAndMetadata() - .get() - .get(topicPartition) - .offset(); - - log.info("Consumer group {} committed offset: {} (expected: {})", applicationId, committedOffset, expectedOffset); - return committedOffset == expectedOffset; - } - } - - private int verifyOutputRecords(final int expectedRecordCount) { - try { - final List> outputRecords = - waitUntilMinKeyValueRecordsReceived( - consumerConfig( - CLUSTER.bootstrapServers(), - applicationId + "-test-consumer-" + System.currentTimeMillis(), - StringDeserializer.class, - StringDeserializer.class - ), - outputTopic, - expectedRecordCount, - 30000L - ); - log.info("Output topic {} contains {} records", outputTopic, outputRecords.size()); - return outputRecords.size(); - } catch (final Exception e) { - log.info("Exception while reading output records: {}", e.getMessage()); - return 0; - } - } - - private KafkaStreams createStreamsApplication() { - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream input = builder.stream(inputTopic); - input.peek((key, value) -> { - if (Integer.parseInt(key) % 1000 == 0) { - log.debug("Processing record {}: {} -> {}", key, key, value); - } - }).to(outputTopic); - - return new KafkaStreams(builder.build(), streamsConfiguration); - } -} \ No newline at end of file From 896ecc1a28a6cf3df1e7a78f82791a03d3ef7cea Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Thu, 16 Oct 2025 10:38:32 -0700 Subject: [PATCH 20/20] fix minor comment --- .../kafka/clients/producer/internals/ProduceRequestResult.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java index 59d7b56113b77..d444fb6eaa416 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java @@ -82,7 +82,7 @@ public void done() { } /** - * Add a dependent ProduceRequestResult that must complete before this result is considered complete. + * Add a dependent ProduceRequestResult. * This is used when a batch is split into multiple batches - in some cases like flush(), the original * batch's result should not complete until all split batches have completed. *