Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public EmbeddedKafkaCluster(final int numBrokers,
/**
* Creates and starts a Kafka cluster.
*/
public void start() throws IOException, InterruptedException {
public void start() throws IOException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we would have written throws Exception from the beginning on, this change would not be necessary... (Just to back up my preferred coding stile to only use throws Exception in tests.)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lol, makes sense.

log.debug("Initiating embedded Kafka cluster startup");
log.debug("Starting a ZooKeeper instance");
zookeeper = new EmbeddedZookeeper();
Expand All @@ -98,6 +98,7 @@ public void start() throws IOException, InterruptedException {
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), 5);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix, other parts are just side cleanups.

putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);

for (int i = 0; i < brokers.length; i++) {
Expand Down Expand Up @@ -241,16 +242,6 @@ public void deleteTopicAndWait(final String topic) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
}

/**
* Deletes a topic and blocks until the topic got deleted.
*
* @param timeoutMs the max time to wait for the topic to be deleted (does not block if {@code <= 0})
* @param topic the name of the topic
*/
public void deleteTopicAndWait(final long timeoutMs, final String topic) throws InterruptedException {
deleteTopicsAndWait(timeoutMs, topic);
}

/**
* Deletes multiple topics returns immediately.
*
Expand Down Expand Up @@ -279,7 +270,7 @@ public void deleteTopicsAndWait(final long timeoutMs, final String... topics) th
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
} catch (final UnknownTopicOrPartitionException e) { }
} catch (final UnknownTopicOrPartitionException ignored) { }
}

if (timeoutMs > 0) {
Expand All @@ -298,24 +289,14 @@ public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedExcep
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
} catch (final UnknownTopicOrPartitionException e) { }
} catch (final UnknownTopicOrPartitionException ignored) { }
}

if (timeoutMs > 0) {
TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
}
}

public void deleteAndRecreateTopics(final String... topics) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
createTopics(topics);
}

public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws InterruptedException {
deleteTopicsAndWait(timeoutMs, topics);
createTopics(topics);
}

public void waitForRemainingTopics(final long timeoutMs, final String... topics) throws InterruptedException {
TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds.");
}
Expand Down