diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 560153f3a34aa..946602450c28a 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -22,7 +22,7 @@ import joptsimple.OptionException import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.TimeoutException +import org.apache.kafka.common.errors.{DisconnectException, TimeoutException} import org.junit.Assert._ import org.junit.Test @@ -578,11 +578,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--timeout", "1", "--group", group) ++ describeType val service = getConsumerGroupService(cgcArgs) - try { - TestUtils.grabConsoleOutputAndError(service.describeGroups()) - fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})") - } catch { - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + retryOnDisconnectedException { + try { + TestUtils.grabConsoleOutputAndError(service.describeGroups()) + fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + } } } @@ -598,11 +600,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1") val service = getConsumerGroupService(cgcArgs) - try { - service.collectGroupOffsets(group) - fail("The consumer group command should fail due to low initialization timeout") - } catch { - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + retryOnDisconnectedException { + try { + service.collectGroupOffsets(group) + fail("The consumer group command should fail due to low initialization timeout") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + } } } @@ -618,17 +622,21 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1") val service = getConsumerGroupService(cgcArgs) - try { - service.collectGroupMembers(group, false) - fail("The consumer group command should fail due to low initialization timeout") - } catch { - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) - try { - service.collectGroupMembers(group, true) - fail("The consumer group command should fail due to low initialization timeout (verbose)") - } catch { - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) - } + retryOnDisconnectedException { + try { + service.collectGroupMembers(group, false) + fail("The consumer group command should fail due to low initialization timeout") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + retryOnDisconnectedException { + try { + service.collectGroupMembers(group, true) + fail("The consumer group command should fail due to low initialization timeout (verbose)") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + } + } + } } } @@ -644,11 +652,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1") val service = getConsumerGroupService(cgcArgs) - try { - service.collectGroupState(group) - fail("The consumer group command should fail due to low initialization timeout") - } catch { - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + retryOnDisconnectedException { + try { + service.collectGroupState(group) + fail("The consumer group command should fail due to low initialization timeout") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + } } } @@ -683,5 +693,16 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.") } + private def retryOnDisconnectedException[T](fn: => T): T = { + while (true) { + try { + return fn + } catch { + case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // retry + } + } + throw new IllegalStateException + } + }