diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index 725c8058e5878..7e7cf2996a7d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -95,6 +95,11 @@ private void shouldScaleOutWithWarmupTasks(final Function inputTopicPartitions = mkSet( + new TopicPartition(inputTopic, 0), + new TopicPartition(inputTopic, 1) + ); + final String storeName = "store" + testId; final String storeChangelog = appId + "-store" + testId + "-changelog"; final Set changelogTopicPartitions = mkSet( @@ -124,46 +129,27 @@ private void shouldScaleOutWithWarmupTasks(final Function producer = new KafkaProducer<>(producerProperties)) { - for (int i = 0; i < 1000; i++) { - producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo)); - } - } - - final Properties consumerProperties = mkProperties( - mkMap( - mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), - mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()), - mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()) - ) - ); + final int numberOfRecords = 500; + produceTestData(inputTopic, numberOfRecords); try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener)); final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener)); - final Consumer consumer = new KafkaConsumer<>(consumerProperties)) { + final Consumer consumer = new KafkaConsumer<>(getConsumerProperties())) { kafkaStreams0.start(); + // sanity check: just make sure we actually wrote all the input records + TestUtils.waitForCondition( + () -> getEndOffsetSum(inputTopicPartitions, consumer) == numberOfRecords, + 120_000L, + () -> "Input records haven't all been written to the input topic: " + getEndOffsetSum(inputTopicPartitions, consumer) + ); + // wait until all the input records are in the changelog TestUtils.waitForCondition( - () -> getChangelogOffsetSum(changelogTopicPartitions, consumer) == 1000, + () -> getEndOffsetSum(changelogTopicPartitions, consumer) == numberOfRecords, 120_000L, - () -> "Input records haven't all been written to the changelog: " + getChangelogOffsetSum(changelogTopicPartitions, consumer) + () -> "Input records haven't all been written to the changelog: " + getEndOffsetSum(changelogTopicPartitions, consumer) ); final AtomicLong instance1TotalRestored = new AtomicLong(-1); @@ -240,7 +226,44 @@ public void onRestoreEnd(final TopicPartition topicPartition, } } - private void assertFalseNoRetry(final boolean assertion, final String message) { + private void produceTestData(final String inputTopic, final int numberOfRecords) { + final String kilo = getKiloByteValue(); + + final Properties producerProperties = mkProperties( + mkMap( + mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ProducerConfig.ACKS_CONFIG, "all"), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) + ) + ); + + try (final Producer producer = new KafkaProducer<>(producerProperties)) { + for (int i = 0; i < numberOfRecords; i++) { + producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo)); + } + } + } + + private static Properties getConsumerProperties() { + return mkProperties( + mkMap( + mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()), + mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()) + ) + ); + } + + private static String getKiloByteValue() { + final StringBuilder kiloBuilder = new StringBuilder(1000); + for (int i = 0; i < 1000; i++) { + kiloBuilder.append('0'); + } + return kiloBuilder.toString(); + } + + private static void assertFalseNoRetry(final boolean assertion, final String message) { if (assertion) { throw new NoRetryException( new AssertionError( @@ -268,8 +291,8 @@ private static Properties streamsProperties(final String appId, ); } - private static long getChangelogOffsetSum(final Set changelogTopicPartitions, - final Consumer consumer) { + private static long getEndOffsetSum(final Set changelogTopicPartitions, + final Consumer consumer) { long sum = 0; final Collection values = consumer.endOffsets(changelogTopicPartitions).values(); for (final Long value : values) {