From 199937304b1e91489a0c9926aa1cde98a257dee4 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 28 Jul 2025 18:02:47 -0700 Subject: [PATCH 1/6] test: add new integration test to check silent message loss issue --- ...nceDeliveryMessageLossIntegrationTest.java | 273 ++++++++++++++++++ 1 file changed, 273 insertions(+) create mode 100644 streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java new file mode 100644 index 0000000000000..d512fb27b9c49 --- /dev/null +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.internals.Sender; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.test.TestUtils.consumerConfig; +import static org.apache.kafka.test.TestUtils.producerConfig; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@Timeout(600) +@Tag("integration") +public class AtLeastOnceDeliveryMessageLossIntegrationTest { + private static final Logger log = LoggerFactory.getLogger( + AtLeastOnceDeliveryMessageLossIntegrationTest.class); + + private static final int NUM_BROKERS = 1; + private static final int LARGE_RECORD_COUNT = 50000; + private static final int SMALL_RECORD_COUNT = 40000; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + @BeforeAll + public static void startCluster() throws IOException { + CLUSTER.start(); + } + + @AfterAll + public static void closeCluster() { + CLUSTER.stop(); + } + + private String applicationId; + private String inputTopic; + private String outputTopic; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + + @BeforeEach + public void setUp(final TestInfo testInfo) throws Exception { + final String testId = safeUniqueTestName(testInfo); + applicationId = "app-" + testId; + inputTopic = "input-" + testId; + outputTopic = "output-" + testId; + + cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic); + CLUSTER.createTopics(inputTopic, outputTopic); + + setupStreamsConfiguration(); + } + + @AfterEach + public void cleanUp() throws Exception { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30)); // need to stop due to infinite retries + } + if (streamsConfiguration != null) { + purgeLocalStreamsState(streamsConfiguration); + } + } + + // failing test + @Test + public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception { + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Sender.class)) { + produceInputData(LARGE_RECORD_COUNT); + + kafkaStreams = createStreamsApplication(); + startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT)); + + waitForProcessingAndCommit(); + + // for this bug + // first offsets are committed, then + // no messages produced in output topic, then + // repeated retries and MESSAGE_TOO_LARGE error + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("MESSAGE_TOO_LARGE") && msg.contains("splitting and retrying")), + "Should log MESSAGE_TOO_LARGE and splitting retry messages"); + + final int outputRecordCount = verifyOutputRecords(0); // should not produce records + final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(0); // should not commit offset unless records are produced + + assertEquals(0, outputRecordCount, "Output topic should not have any records"); + assertTrue(offsetsCommitted, "Consumer offsets should not be committed"); + } + } + + @Test + public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception { + produceInputData(SMALL_RECORD_COUNT); + + try (final KafkaStreams kafkaStreams = createStreamsApplication()) { + startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT)); + + waitForProcessingAndCommit(); + + //normal behavior + final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); //should produce records + final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets + + assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records"); + assertTrue(offsetsCommitted, "Consumer offsets should be committed"); + } + } + + + private void setupStreamsConfiguration() { + streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + // AT_LEAST_ONCE processing guarantee + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L); + + // Producer configuration that can trigger MESSAGE_TOO_LARGE errors + streamsConfiguration.put(ProducerConfig.LINGER_MS_CONFIG, 300000); + streamsConfiguration.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432); + } + + private void produceInputData(final int recordCount) throws Exception { + final List> inputRecords = new ArrayList<>(); + for (int i = 1; i <= recordCount; i++) { + inputRecords.add(new KeyValue<>(String.valueOf(i), "item-" + i)); + } + + IntegrationTestUtils.produceKeyValuesSynchronously( + inputTopic, + inputRecords, + producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), + CLUSTER.time + ); + } + + private void waitForProcessingAndCommit() throws Exception { + // Wait slightly longer than commit interval to ensure processing and offset commits + waitForCondition( + () -> { + try (final Admin adminClient = Admin.create(mkMap( + mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) { + final TopicPartition topicPartition = new TopicPartition(inputTopic, 0); + return adminClient + .listConsumerGroupOffsets(applicationId) + .partitionsToOffsetAndMetadata() + .get() + .containsKey(topicPartition); + } catch (final Exception e) { + return false; + } + }, + 35000L, + "Waiting for consumer offsets to be committed" + ); + } + + private boolean verifyConsumerOffsetsCommitted(final int expectedOffset) throws Exception { + try (final Admin adminClient = Admin.create(mkMap( + mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) { + + final TopicPartition topicPartition = new TopicPartition(inputTopic, 0); + + final long committedOffset = adminClient + .listConsumerGroupOffsets(applicationId) + .partitionsToOffsetAndMetadata() + .get() + .get(topicPartition) + .offset(); + + log.info("Consumer group {} committed offset: {} (expected: {})", applicationId, committedOffset, expectedOffset); + return committedOffset == expectedOffset; + } + } + + private int verifyOutputRecords(final int expectedRecordCount) { + try { + final List> outputRecords = + waitUntilMinKeyValueRecordsReceived( + consumerConfig( + CLUSTER.bootstrapServers(), + applicationId + "-test-consumer-" + System.currentTimeMillis(), + StringDeserializer.class, + StringDeserializer.class + ), + outputTopic, + expectedRecordCount, + 30000L + ); + log.info("Output topic {} contains {} records", outputTopic, outputRecords.size()); + return outputRecords.size(); + } catch (final Exception e) { + log.info("Exception while reading output records: {}", e.getMessage()); + return 0; + } + } + + private KafkaStreams createStreamsApplication() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream input = builder.stream(inputTopic); + input.peek((key, value) -> { + if (Integer.parseInt(key) % 1000 == 0) { + log.debug("Processing record {}: {} -> {}", key, key, value); + } + }).to(outputTopic); + + return new KafkaStreams(builder.build(), streamsConfiguration); + } +} \ No newline at end of file From fdf61ff54abd55acf1a1119be5f9e72d8dcc33d2 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Fri, 3 Oct 2025 16:11:55 -0700 Subject: [PATCH 2/6] patch to check record level futures --- .../clients/producer/internals/ProducerBatch.java | 15 +++++++++++++++ ...astOnceDeliveryMessageLossIntegrationTest.java | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index c4f9c0f7f08d3..3d80cce08cc9d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; +import java.util.stream.Collectors; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -237,6 +238,20 @@ public boolean completeExceptionally( return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, topLevelException, recordExceptions); } + /** + * Get all record futures for this batch. + * This is used by flush() to wait on individual records rather than the batch-level future. + * When batches are split, individual futures are chained to the new batches, + * ensuring flush() waits for all split batches to complete. + * + * @return List of FutureRecordMetadata for all records in this batch + */ + public List recordFutures() { + return thunks.stream() + .map(thunk -> thunk.future) + .collect(Collectors.toList()); + } + /** * Finalize the state of a batch. Final state, once set, is immutable. This function may be called * once or twice on a batch. It may be called twice if diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java index d512fb27b9c49..7b6470b41f92c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java @@ -111,7 +111,7 @@ public void setUp(final TestInfo testInfo) throws Exception { @AfterEach public void cleanUp() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(Duration.ofSeconds(30)); // need to stop due to infinite retries + kafkaStreams.close(); } if (streamsConfiguration != null) { purgeLocalStreamsState(streamsConfiguration); From 690ecd2e29824bc6852efcf074038b6b62082632 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Sat, 18 Oct 2025 12:32:16 -0700 Subject: [PATCH 3/6] Revert "patch to check record level futures" This reverts commit 1537aeaf1d835383b911652ecef5acd57995f992. --- .../clients/producer/internals/ProducerBatch.java | 15 --------------- ...astOnceDeliveryMessageLossIntegrationTest.java | 2 +- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 3d80cce08cc9d..c4f9c0f7f08d3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.producer.internals; -import java.util.stream.Collectors; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -238,20 +237,6 @@ public boolean completeExceptionally( return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, topLevelException, recordExceptions); } - /** - * Get all record futures for this batch. - * This is used by flush() to wait on individual records rather than the batch-level future. - * When batches are split, individual futures are chained to the new batches, - * ensuring flush() waits for all split batches to complete. - * - * @return List of FutureRecordMetadata for all records in this batch - */ - public List recordFutures() { - return thunks.stream() - .map(thunk -> thunk.future) - .collect(Collectors.toList()); - } - /** * Finalize the state of a batch. Final state, once set, is immutable. This function may be called * once or twice on a batch. It may be called twice if diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java index 7b6470b41f92c..d512fb27b9c49 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java @@ -111,7 +111,7 @@ public void setUp(final TestInfo testInfo) throws Exception { @AfterEach public void cleanUp() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(30)); // need to stop due to infinite retries } if (streamsConfiguration != null) { purgeLocalStreamsState(streamsConfiguration); From 73f1321799b4f052a46c3c77f60a99edb890107d Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Sat, 18 Oct 2025 13:13:42 -0700 Subject: [PATCH 4/6] update test --- ...nceDeliveryMessageLossIntegrationTest.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java index d512fb27b9c49..d8d8896b0dad1 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java @@ -66,7 +66,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; - +/** + * Intended to reproduce KAFKA-19479 + */ @Timeout(600) @Tag("integration") public class AtLeastOnceDeliveryMessageLossIntegrationTest { @@ -118,9 +120,8 @@ public void cleanUp() throws Exception { } } - // failing test @Test - public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception { + public void shouldCommitOffsetsAndProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception { try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Sender.class)) { produceInputData(LARGE_RECORD_COUNT); @@ -130,20 +131,15 @@ public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWit waitForProcessingAndCommit(); - // for this bug - // first offsets are committed, then - // no messages produced in output topic, then - // repeated retries and MESSAGE_TOO_LARGE error - assertTrue(appender.getMessages().stream() .anyMatch(msg -> msg.contains("MESSAGE_TOO_LARGE") && msg.contains("splitting and retrying")), "Should log MESSAGE_TOO_LARGE and splitting retry messages"); - final int outputRecordCount = verifyOutputRecords(0); // should not produce records - final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(0); // should not commit offset unless records are produced + final int outputRecordCount = verifyOutputRecords(LARGE_RECORD_COUNT); // should produce records + final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(LARGE_RECORD_COUNT); // should commit offset - assertEquals(0, outputRecordCount, "Output topic should not have any records"); - assertTrue(offsetsCommitted, "Consumer offsets should not be committed"); + assertEquals(LARGE_RECORD_COUNT, outputRecordCount, "Output topic should have " + LARGE_RECORD_COUNT + " records"); + assertTrue(offsetsCommitted, "Consumer offsets should be committed"); } } @@ -156,8 +152,7 @@ public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() waitForProcessingAndCommit(); - //normal behavior - final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); //should produce records + final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); // should produce records final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records"); From 1c8c2ada3fe9594d86378cfd83c01bde78ca42eb Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Tue, 21 Oct 2025 11:13:46 -0700 Subject: [PATCH 5/6] remove positive test --- ...OnceDeliveryMessageLossIntegrationTest.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java index d8d8896b0dad1..80490a1853a59 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java @@ -142,25 +142,7 @@ public void shouldCommitOffsetsAndProduceOutputRecordsWhenProducerFailsWithMessa assertTrue(offsetsCommitted, "Consumer offsets should be committed"); } } - - @Test - public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception { - produceInputData(SMALL_RECORD_COUNT); - - try (final KafkaStreams kafkaStreams = createStreamsApplication()) { - startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT)); - - waitForProcessingAndCommit(); - final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); // should produce records - final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets - - assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records"); - assertTrue(offsetsCommitted, "Consumer offsets should be committed"); - } - } - - private void setupStreamsConfiguration() { streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); From c30fdc6b693aaf1d6dc725f7f303276418c7e902 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Tue, 21 Oct 2025 11:18:11 -0700 Subject: [PATCH 6/6] remove timeout --- .../AtLeastOnceDeliveryMessageLossIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java index 80490a1853a59..59753b3921a1b 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java @@ -113,7 +113,7 @@ public void setUp(final TestInfo testInfo) throws Exception { @AfterEach public void cleanUp() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(Duration.ofSeconds(30)); // need to stop due to infinite retries + kafkaStreams.close(); } if (streamsConfiguration != null) { purgeLocalStreamsState(streamsConfiguration);