Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O
final String testId = safeUniqueTestName(getClass(), testName);
final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
final String inputTopic = "input" + testId;
final Set<TopicPartition> inputTopicPartitions = mkSet(
new TopicPartition(inputTopic, 0),
new TopicPartition(inputTopic, 1)
);

final String storeName = "store" + testId;
final String storeChangelog = appId + "-store" + testId + "-changelog";
final Set<TopicPartition> changelogTopicPartitions = mkSet(
Expand Down Expand Up @@ -124,46 +129,27 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O
builder.table(inputTopic, materializedFunction.apply(storeName));
final Topology topology = builder.build();

final Properties producerProperties = mkProperties(
mkMap(
mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(ProducerConfig.ACKS_CONFIG, "all"),
mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()),
mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
)
);

final StringBuilder kiloBuilder = new StringBuilder(1000);
for (int i = 0; i < 1000; i++) {
kiloBuilder.append('0');
}
final String kilo = kiloBuilder.toString();

try (final Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo));
}
}

final Properties consumerProperties = mkProperties(
mkMap(
mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()),
mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
)
);
final int numberOfRecords = 500;

produceTestData(inputTopic, numberOfRecords);

try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
final Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
final Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties())) {
kafkaStreams0.start();

// sanity check: just make sure we actually wrote all the input records
TestUtils.waitForCondition(
() -> getEndOffsetSum(inputTopicPartitions, consumer) == numberOfRecords,
120_000L,
() -> "Input records haven't all been written to the input topic: " + getEndOffsetSum(inputTopicPartitions, consumer)
);

// wait until all the input records are in the changelog
TestUtils.waitForCondition(
() -> getChangelogOffsetSum(changelogTopicPartitions, consumer) == 1000,
() -> getEndOffsetSum(changelogTopicPartitions, consumer) == numberOfRecords,
120_000L,
() -> "Input records haven't all been written to the changelog: " + getChangelogOffsetSum(changelogTopicPartitions, consumer)
() -> "Input records haven't all been written to the changelog: " + getEndOffsetSum(changelogTopicPartitions, consumer)
);

final AtomicLong instance1TotalRestored = new AtomicLong(-1);
Expand Down Expand Up @@ -240,7 +226,44 @@ public void onRestoreEnd(final TopicPartition topicPartition,
}
}

private void assertFalseNoRetry(final boolean assertion, final String message) {
/**
 * Writes {@code numberOfRecords} records (keys "0".."{n-1}", each with a 1 KB value)
 * to {@code inputTopic}. Uses acks=all so the records are fully replicated, and the
 * try-with-resources close of the producer flushes all pending sends before returning.
 */
private void produceTestData(final String inputTopic, final int numberOfRecords) {
    final Properties producerConfig = mkProperties(
        mkMap(
            mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
            mkEntry(ProducerConfig.ACKS_CONFIG, "all"),
            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()),
            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
        )
    );

    // Bulky value so the changelog accumulates enough bytes for restoration to be observable.
    final String payload = getKiloByteValue();

    try (final Producer<String, String> producer = new KafkaProducer<>(producerConfig)) {
        int key = 0;
        while (key < numberOfRecords) {
            producer.send(new ProducerRecord<>(inputTopic, String.valueOf(key), payload));
            key++;
        }
    }
}

/**
 * Builds the configuration for a String/String consumer pointed at the test cluster,
 * used only to query end offsets for the input and changelog topics.
 */
private static Properties getConsumerProperties() {
    final Properties consumerConfig = new Properties();
    consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerConfig.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfig.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return consumerConfig;
}

/**
 * Returns a one-kilobyte string consisting entirely of {@code '0'} characters,
 * used as the record value so each produced record carries ~1 KB of payload.
 */
private static String getKiloByteValue() {
    final StringBuilder value = new StringBuilder(1000);
    while (value.length() < 1000) {
        value.append('0');
    }
    return value.toString();
}

private static void assertFalseNoRetry(final boolean assertion, final String message) {
if (assertion) {
throw new NoRetryException(
new AssertionError(
Expand Down Expand Up @@ -268,8 +291,8 @@ private static Properties streamsProperties(final String appId,
);
}

private static long getChangelogOffsetSum(final Set<TopicPartition> changelogTopicPartitions,
final Consumer<String, String> consumer) {
private static long getEndOffsetSum(final Set<TopicPartition> changelogTopicPartitions,
final Consumer<String, String> consumer) {
long sum = 0;
final Collection<Long> values = consumer.endOffsets(changelogTopicPartitions).values();
for (final Long value : values) {
Expand Down