From e0d5f566078db5b4ad463142978c8cbd2af23974 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 1 May 2020 13:56:26 -0700 Subject: [PATCH 1/5] KAFKA-9928: Fix flaky GlobalKTableEOSIntegrationTest --- .../GlobalKTableEOSIntegrationTest.java | 261 ++++++++++++------ 1 file changed, 172 insertions(+), 89 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 0d5c1b1d0323b..2f816af30d9d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -151,10 +151,20 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { expected.put("d", "4+D"); expected.put("e", "5+null"); - TestUtils.waitForCondition( - () -> results.equals(expected), - 30000L, - "waiting for initial values"); + try { + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for initial values" + ); + } catch (final AssertionError error) { + final AssertionError newError = new AssertionError( + "result: " + results + + "\nexpected: " + expected + ); + newError.addSuppressed(error); + throw newError; + } produceGlobalTableValues(); @@ -163,10 +173,18 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(replicatedStore); - TestUtils.waitForCondition( - () -> "J".equals(replicatedStore.get(5L)), - 30000, - "waiting for data in replicated store"); + try { + TestUtils.waitForCondition( + () -> "J".equals(replicatedStore.get(5L)), + 30000, + "waiting for data in replicated store" + ); + } catch (final AssertionError error) { + final AssertionError newError = new AssertionError("expected 'J'; got: " + replicatedStore.get(5L)); + newError.addSuppressed(error); + throw newError; + } + produceTopicValues(streamTopic); @@ -176,10 +194,20 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { expected.put("d", "4+I"); expected.put("e", "5+J"); - TestUtils.waitForCondition( - () -> results.equals(expected), - 30000L, - "waiting for final values"); + try { + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for final values" + ); + } catch (final AssertionError error) { + final AssertionError newError = new AssertionError( + "result: " + results + + "\nexpected: " + expected + ); + newError.addSuppressed(error); + throw newError; + } } @Test @@ -196,10 +224,20 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { expected.put("c", "3+C"); expected.put("d", "4+D"); - TestUtils.waitForCondition( - () -> results.equals(expected), - 30000L, - "waiting for initial values"); + try { + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for initial values" + ); + } catch (final AssertionError error) { + final AssertionError newError = new AssertionError( + "result: " + results + + "\nexpected: " + expected + ); + newError.addSuppressed(error); + throw newError; + } produceGlobalTableValues(); @@ -208,10 +246,18 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(replicatedStore); - TestUtils.waitForCondition( - () -> "J".equals(replicatedStore.get(5L)), - 30000, - "waiting for data in replicated store"); + try { + TestUtils.waitForCondition( + () -> "J".equals(replicatedStore.get(5L)), + 30000, + "waiting for data in replicated store" + ); + } catch (final AssertionError error) { + final AssertionError newError = new AssertionError("expected 'J'; got: " + replicatedStore.get(5L)); + newError.addSuppressed(error); + throw newError; + } + produceTopicValues(streamTopic); @@ -221,10 +267,20 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { expected.put("d", "4+I"); expected.put("e", "5+J"); - TestUtils.waitForCondition( - () -> results.equals(expected), - 30000L, - "waiting for final values"); + try { + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for final values" + ); + } catch (final AssertionError error) { + final AssertionError newError = new AssertionError( + "result: " + results + + "\nexpected: " + expected + ); + newError.addSuppressed(error); + throw newError; + } } @Test @@ -243,18 +299,29 @@ public void shouldRestoreTransactionalMessages() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(store); - TestUtils.waitForCondition( - () -> { - final Map result = new HashMap<>(); - final Iterator> it = store.all(); - while (it.hasNext()) { - final KeyValue kv = it.next(); - result.put(kv.key, kv.value); - } - return result.equals(expected); - }, - 30000L, - "waiting for initial values"); + try { + final Map result = new HashMap<>(); + TestUtils.waitForCondition( + () -> { + result.clear(); + final Iterator> it = store.all(); + while (it.hasNext()) { + final KeyValue kv = it.next(); + result.put(kv.key, kv.value); + } + return result.equals(expected); + }, + 30000L, + "waiting for initial values" + ); + } catch (final AssertionError error) { + final AssertionError newError = new AssertionError( + "result: " + results + + "\nexpected: " + expected + ); + newError.addSuppressed(error); + throw newError; + } } @Test @@ -275,18 +342,29 @@ public void shouldNotRestoreAbortedMessages() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(store); - TestUtils.waitForCondition( - () -> { - final Map result = new HashMap<>(); - final Iterator> it = store.all(); - while (it.hasNext()) { - final KeyValue kv = it.next(); - result.put(kv.key, kv.value); - } - return result.equals(expected); - }, - 30000L, - "waiting for initial values"); + try { + final Map result = new HashMap<>(); + TestUtils.waitForCondition( + () -> { + result.clear(); + final Iterator> it = store.all(); + while (it.hasNext()) { + final KeyValue kv = it.next(); + result.put(kv.key, kv.value); + } + return result.equals(expected); + }, + 30000L, + "waiting for initial values" + ); + } catch (final AssertionError error) { + final AssertionError newError = new AssertionError( + "result: " + results + + "\nexpected: " + expected + ); + newError.addSuppressed(error); + throw newError; + } } private void createTopics() throws Exception { @@ -304,59 +382,64 @@ private void startStreams() { private void produceTopicValues(final String topic) { IntegrationTestUtils.produceKeyValuesSynchronously( - topic, - Arrays.asList( - new KeyValue<>("a", 1L), - new KeyValue<>("b", 2L), - new KeyValue<>("c", 3L), - new KeyValue<>("d", 4L), - new KeyValue<>("e", 5L)), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - StringSerializer.class, - LongSerializer.class, - new Properties()), - mockTime); + topic, + Arrays.asList( + new KeyValue<>("a", 1L), + new KeyValue<>("b", 2L), + new KeyValue<>("c", 3L), + new KeyValue<>("d", 4L), + new KeyValue<>("e", 5L) + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + LongSerializer.class, + new Properties() + ), + mockTime + ); } private void produceAbortedMessages() throws Exception { final Properties properties = new Properties(); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); - properties.put(ProducerConfig.RETRIES_CONFIG, 1); IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp( - globalTableTopic, Arrays.asList( - new KeyValue<>(1L, "A"), - new KeyValue<>(2L, "B"), - new KeyValue<>(3L, "C"), - new KeyValue<>(4L, "D") - ), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - LongSerializer.class, - StringSerializer.class, - properties), - mockTime.milliseconds()); + globalTableTopic, Arrays.asList( + new KeyValue<>(1L, "A"), + new KeyValue<>(2L, "B"), + new KeyValue<>(3L, "C"), + new KeyValue<>(4L, "D") + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + StringSerializer.class, + properties + ), + mockTime.milliseconds() + ); } private void produceInitialGlobalTableValues() { final Properties properties = new Properties(); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); - properties.put(ProducerConfig.RETRIES_CONFIG, 1); IntegrationTestUtils.produceKeyValuesSynchronously( - globalTableTopic, - Arrays.asList( - new KeyValue<>(1L, "A"), - new KeyValue<>(2L, "B"), - new KeyValue<>(3L, "C"), - new KeyValue<>(4L, "D") - ), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - LongSerializer.class, - StringSerializer.class, - properties), - mockTime, - true); + globalTableTopic, + Arrays.asList( + new KeyValue<>(1L, "A"), + new KeyValue<>(2L, "B"), + new KeyValue<>(3L, "C"), + new KeyValue<>(4L, "D") + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + StringSerializer.class, + properties + ), + mockTime, + true + ); } private void produceGlobalTableValues() { From 1c1d325a1567f7d6f78e8406ca5f606f97fceda4 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 1 May 2020 14:07:08 -0700 Subject: [PATCH 2/5] fix formatting --- .../GlobalKTableEOSIntegrationTest.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 2f816af30d9d5..8f57399b8237e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -444,18 +444,21 @@ private void produceInitialGlobalTableValues() { private void produceGlobalTableValues() { IntegrationTestUtils.produceKeyValuesSynchronously( - globalTableTopic, - Arrays.asList( - new KeyValue<>(1L, "F"), - new KeyValue<>(2L, "G"), - new KeyValue<>(3L, "H"), - new KeyValue<>(4L, "I"), - new KeyValue<>(5L, "J")), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - LongSerializer.class, - StringSerializer.class, - new Properties()), - mockTime); + globalTableTopic, + Arrays.asList( + new KeyValue<>(1L, "F"), + new KeyValue<>(2L, "G"), + new KeyValue<>(3L, "H"), + new KeyValue<>(4L, "I"), + new KeyValue<>(5L, "J") + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + StringSerializer.class, + new Properties() + ), + mockTime + ); } } From dbec61df84780f0033d0c170fc065bec1b0c5891 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 1 May 2020 14:23:22 -0700 Subject: [PATCH 3/5] Use supplier to provide error message --- .../GlobalKTableEOSIntegrationTest.java | 194 +++++++----------- 1 file changed, 70 insertions(+), 124 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 8f57399b8237e..97a78b47af53a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -151,20 +151,13 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { expected.put("d", "4+D"); expected.put("e", "5+null"); - try { - TestUtils.waitForCondition( - () -> results.equals(expected), - 30000L, - "waiting for initial values" - ); - } catch (final AssertionError error) { - final AssertionError newError = new AssertionError( - "result: " + results + - "\nexpected: " + expected - ); - newError.addSuppressed(error); - throw newError; - } + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + () -> "waiting for initial values;" + + "\n expected: " + expected + + "\n received: " + results + ); produceGlobalTableValues(); @@ -173,17 +166,11 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(replicatedStore); - try { - TestUtils.waitForCondition( - () -> "J".equals(replicatedStore.get(5L)), - 30000, - "waiting for data in replicated store" - ); - } catch (final AssertionError error) { - final AssertionError newError = new AssertionError("expected 'J'; got: " + replicatedStore.get(5L)); - newError.addSuppressed(error); - throw newError; - } + TestUtils.waitForCondition( + () -> "J".equals(replicatedStore.get(5L)), + 30000, + () -> "waiting for data in replicated store; expected 'J'; received: " + replicatedStore.get(5L) + ); produceTopicValues(streamTopic); @@ -194,20 +181,13 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { expected.put("d", "4+I"); expected.put("e", "5+J"); - try { - TestUtils.waitForCondition( - () -> results.equals(expected), - 30000L, - "waiting for final values" - ); - } catch (final AssertionError error) { - final AssertionError newError = new AssertionError( - "result: " + results + - "\nexpected: " + expected - ); - newError.addSuppressed(error); - throw newError; - } + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + () -> "waiting for final values" + + "\n expected:" + expected + + "\n received: " + results + ); } @Test @@ -224,20 +204,13 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { expected.put("c", "3+C"); expected.put("d", "4+D"); - try { - TestUtils.waitForCondition( - () -> results.equals(expected), - 30000L, - "waiting for initial values" - ); - } catch (final AssertionError error) { - final AssertionError newError = new AssertionError( - "result: " + results + - "\nexpected: " + expected - ); - newError.addSuppressed(error); - throw newError; - } + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + () -> "waiting for initial values" + + "\n expected:" + expected + + "\n received: " + results + ); produceGlobalTableValues(); @@ -246,17 +219,11 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(replicatedStore); - try { - TestUtils.waitForCondition( - () -> "J".equals(replicatedStore.get(5L)), - 30000, - "waiting for data in replicated store" - ); - } catch (final AssertionError error) { - final AssertionError newError = new AssertionError("expected 'J'; got: " + replicatedStore.get(5L)); - newError.addSuppressed(error); - throw newError; - } + TestUtils.waitForCondition( + () -> "J".equals(replicatedStore.get(5L)), + 30000, + () -> "waiting for data in replicated store; expected 'J'; received: " + replicatedStore.get(5L) + ); produceTopicValues(streamTopic); @@ -267,20 +234,13 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { expected.put("d", "4+I"); expected.put("e", "5+J"); - try { - TestUtils.waitForCondition( - () -> results.equals(expected), - 30000L, - "waiting for final values" - ); - } catch (final AssertionError error) { - final AssertionError newError = new AssertionError( - "result: " + results + - "\nexpected: " + expected - ); - newError.addSuppressed(error); - throw newError; - } + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + () -> "waiting for final values" + + "\n expected:" + expected + + "\n received: " + results + ); } @Test @@ -299,29 +259,22 @@ public void shouldRestoreTransactionalMessages() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(store); - try { - final Map result = new HashMap<>(); - TestUtils.waitForCondition( - () -> { - result.clear(); - final Iterator> it = store.all(); - while (it.hasNext()) { - final KeyValue kv = it.next(); - result.put(kv.key, kv.value); - } - return result.equals(expected); - }, - 30000L, - "waiting for initial values" - ); - } catch (final AssertionError error) { - final AssertionError newError = new AssertionError( - "result: " + results + - "\nexpected: " + expected - ); - newError.addSuppressed(error); - throw newError; - } + final Map result = new HashMap<>(); + TestUtils.waitForCondition( + () -> { + result.clear(); + final Iterator> it = store.all(); + while (it.hasNext()) { + final KeyValue kv = it.next(); + result.put(kv.key, kv.value); + } + return result.equals(expected); + }, + 30000L, + () -> "waiting for initial values" + + "\n expected:" + expected + + "\n received: " + results + ); } @Test @@ -342,29 +295,22 @@ public void shouldNotRestoreAbortedMessages() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(store); - try { - final Map result = new HashMap<>(); - TestUtils.waitForCondition( - () -> { - result.clear(); - final Iterator> it = store.all(); - while (it.hasNext()) { - final KeyValue kv = it.next(); - result.put(kv.key, kv.value); - } - return result.equals(expected); - }, - 30000L, - "waiting for initial values" - ); - } catch (final AssertionError error) { - final AssertionError newError = new AssertionError( - "result: " + results + - "\nexpected: " + expected - ); - newError.addSuppressed(error); - throw newError; - } + final Map result = new HashMap<>(); + TestUtils.waitForCondition( + () -> { + result.clear(); + final Iterator> it = store.all(); + while (it.hasNext()) { + final KeyValue kv = it.next(); + result.put(kv.key, kv.value); + } + return result.equals(expected); + }, + 30000L, + () -> "waiting for initial values" + + "\n expected:" + expected + + "\n received: " + results + ); } private void createTopics() throws Exception { From df27f9ef06369c82b4ea37722ae8be89ff24729a Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sun, 3 May 2020 21:55:10 -0700 Subject: [PATCH 4/5] Another try to fix flakiness (plus some cleanup) --- .../integration/EosIntegrationTest.java | 2 +- .../GlobalKTableEOSIntegrationTest.java | 22 +- ...StreamAggregationDedupIntegrationTest.java | 3 +- .../KStreamAggregationIntegrationTest.java | 13 +- .../KStreamRepartitionIntegrationTest.java | 2 +- .../StateRestorationIntegrationTest.java | 3 +- .../utils/IntegrationTestUtils.java | 222 +++++++++++------- 7 files changed, 170 insertions(+), 97 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index fa65766d962ad..55a9b75766241 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -847,7 +847,7 @@ private Set> getMaxPerKey(final List> } private void verifyStateStore(final KafkaStreams streams, - final Set> expectedStoreContent) throws InterruptedException { + final Set> expectedStoreContent) throws Exception { final ReadOnlyKeyValueStore store = IntegrationTestUtils .getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore()); assertNotNull(store); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 97a78b47af53a..85ef2c86f6f2b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -185,7 +185,7 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { () -> results.equals(expected), 30000L, () -> "waiting for final values" + - "\n expected:" + expected + + "\n expected: " + expected + "\n received: " + results ); } @@ -208,7 +208,7 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { () -> results.equals(expected), 30000L, () -> "waiting for initial values" + - "\n expected:" + expected + + "\n expected: " + expected + "\n received: " + results ); @@ -238,7 +238,7 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { () -> results.equals(expected), 30000L, () -> "waiting for final values" + - "\n expected:" + expected + + "\n expected: " + expected + "\n received: " + results ); } @@ -272,7 +272,7 @@ public void shouldRestoreTransactionalMessages() throws Exception { }, 30000L, () -> "waiting for initial values" + - "\n expected:" + expected + + "\n expected: " + expected + "\n received: " + results ); } @@ -308,7 +308,7 @@ public void shouldNotRestoreAbortedMessages() throws Exception { }, 30000L, () -> "waiting for initial values" + - "\n expected:" + expected + + "\n expected: " + expected + "\n received: " + results ); } @@ -327,6 +327,9 @@ private void startStreams() { } private void produceTopicValues(final String topic) { + final Properties config = new Properties(); + config.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + IntegrationTestUtils.produceKeyValuesSynchronously( topic, Arrays.asList( @@ -340,7 +343,7 @@ private void produceTopicValues(final String topic) { CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, - new Properties() + config ), mockTime ); @@ -349,6 +352,7 @@ private void produceTopicValues(final String topic) { private void produceAbortedMessages() throws Exception { final Properties properties = new Properties(); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); + IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp( globalTableTopic, Arrays.asList( new KeyValue<>(1L, "A"), @@ -369,6 +373,7 @@ private void produceAbortedMessages() throws Exception { private void produceInitialGlobalTableValues() { final Properties properties = new Properties(); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); + IntegrationTestUtils.produceKeyValuesSynchronously( globalTableTopic, Arrays.asList( @@ -389,6 +394,9 @@ private void produceInitialGlobalTableValues() { } private void produceGlobalTableValues() { + final Properties config = new Properties(); + config.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + IntegrationTestUtils.produceKeyValuesSynchronously( globalTableTopic, Arrays.asList( @@ -402,7 +410,7 @@ private void produceGlobalTableValues() { CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, - new Properties() + config ), mockTime ); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index dbe0b047a7a97..80b339689cbba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -239,7 +239,8 @@ private void startStreams() { private void validateReceivedMessages(final Deserializer keyDeserializer, final Deserializer valueDeserializer, final List> expectedRecords) - throws InterruptedException { + throws Exception { + final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index e04e25e4e4f6b..7de4f62fe1142 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -808,14 +808,17 @@ private void startStreams() { private List> receiveMessages(final Deserializer keyDeserializer, final Deserializer valueDeserializer, final int numMessages) - throws InterruptedException { + throws Exception { + return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages); } private List> receiveMessages(final Deserializer keyDeserializer, final Deserializer valueDeserializer, final Class innerClass, - final int numMessages) throws InterruptedException { + final int numMessages) + throws Exception { + final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -835,9 +838,9 @@ private List> receiveMessages(final Deserializer< } private List> receiveMessagesWithTimestamp(final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final Class innerClass, - final int numMessages) throws InterruptedException { + final Deserializer valueDeserializer, + final Class innerClass, + final int numMessages) throws Exception { final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 6bb1269cc9477..177d6db4a8d31 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -807,7 +807,7 @@ private KafkaStreams startStreams(final StreamsBuilder builder, private void validateReceivedMessages(final Deserializer keySerializer, final Deserializer valueSerializer, - final List> expectedRecords) throws InterruptedException { + final List> expectedRecords) throws Exception { final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties consumerProperties = new Properties(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java index e22ff4f76597d..8b7eb2ac614ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java @@ -42,7 +42,6 @@ import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.concurrent.ExecutionException; @Category({IntegrationTest.class}) public class StateRestorationIntegrationTest { @@ -77,7 +76,7 @@ public void setUp() throws Exception { } @Test - public void shouldRestoreNullRecord() throws InterruptedException, ExecutionException { + public void shouldRestoreNullRecord() throws Exception { builder.table(INPUT_TOPIC, Materialized.as( Stores.persistentTimestampedKeyValueStore(STATE_STORE_NAME)) .withKeySerde(Serdes.Integer()) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index eb7fa4c1cb38b..222d2780911f7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -133,8 +133,7 @@ public static String safeUniqueTestName(final Class testClass, final TestName * * @param streamsConfiguration Streams configuration settings */ - public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws - IOException { + public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws IOException { final String tmpDir = TestUtils.IO_TMP_DIR.getPath(); final String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG); if (path != null) { @@ -151,7 +150,9 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina cleanStateBeforeTest(cluster, 1, topics); } - public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final int partitionCount, final String... topics) { + public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, + final int partitionCount, + final String... topics) { try { cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); for (final String topic : topics) { @@ -179,8 +180,10 @@ public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster * @param Key type of the data records * @param Value type of the data records */ - public static void produceKeyValuesSynchronously( - final String topic, final Collection> records, final Properties producerConfig, final Time time) { + public static void produceKeyValuesSynchronously(final String topic, + final Collection> records, + final Properties producerConfig, + final Time time) { produceKeyValuesSynchronously(topic, records, producerConfig, time, false); } @@ -193,8 +196,11 @@ public static void produceKeyValuesSynchronously( * @param Key type of the data records * @param Value type of the data records */ - public static void produceKeyValuesSynchronously( - final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Time time) { + public static void produceKeyValuesSynchronously(final String topic, + final Collection> records, + final Properties producerConfig, + final Headers headers, + final Time time) { produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false); } @@ -207,8 +213,11 @@ public static void produceKeyValuesSynchronously( * @param Key type of the data records * @param Value type of the data records */ - public static void produceKeyValuesSynchronously( - final String topic, final Collection> records, final Properties producerConfig, final Time time, final boolean enableTransactions) { + public static void produceKeyValuesSynchronously(final String topic, + final Collection> records, + final Properties producerConfig, + final Time time, + final boolean enableTransactions) { produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions); } @@ -228,7 +237,6 @@ public static void produceKeyValuesSynchronously(final String topic, final Headers headers, final Time time, final boolean enableTransactions) { - try (final Producer producer = new KafkaProducer<>(producerConfig)) { if (enableTransactions) { producer.initTransactions(); @@ -241,7 +249,6 @@ public static void produceKeyValuesSynchronously(final String topic, if (enableTransactions) { producer.commitTransaction(); } - producer.flush(); } } @@ -274,7 +281,6 @@ public static void produceKeyValuesSynchronouslyWithTimestamp(final Strin final Properties producerConfig, final Long timestamp, final boolean enableTransactions) { - produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions); } @@ -294,7 +300,6 @@ public static void produceKeyValuesSynchronouslyWithTimestamp(final Strin final Headers headers, final Long timestamp, final boolean enableTransactions) { - try (final Producer producer = new KafkaProducer<>(producerConfig)) { if (enableTransactions) { producer.initTransactions(); @@ -306,7 +311,6 @@ public static void produceKeyValuesSynchronouslyWithTimestamp(final Strin if (enableTransactions) { producer.commitTransaction(); } - producer.flush(); } } @@ -323,7 +327,14 @@ public static void produceSynchronously(final Properties producerConfig, final LinkedList> futures = new LinkedList<>(); for (final KeyValueTimestamp record : toProduce) { final Future f = producer.send( - new ProducerRecord<>(topic, partition.orElse(null), record.timestamp(), record.key(), record.value(), null) + new ProducerRecord<>( + topic, + partition.orElse(null), + record.timestamp(), + record.key(), + record.value(), + null + ) ); futures.add(f); } @@ -355,11 +366,12 @@ public static void produceSynchronously(final Properties producerConfig, * @param Key type of the data records * @param Value type of the data records */ - public static void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic, - final Collection> records, - final Properties producerConfig, - final Long timestamp) - throws ExecutionException, InterruptedException { + public static void produceAbortedKeyValuesSynchronouslyWithTimestamp( + final String topic, + final Collection> records, + final Properties producerConfig, + final Long timestamp + ) throws Exception { try (final Producer producer = new KafkaProducer<>(producerConfig)) { producer.initTransactions(); for (final KeyValue record : records) { @@ -453,9 +465,11 @@ public static void waitForCompletion(final KafkaStreams streams, * @return All the records consumed, or null if no records are consumed */ @SuppressWarnings("WeakerAccess") - public static List> waitUntilMinRecordsReceived(final Properties consumerConfig, - final String topic, - final int expectedNumRecords) throws InterruptedException { + public static List> waitUntilMinRecordsReceived( + final Properties consumerConfig, + final String topic, + final int expectedNumRecords + ) throws Exception { return waitUntilMinRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } @@ -471,12 +485,19 @@ public static List> waitUntilMinRecordsReceived(fina * @return All the records consumed, or null if no records are consumed */ @SuppressWarnings("WeakerAccess") - public static List> waitUntilMinRecordsReceived(final Properties consumerConfig, - final String topic, - final int expectedNumRecords, - final long waitTime) throws InterruptedException { + public static List> waitUntilMinRecordsReceived( + final Properties consumerConfig, + final String topic, + final int expectedNumRecords, + final long waitTime + ) throws Exception { final List> accumData = new ArrayList<>(); - final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime); + final String reason = String.format( + "Did not receive all %d records from topic %s within %d ms", + expectedNumRecords, + topic, + waitTime + ); try (final Consumer consumer = createConsumer(consumerConfig)) { retryOnExceptionWithTimeout(waitTime, () -> { final List> readData = @@ -498,9 +519,11 @@ public static List> waitUntilMinRecordsReceived(fina * @param Value type of the data records * @return All the records consumed, or null if no records are consumed */ - public static List> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, - final String topic, - final int expectedNumRecords) throws InterruptedException { + public static List> waitUntilMinKeyValueRecordsReceived( + final Properties consumerConfig, + final String topic, + final int expectedNumRecords + ) throws Exception { return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } @@ -516,12 +539,19 @@ public static List> waitUntilMinKeyValueRecordsReceived(fi * @return All the records consumed, or null if no records are consumed * @throws AssertionError if the given wait time elapses */ - public static List> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, - final String topic, - final int expectedNumRecords, - final long waitTime) throws InterruptedException { + public static List> waitUntilMinKeyValueRecordsReceived( + final Properties consumerConfig, + final String topic, + final int expectedNumRecords, + final long waitTime + ) throws Exception { final List> accumData = new ArrayList<>(); - final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime); + final String reason = String.format( + "Did not receive all %d records from topic %s within %d ms", + expectedNumRecords, + topic, + waitTime + ); try (final Consumer consumer = createConsumer(consumerConfig)) { retryOnExceptionWithTimeout(waitTime, () -> { final List> readData = @@ -544,12 +574,19 @@ public static List> waitUntilMinKeyValueRecordsReceived(fi * @param Key type of the data records * @param Value type of the data records */ - public static List> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig, - final String topic, - final int expectedNumRecords, - final long waitTime) throws InterruptedException { + public static List> waitUntilMinKeyValueWithTimestampRecordsReceived( + final Properties consumerConfig, + final String topic, + final int expectedNumRecords, + final long waitTime + ) throws Exception { final List> accumData = new ArrayList<>(); - final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime); + final String reason = String.format( + "Did not receive all %d records from topic %s within %d ms", + expectedNumRecords, + topic, + waitTime + ); try (final Consumer consumer = createConsumer(consumerConfig)) { retryOnExceptionWithTimeout(waitTime, () -> { final List> readData = @@ -571,9 +608,11 @@ public static List> waitUntilMinKeyValueWithTimes * @param Value type of the data records * @return All the mappings consumed, or null if no records are consumed */ - public static List> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, - final String topic, - final List> expectedRecords) throws InterruptedException { + public static List> waitUntilFinalKeyValueRecordsReceived( + final Properties consumerConfig, + final String topic, + final List> expectedRecords + ) throws Exception { return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT); } @@ -587,9 +626,11 @@ public static List> waitUntilFinalKeyValueRecordsReceived( * @param Value type of the data records * @return All the mappings consumed, or null if no records are consumed */ - public static List> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig, - final String topic, - final List> expectedRecords) throws InterruptedException { + public static List> waitUntilFinalKeyValueTimestampRecordsReceived( + final Properties consumerConfig, + final String topic, + final List> expectedRecords + ) throws Exception { return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT, true); } @@ -605,10 +646,12 @@ public static List> waitUntilFinalKeyValueTimesta * @return All the mappings consumed, or null if no records are consumed */ @SuppressWarnings("WeakerAccess") - public static List> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, - final String topic, - final List> expectedRecords, - final long waitTime) throws InterruptedException { + public static List> waitUntilFinalKeyValueRecordsReceived( + final Properties consumerConfig, + final String topic, + final List> expectedRecords, + final long waitTime + ) throws Exception { return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false); } @@ -617,7 +660,7 @@ private static List waitUntilFinalKeyValueRecordsReceived(final Pro final String topic, final List expectedRecords, final long waitTime, - final boolean withTimestamp) throws InterruptedException { + final boolean withTimestamp) throws Exception { final List accumData = new ArrayList<>(); try (final Consumer consumer = createConsumer(consumerConfig)) { final TestCondition valuesRead = () -> { @@ -630,19 +673,22 @@ private static List waitUntilFinalKeyValueRecordsReceived(final Pro accumData.addAll(readData); // filter out all intermediate records we don't want - final List accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList()); + final List accumulatedActual = accumData + .stream() + .filter(expectedRecords::contains) + .collect(Collectors.toList()); // still need to check that for each key, the ordering is expected final Map> finalAccumData = new HashMap<>(); for (final T kv : accumulatedActual) { finalAccumData.computeIfAbsent( - (K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key), + withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key, key -> new ArrayList<>()).add(kv); } final Map> finalExpected = new HashMap<>(); for (final T kv : expectedRecords) { finalExpected.computeIfAbsent( - (K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key), + withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key, key -> new ArrayList<>()).add(kv); } @@ -668,8 +714,7 @@ private static List waitUntilFinalKeyValueRecordsReceived(final Pro */ public static List waitUntilMinValuesRecordsReceived(final Properties consumerConfig, final String topic, - final int expectedNumRecords) throws InterruptedException { - + final int expectedNumRecords) throws Exception { return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } @@ -686,9 +731,14 @@ public static List waitUntilMinValuesRecordsReceived(final Properties con public static List waitUntilMinValuesRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, - final long waitTime) throws InterruptedException { + final long waitTime) throws Exception { final List accumData = new ArrayList<>(); - final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime); + final String reason = String.format( + "Did not receive all %d records from topic %s within %d ms", + expectedNumRecords, + topic, + waitTime + ); try (final Consumer consumer = createConsumer(consumerConfig)) { retryOnExceptionWithTimeout(waitTime, () -> { final List readData = @@ -759,7 +809,7 @@ private static void waitUntilMetadataIsPropagated(final List server * @param timeout the time to wait for the streams to all be in {@link State#RUNNING} state. */ public static void startApplicationAndWaitUntilRunning(final List streamsList, - final Duration timeout) throws InterruptedException { + final Duration timeout) throws Exception { final Lock stateLock = new ReentrantLock(); final Condition stateUpdate = stateLock.newCondition(); final Map stateMap = new HashMap<>(); @@ -808,8 +858,10 @@ public static void startApplicationAndWaitUntilRunning(final List final long millisRemaining = expectedEnd - System.currentTimeMillis(); if (millisRemaining <= 0) { - fail("Application did not reach a RUNNING state for all streams instances. Non-running instances: " + - nonRunningStreams); + fail( + "Application did not reach a RUNNING state for all streams instances. " + + "Non-running instances: " + nonRunningStreams + ); } stateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS); @@ -829,7 +881,7 @@ public static void startApplicationAndWaitUntilRunning(final List */ public static void waitForApplicationState(final List streamsList, final State state, - final Duration timeout) throws InterruptedException { + final Duration timeout) throws Exception { retryOnExceptionWithTimeout(timeout.toMillis(), () -> { final Map streamsToStates = streamsList .stream() @@ -840,8 +892,13 @@ public static void waitForApplicationState(final List streamsList, .filter(entry -> entry.getValue() != state) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - final String reason = String.format("Expected all streams instances in %s to be %s within %d ms, but the following were not: %s", - streamsList, state, timeout.toMillis(), wrongStateMap); + final String reason = String.format( + "Expected all streams instances in %s to be %s within %d ms, but the following were not: %s", + streamsList, + state, + timeout.toMillis(), + wrongStateMap + ); assertThat(reason, wrongStateMap.isEmpty()); }); } @@ -874,8 +931,11 @@ public boolean conditionMet() { public static void waitForEmptyConsumerGroup(final Admin adminClient, final String applicationId, final long timeoutMs) throws Exception { - TestUtils.waitForCondition(new IntegrationTestUtils.ConsumerGroupInactiveCondition(adminClient, applicationId), timeoutMs, - "Test consumer group " + applicationId + " still active even after waiting " + timeoutMs + " ms."); + TestUtils.waitForCondition( + new IntegrationTestUtils.ConsumerGroupInactiveCondition(adminClient, applicationId), + timeoutMs, + "Test consumer group " + applicationId + " still active even after waiting " + timeoutMs + " ms." + ); } private static StateListener getStateListener(final KafkaStreams streams) { @@ -891,11 +951,10 @@ private static StateListener getStateListener(final KafkaStreams streams) { public static void verifyKeyValueTimestamps(final Properties consumerConfig, final String topic, final List> expected) { - final List> results; try { results = waitUntilMinRecordsReceived(consumerConfig, topic, expected.size()); - } catch (final InterruptedException e) { + } catch (final Exception e) { throw new RuntimeException(e); } @@ -919,7 +978,7 @@ public static void verifyKeyValueTimestamps(final Properties consumerConfig, final List> results; try { results = waitUntilMinRecordsReceived(consumerConfig, topic, expected.size()); - } catch (final InterruptedException e) { + } catch (final Exception e) { throw new RuntimeException(e); } @@ -943,8 +1002,10 @@ private static void compareKeyValueTimestamp(final ConsumerRecord r final K recordKey = record.key(); final V recordValue = record.value(); final long recordTimestamp = record.timestamp(); - final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + - " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp); + final AssertionError error = new AssertionError( + "Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + + " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp + ); if (recordKey != null) { if (!recordKey.equals(expectedKey)) { throw error; @@ -1079,7 +1140,9 @@ private static KafkaConsumer createConsumer(final Properties consum return new KafkaConsumer<>(filtered); } - public static KafkaStreams getStartedStreams(final Properties streamsConfig, final StreamsBuilder builder, final boolean clean) { + public static KafkaStreams getStartedStreams(final Properties streamsConfig, + final StreamsBuilder builder, + final boolean clean) { final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig); if (clean) { driver.cleanUp(); @@ -1090,21 +1153,21 @@ public static KafkaStreams getStartedStreams(final Properties streamsConfig, fin public static S getStore(final String storeName, final KafkaStreams streams, - final QueryableStoreType storeType) throws InterruptedException { + final QueryableStoreType storeType) throws Exception { return getStore(DEFAULT_TIMEOUT, storeName, streams, storeType); } public static S getStore(final String storeName, final KafkaStreams streams, final boolean enableStaleQuery, - final QueryableStoreType storeType) throws InterruptedException { + final QueryableStoreType storeType) throws Exception { return getStore(DEFAULT_TIMEOUT, storeName, streams, enableStaleQuery, storeType); } public static S getStore(final long waitTime, final String storeName, final KafkaStreams streams, - final QueryableStoreType storeType) throws InterruptedException { + final QueryableStoreType storeType) throws Exception { return getStore(waitTime, storeName, streams, false, storeType); } @@ -1112,7 +1175,7 @@ public static S getStore(final long waitTime, final String storeName, final KafkaStreams streams, final boolean enableStaleQuery, - final QueryableStoreType storeType) throws InterruptedException { + final QueryableStoreType storeType) throws Exception { final StoreQueryParameters param = enableStaleQuery ? StoreQueryParameters.fromNameAndType(storeName, storeType).enableStaleStores() : StoreQueryParameters.fromNameAndType(storeName, storeType); @@ -1120,15 +1183,14 @@ public static S getStore(final long waitTime, } public static S getStore(final KafkaStreams streams, - final StoreQueryParameters param) throws InterruptedException { + final StoreQueryParameters param) throws Exception { return getStore(DEFAULT_TIMEOUT, streams, param); } public static S getStore(final long waitTime, final KafkaStreams streams, - final StoreQueryParameters param) throws InterruptedException { + final StoreQueryParameters param) throws Exception { final long expectedEnd = System.currentTimeMillis() + waitTime; - while (true) { try { return streams.store(param); From ab2e6a445b177130be2b69e2fe0464e3e31904d4 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 4 May 2020 22:38:05 -0700 Subject: [PATCH 5/5] Improve wait condition --- .../GlobalKTableEOSIntegrationTest.java | 54 ++++++++++++++----- 1 file changed, 41 insertions(+), 13 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 85ef2c86f6f2b..5706f67a62115 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -119,10 +119,12 @@ public void before() throws Exception { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300); - globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), - Materialized.>as(globalStore) - .withKeySerde(Serdes.Long()) - .withValueSerde(Serdes.String())); + globalTable = builder.globalTable( + globalTableTopic, + Consumed.with(Serdes.Long(), Serdes.String()), + Materialized.>as(globalStore) + .withKeySerde(Serdes.Long()) + .withValueSerde(Serdes.String())); final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); stream = builder.stream(streamTopic, stringLongConsumed); foreachAction = results::put; @@ -166,10 +168,25 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(replicatedStore); + + final Map expectedState = new HashMap<>(); + expectedState.put(1L, "F"); + expectedState.put(2L, "G"); + expectedState.put(3L, "H"); + expectedState.put(4L, "I"); + expectedState.put(5L, "J"); + + final Map globalState = new HashMap<>(); TestUtils.waitForCondition( - () -> "J".equals(replicatedStore.get(5L)), + () -> { + globalState.clear(); + replicatedStore.all().forEachRemaining(pair -> globalState.put(pair.key, pair.value)); + return globalState.equals(expectedState); + }, 30000, - () -> "waiting for data in replicated store; expected 'J'; received: " + replicatedStore.get(5L) + () -> "waiting for data in replicated store" + + "\n expected: " + expectedState + + "\n received: " + globalState ); @@ -219,10 +236,25 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertNotNull(replicatedStore); + + final Map expectedState = new HashMap<>(); + expectedState.put(1L, "F"); + expectedState.put(2L, "G"); + expectedState.put(3L, "H"); + expectedState.put(4L, "I"); + expectedState.put(5L, "J"); + + final Map globalState = new HashMap<>(); TestUtils.waitForCondition( - () -> "J".equals(replicatedStore.get(5L)), + () -> { + globalState.clear(); + replicatedStore.all().forEachRemaining(pair -> globalState.put(pair.key, pair.value)); + return globalState.equals(expectedState); + }, 30000, - () -> "waiting for data in replicated store; expected 'J'; received: " + replicatedStore.get(5L) + () -> "waiting for data in replicated store" + + "\n expected: " + expectedState + + "\n received: " + globalState ); @@ -299,11 +331,7 @@ public void shouldNotRestoreAbortedMessages() throws Exception { TestUtils.waitForCondition( () -> { result.clear(); - final Iterator> it = store.all(); - while (it.hasNext()) { - final KeyValue kv = it.next(); - result.put(kv.key, kv.value); - } + store.all().forEachRemaining(pair -> result.put(pair.key, pair.value)); return result.equals(expected); }, 30000L,