From 852b11b93510b77c10b8575aea3de8e91fea192f Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 12 Feb 2019 07:04:54 -0800 Subject: [PATCH 01/10] Add missing string interpolation --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 75027f9bb95df..23225e8bf2aa3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1402,7 +1402,7 @@ class KafkaApis(val requestChannel: RequestChannel, setTopics(results) val responseBody = new CreateTopicsResponse(responseData) trace(s"Sending create topics response $responseData for correlation id " + - "${request.header.correlationId} to client ${request.header.clientId}.") + s"${request.header.correlationId} to client ${request.header.clientId}.") responseBody } sendResponseMaybeThrottle(request, createResponse) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index c58532dfbc911..615b21bc36508 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -558,7 +558,7 @@ class PartitionTest { case Right(Some(offsetAndTimestamp)) => fail("Should have failed") case Right(None) => fail("Should have failed") case Left(e: OffsetNotAvailableException) => // ok - case Left(e: ApiException) => fail("Should have seen OffsetNotAvailableException, saw $e") + case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException, saw $e") } From bb555d7ddc1198cf970230f9183483504b77849b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 12 Feb 2019 07:05:45 -0800 Subject: [PATCH 02/10] Remove unused code --- core/src/main/scala/kafka/admin/TopicCommand.scala | 3 ++- .../main/scala/kafka/server/DelayedElectPreferredLeader.scala | 4 ---- .../test/scala/integration/kafka/api/ConsumerBounceTest.scala | 2 +- .../admin/PreferredReplicaLeaderElectionCommandTest.scala | 1 - core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala | 3 +-- .../unit/kafka/coordinator/group/GroupMetadataTest.scala | 1 - .../unit/kafka/server/AbstractCreateTopicsRequestTest.scala | 1 - .../scala/unit/kafka/server/ReplicaFetcherThreadTest.scala | 4 +--- 8 files changed, 5 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 2fcbe176636e1..a4fa20f324b66 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -559,7 +559,8 @@ object TopicCommand extends Logging { private val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment") - private val forceOpt = parser.accepts("force", "Suppress console prompts") + // This is not currently used, but we keep it for compatibility + parser.accepts("force", "Suppress console prompts") private val excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default") diff --git a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala index 38b07ad5ce67a..f3543a89518ea 100644 --- a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala +++ b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala @@ -55,10 +55,6 @@ class DelayedElectPreferredLeader(delayMs: Long, responseCallback(timedout ++ fullResults) } - private def timeoutWaiting = { - waitingPartitions.map(partition => partition -> new ApiError(Errors.REQUEST_TIMED_OUT, null)).toMap - } - /** * Try to complete the delayed operation by first checking if the operation * can be completed by now. If yes execute the completion logic by calling diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index a382154c2fef0..e535104306fa4 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -418,7 +418,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { /** * Creates N consumers with the same group ID and ensures the group rebalances properly at each step */ - private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService, topic: String = topic): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = { + private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService, topic: String): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = { val stableConsumers = ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]]() for (_ <- 1.to(consumerCount)) { val newConsumer = createConsumerWithGroupId(groupId) diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala index 824e8fb4253ec..6481f3f0a9bcf 100644 --- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import java.util.Properties -import kafka.admin.PreferredReplicaLeaderElectionCommand import kafka.common.{AdminCommandFailedException, TopicAndPartition} import kafka.network.RequestChannel import kafka.security.auth._ diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index eeb9101756a83..008fd662b79d2 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -172,7 +172,6 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT val brokers = List(0) TestUtils.createBrokersInZk(zkClient, brokers) - val topic = "testTopic" topicService.createTopic(new TopicCommandOptions( Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName))) @@ -570,4 +569,4 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT assertFalse(TestUtils.grabConsoleOutput(topicService.deleteTopic(escapedCommandOpts)).contains(topic2)) assertTrue(TestUtils.grabConsoleOutput(topicService.deleteTopic(unescapedCommandOpts)).contains(topic2)) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index 9281ea9f5a915..3108b150f16ce 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -17,7 +17,6 @@ package kafka.coordinator.group -import kafka.admin.ConsumerGroupCommand.GroupState import kafka.common.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Time diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index bef894a2460c0..ec520f9bc520c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -29,7 +29,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse} import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue} -import scala.collection.JavaConverters import scala.collection.JavaConverters._ class AbstractCreateTopicsRequestTest extends BaseRequestTest { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 04e021840188e..e6df8e842b4c7 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -24,7 +24,6 @@ import kafka.cluster.Partition import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.TestUtils -import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -33,7 +32,7 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.utils.SystemTime import org.easymock.EasyMock._ -import org.easymock.{Capture, CaptureType, IAnswer} +import org.easymock.{Capture, CaptureType} import org.junit.Assert._ import org.junit.Test @@ -46,7 +45,6 @@ class ReplicaFetcherThreadTest { private val t1p1 = new TopicPartition("topic1", 1) private val t2p1 = new TopicPartition("topic2", 1) - private var toFail = false private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000) private def offsetAndEpoch(fetchOffset: Long, leaderEpoch: Int = 1): OffsetAndEpoch = { From 3ef916b79c61db3060125a27d90a19f62d248c0b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 12 Feb 2019 07:06:07 -0800 Subject: [PATCH 03/10] Replace deprecated usage of assertThat --- core/src/test/scala/kafka/tools/CustomDeserializerTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala index 37b5b79a868ec..f94a9006c8cb5 100644 --- a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala +++ b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala @@ -22,8 +22,8 @@ import java.io.PrintStream import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Deserializer import org.hamcrest.CoreMatchers +import org.hamcrest.MatcherAssert._ import org.junit.Test -import org.junit.Assert.assertThat import org.scalatest.mockito.MockitoSugar class CustomDeserializer extends Deserializer[String] { @@ -31,7 +31,7 @@ class CustomDeserializer extends Deserializer[String] { } override def deserialize(topic: String, data: Array[Byte]): String = { - assertThat("topic must not be null", topic, CoreMatchers.notNullValue()) + assertThat("topic must not be null", topic, CoreMatchers.notNullValue) new String(data) } From 875a0e9e12ec221db691a212b70628871844b273 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 12 Feb 2019 07:07:09 -0800 Subject: [PATCH 04/10] var -> val and add missing case --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 09b2e335de1b7..6d8d50443e9cd 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1751,7 +1751,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo private def reCreate(): Stat = { val codeAfterDelete = delete() - var codeAfterReCreate = codeAfterDelete + val codeAfterReCreate = codeAfterDelete debug(s"Result of znode ephemeral deletion at $path is: $codeAfterDelete") if (codeAfterDelete == Code.OK || codeAfterDelete == Code.NONODE) { create() @@ -1880,6 +1880,7 @@ object KafkaZkClient { case _ => null } SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata) + case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp") } case null => throw KeeperException.create(resultCode) case _ => throw new IllegalStateException(s"Cannot unwrap $response because the first zookeeper op is not check op in original MultiRequest") From f0a0cab0e7f9b657d94e65768bb2202bc64ab60b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 12 Feb 2019 07:07:30 -0800 Subject: [PATCH 05/10] Fix ETA warning --- .../unit/kafka/admin/TopicCommandWithAdminClientTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index 238afa3b82b0b..e0597de748751 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -71,7 +71,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin } def assertCheckArgsExitCode(expected: Int, options: TopicCommandOptions) { - assertExitCode(expected, options.checkArgs) + assertExitCode(expected, options.checkArgs _) } def createAndWaitTopic(opts: TopicCommandOptions): Unit = { @@ -635,4 +635,4 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin assertTrue(output.contains(testTopicName)) assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)) } -} \ No newline at end of file +} From 949458241534da7b6a77bfaf609441f938e262e3 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 12 Feb 2019 07:07:42 -0800 Subject: [PATCH 06/10] Simplify code --- .../scala/kafka/coordinator/group/GroupMetadata.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 3bc0117850ead..c02a0205f4136 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -232,13 +232,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState numMembersAwaitingJoin -= 1 } - if (isLeader(memberId)) { - leaderId = if (members.isEmpty) { - None - } else { - Some(members.keys.head) - } - } + if (isLeader(memberId)) + leaderId = members.keys.headOption } def isPendingMember(memberId: String): Boolean = pendingMembers.contains(memberId) && !has(memberId) From d34e6a49381bb95fa6c7312d2ab8bff9b478981a Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 12 Feb 2019 07:13:15 -0800 Subject: [PATCH 07/10] Fix `changePreferredLeader` test --- .../kafka/api/AdminClientIntegrationTest.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 5a3278cda62db..d0a6488819536 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -1338,16 +1338,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { changePreferredLeader(prefer1) // but shut it down... servers(1).shutdown() - waitUntilTrue ( - () => { - val description = client.describeTopics(Set (partition1.topic(), partition2.topic()).asJava).all().get() - return !description.asScala.flatMap{ - case (topic, description) => description.partitions().asScala.map( - partition => partition.isr().asScala).flatten - }.exists(node => node.id == 1) - }, - "Expect broker 1 to no longer be in any ISR" - ) + waitUntilTrue (() => { + val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all().get() + !description.asScala.flatMap { case (topic, description) => + description.partitions.asScala.flatMap(_.isr.asScala) + }.exists(_.id == 1) + }, "Expect broker 1 to no longer be in any ISR") // ... now what happens if we try to elect the preferred leader and it's down? val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000) From dd5a468bd9ee6ae47df6c69a75c0c3c32898f9d1 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Feb 2019 19:42:00 -0800 Subject: [PATCH 08/10] Fix and simplify `testElectPreferredLeaders` --- .../api/AdminClientIntegrationTest.scala | 54 ++++++------------- 1 file changed, 16 insertions(+), 38 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index d0a6488819536..46cb2b822343f 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -1349,48 +1349,26 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000) electResult = client.electPreferredLeaders(asList(partition1), shortTimeout) assertEquals(Set(partition1).asJava, electResult.partitions.get) - try { - electResult.partitionResult(partition1).get() - fail() - } catch { - case e: Exception => - val cause = e.getCause - assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) - assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( - "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) - } + var e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause + assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) + assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) assertEquals(2, currentLeader(partition1)) // preferred leader unavailable with null argument electResult = client.electPreferredLeaders(null, shortTimeout) - try { - electResult.partitions.get() - fail() - } catch { - case e: Exception => - val cause = e.getCause - assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) - } - try { - electResult.partitionResult(partition1).get() - fail() - } catch { - case e: Exception => - val cause = e.getCause - assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) - assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( - "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) - } - try { - electResult.partitionResult(partition2).get() - fail() - } catch { - case e: Exception => - val cause = e.getCause - assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) - assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( - "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) - } + e = intercept[ExecutionException](electResult.partitions.get()).getCause + assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) + + e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause + assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) + assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) + + e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause + assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) + assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) assertEquals(2, currentLeader(partition1)) assertEquals(2, currentLeader(partition2)) From f2b655d3dcc8542e9bde08fca3fc24d136ff626e Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Feb 2019 19:42:15 -0800 Subject: [PATCH 09/10] Remove commented out code --- .../admin/PreferredReplicaLeaderElectionCommandTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala index 6481f3f0a9bcf..96e7dace4e7b2 100644 --- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala @@ -33,8 +33,7 @@ import org.apache.kafka.common.network.ListenerName import org.junit.Assert._ import org.junit.{After, Test} -class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging /*with RackAwareTest*/ { - +class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging { var servers: Seq[KafkaServer] = Seq() @After From b0d70daf55fabceeb379062c4641749466d771c5 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Feb 2019 19:59:47 -0800 Subject: [PATCH 10/10] More test simplification --- .../api/AdminClientIntegrationTest.scala | 51 ++++++------------- 1 file changed, 16 insertions(+), 35 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 46cb2b822343f..96de860338884 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -1274,17 +1274,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { waitForLeaderToBecome(partition1, 1) // topic 2 unchanged - try { - electResult.partitionResult(partition2).get() - fail("topic 2 wasn't requested") - } catch { - case e: ExecutionException => - val cause = e.getCause - assertTrue(cause.getClass.getName, cause.isInstanceOf[UnknownTopicOrPartitionException]) - assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted", - cause.getMessage) - assertEquals(0, currentLeader(partition2)) - } + var e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause + assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass) + assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted", + e.getMessage) + assertEquals(0, currentLeader(partition2)) // meaningful election with null partitions electResult = client.electPreferredLeaders(null) @@ -1298,17 +1292,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val unknownPartition = new TopicPartition("topic-does-not-exist", 0) electResult = client.electPreferredLeaders(asList(unknownPartition)) assertEquals(Set(unknownPartition).asJava, electResult.partitions.get) - try { - electResult.partitionResult(unknownPartition).get() - } catch { - case e: Exception => - val cause = e.getCause - assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException]) - assertEquals("The partition does not exist.", - cause.getMessage) - assertEquals(1, currentLeader(partition1)) - assertEquals(1, currentLeader(partition2)) - } + e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause + assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass) + assertEquals("The partition does not exist.", e.getMessage) + assertEquals(1, currentLeader(partition1)) + assertEquals(1, currentLeader(partition2)) // Now change the preferred leader to 2 changePreferredLeader(prefer2) @@ -1318,15 +1306,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get) waitForLeaderToBecome(partition1, 2) assertEquals(1, currentLeader(partition2)) - try { - electResult.partitionResult(unknownPartition).get() - } catch { - case e: Exception => - val cause = e.getCause - assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException]) - assertEquals("The partition does not exist.", - cause.getMessage) - } + e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause + assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass) + assertEquals("The partition does not exist.", e.getMessage) // dupe partitions electResult = client.electPreferredLeaders(asList(partition2, partition2)) @@ -1340,16 +1322,15 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { servers(1).shutdown() waitUntilTrue (() => { val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all().get() - !description.asScala.flatMap { case (topic, description) => - description.partitions.asScala.flatMap(_.isr.asScala) - }.exists(_.id == 1) + val isr = description.asScala.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala)) + !isr.exists(_.id == 1) }, "Expect broker 1 to no longer be in any ISR") // ... now what happens if we try to elect the preferred leader and it's down? val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000) electResult = client.electPreferredLeaders(asList(partition1), shortTimeout) assertEquals(Set(partition1).asJava, electResult.partitions.get) - var e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause + e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains( "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))