From 9ac3425f657d352989eb0cd8994d9a36fc87fdea Mon Sep 17 00:00:00 2001 From: abbccdda Date: Mon, 20 Apr 2020 13:44:27 -0700 Subject: [PATCH] Reduce txn log partitions for embed broker --- .../utils/EmbeddedKafkaCluster.java | 27 +++---------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index edabe24c7d2a2..804c1fcfb3a7c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -84,7 +84,7 @@ public EmbeddedKafkaCluster(final int numBrokers, /** * Creates and starts a Kafka cluster. */ - public void start() throws IOException, InterruptedException { + public void start() throws IOException { log.debug("Initiating embedded Kafka cluster startup"); log.debug("Starting a ZooKeeper instance"); zookeeper = new EmbeddedZookeeper(); @@ -98,6 +98,7 @@ public void start() throws IOException, InterruptedException { putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0); putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1); putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 5); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), 5); putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true); for (int i = 0; i < brokers.length; i++) { @@ -241,16 +242,6 @@ public void deleteTopicAndWait(final String topic) throws InterruptedException { deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic); } - /** - * Deletes a topic and blocks until the topic got deleted. - * - * @param timeoutMs the max time to wait for the topic to be deleted (does not block if {@code <= 0}) - * @param topic the name of the topic - */ - public void deleteTopicAndWait(final long timeoutMs, final String topic) throws InterruptedException { - deleteTopicsAndWait(timeoutMs, topic); - } - /** * Deletes multiple topics returns immediately. * @@ -279,7 +270,7 @@ public void deleteTopicsAndWait(final long timeoutMs, final String... topics) th for (final String topic : topics) { try { brokers[0].deleteTopic(topic); - } catch (final UnknownTopicOrPartitionException e) { } + } catch (final UnknownTopicOrPartitionException ignored) { } } if (timeoutMs > 0) { @@ -298,7 +289,7 @@ public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedExcep for (final String topic : topics) { try { brokers[0].deleteTopic(topic); - } catch (final UnknownTopicOrPartitionException e) { } + } catch (final UnknownTopicOrPartitionException ignored) { } } if (timeoutMs > 0) { @@ -306,16 +297,6 @@ public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedExcep } } - public void deleteAndRecreateTopics(final String... topics) throws InterruptedException { - deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics); - createTopics(topics); - } - - public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws InterruptedException { - deleteTopicsAndWait(timeoutMs, topics); - createTopics(topics); - } - public void waitForRemainingTopics(final long timeoutMs, final String... topics) throws InterruptedException { TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds."); }