From e82b6918587aa14538ca776caca385c77185a5c5 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 16 Jun 2020 15:58:05 -0500 Subject: [PATCH 1/2] MINOR: Fix flaky HighAvailabilityTaskAssignorIntegrationTest --- ...ailabilityTaskAssignorIntegrationTest.java | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index 725c8058e5878..30e0891d393b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -95,6 +95,11 @@ private void shouldScaleOutWithWarmupTasks(final Function inputTopicPartitions = mkSet( + new TopicPartition(inputTopic, 0), + new TopicPartition(inputTopic, 1) + ); + final String storeName = "store" + testId; final String storeChangelog = appId + "-store" + testId + "-changelog"; final Set changelogTopicPartitions = mkSet( @@ -133,14 +138,12 @@ private void shouldScaleOutWithWarmupTasks(final Function producer = new KafkaProducer<>(producerProperties)) { - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < numberOfRecords; i++) { producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo)); } } @@ -159,11 +162,18 @@ private void shouldScaleOutWithWarmupTasks(final Function consumer = new KafkaConsumer<>(consumerProperties)) { kafkaStreams0.start(); + // just make sure we actually wrote all the input records + TestUtils.waitForCondition( + () -> getEndOffsetSum(inputTopicPartitions, consumer) == numberOfRecords, + 120_000L, + () -> "Input records haven't all been written to the input topic: " + getEndOffsetSum(inputTopicPartitions, consumer) + ); + // wait until all the input records are in the changelog TestUtils.waitForCondition( - () -> getChangelogOffsetSum(changelogTopicPartitions, consumer) == 1000, + () -> getEndOffsetSum(changelogTopicPartitions, consumer) == numberOfRecords, 120_000L, - () -> "Input records haven't all been written to the changelog: " + getChangelogOffsetSum(changelogTopicPartitions, consumer) + () -> "Input records haven't all been written to the changelog: " + getEndOffsetSum(changelogTopicPartitions, consumer) ); final AtomicLong instance1TotalRestored = new AtomicLong(-1); @@ -240,6 +250,14 @@ public void onRestoreEnd(final TopicPartition topicPartition, } } + private String getKiloByteValue() { + final StringBuilder kiloBuilder = new StringBuilder(1000); + for (int i = 0; i < 1000; i++) { + kiloBuilder.append('0'); + } + return kiloBuilder.toString(); + } + private void assertFalseNoRetry(final boolean assertion, final String message) { if (assertion) { throw new NoRetryException( @@ -268,8 +286,8 @@ private static Properties streamsProperties(final String appId, ); } - private static long getChangelogOffsetSum(final Set changelogTopicPartitions, - final Consumer consumer) { + private static long getEndOffsetSum(final Set changelogTopicPartitions, + final Consumer consumer) { long sum = 0; final Collection values = consumer.endOffsets(changelogTopicPartitions).values(); for (final Long value : values) { From 9f863692102cbe898dc2df78bfec0967e80d1af0 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 16 Jun 2020 22:23:54 -0500 Subject: [PATCH 2/2] fix style --- ...ailabilityTaskAssignorIntegrationTest.java | 63 ++++++++++--------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index 30e0891d393b7..7e7cf2996a7d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -129,40 +129,16 @@ private void shouldScaleOutWithWarmupTasks(final Function producer = new KafkaProducer<>(producerProperties)) { - for (int i = 0; i < numberOfRecords; i++) { - producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo)); - } - } - - final Properties consumerProperties = mkProperties( - mkMap( - mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), - mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()), - mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()) - ) - ); - + produceTestData(inputTopic, numberOfRecords); try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener)); final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener)); - final Consumer consumer = new KafkaConsumer<>(consumerProperties)) { + final Consumer consumer = new KafkaConsumer<>(getConsumerProperties())) { kafkaStreams0.start(); - // just make sure we actually wrote all the input records + // sanity check: just make sure we actually wrote all the input records TestUtils.waitForCondition( () -> getEndOffsetSum(inputTopicPartitions, consumer) == numberOfRecords, 120_000L, @@ -250,7 +226,36 @@ public void onRestoreEnd(final TopicPartition topicPartition, } } - private String getKiloByteValue() { + private void produceTestData(final String inputTopic, final int numberOfRecords) { + final String kilo = getKiloByteValue(); + + final Properties producerProperties = mkProperties( + mkMap( + mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ProducerConfig.ACKS_CONFIG, "all"), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) + ) + ); + + try (final Producer producer = new KafkaProducer<>(producerProperties)) { + for (int i = 0; i < numberOfRecords; i++) { + producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo)); + } + } + } + + private static Properties getConsumerProperties() { + return mkProperties( + mkMap( + mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()), + mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()) + ) + ); + } + + private static String getKiloByteValue() { final StringBuilder kiloBuilder = new StringBuilder(1000); for (int i = 0; i < 1000; i++) { kiloBuilder.append('0'); @@ -258,7 +263,7 @@ private String getKiloByteValue() { return kiloBuilder.toString(); } - private void assertFalseNoRetry(final boolean assertion, final String message) { + private static void assertFalseNoRetry(final boolean assertion, final String message) { if (assertion) { throw new NoRetryException( new AssertionError(