From 759dbf96fb88d9c7281e60cfc05eb67170834b8e Mon Sep 17 00:00:00 2001 From: huxihx Date: Wed, 12 Feb 2020 12:26:03 +0800 Subject: [PATCH 1/3] https://issues.apache.org/jira/browse/KAFKA-9541 Occasionally the captured exception is DisconnectedException instead of TimeoutException. That might be due to an unexpected long pause that caused the node disconnection. We could simply ignore such cases. --- .../scala/unit/kafka/admin/DescribeConsumerGroupTest.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 560153f3a34aa..ebf3719e86fea 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -22,7 +22,7 @@ import joptsimple.OptionException import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.TimeoutException +import org.apache.kafka.common.errors.{DisconnectException, TimeoutException} import org.junit.Assert._ import org.junit.Test @@ -582,6 +582,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { TestUtils.grabConsoleOutputAndError(service.describeGroups()) fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})") } catch { + case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) } } @@ -602,6 +603,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { service.collectGroupOffsets(group) fail("The consumer group command should fail due to low initialization timeout") } catch { + case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) } } @@ -622,11 +624,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { service.collectGroupMembers(group, false) fail("The consumer group command should fail due to low initialization timeout") } catch { + case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) try { service.collectGroupMembers(group, true) fail("The consumer group command should fail due to low initialization timeout (verbose)") } catch { + case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) } } @@ -648,6 +652,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { service.collectGroupState(group) fail("The consumer group command should fail due to low initialization timeout") } catch { + case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) } } From f97d4dc26f7b0ca6ef7794afc3fb535947598508 Mon Sep 17 00:00:00 2001 From: huxihx Date: Wed, 12 Feb 2020 12:29:12 +0800 Subject: [PATCH 2/3] use e.getCause --- .../unit/kafka/admin/DescribeConsumerGroupTest.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index ebf3719e86fea..75db776ebb0da 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -582,7 +582,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { TestUtils.grabConsoleOutputAndError(service.describeGroups()) fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})") } catch { - case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection + case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) } } @@ -603,7 +603,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { service.collectGroupOffsets(group) fail("The consumer group command should fail due to low initialization timeout") } catch { - case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection + case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) } } @@ -624,13 +624,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { service.collectGroupMembers(group, false) fail("The consumer group command should fail due to low initialization timeout") } catch { - case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection + case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) try { service.collectGroupMembers(group, true) fail("The consumer group command should fail due to low initialization timeout (verbose)") } catch { - case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection + case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) } } @@ -652,7 +652,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { service.collectGroupState(group) fail("The consumer group command should fail due to low initialization timeout") } catch { - case e: ExecutionException if e.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection + case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) } } From 519b41d8b5f9f5ab8e7ba07bb914468439439071 Mon Sep 17 00:00:00 2001 From: huxihx Date: Wed, 12 Feb 2020 15:33:01 +0800 Subject: [PATCH 3/3] retryOnDisconnectedException --- .../admin/DescribeConsumerGroupTest.scala | 78 +++++++++++-------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 75db776ebb0da..946602450c28a 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -578,12 +578,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--timeout", "1", "--group", group) ++ describeType val service = getConsumerGroupService(cgcArgs) - try { - TestUtils.grabConsoleOutputAndError(service.describeGroups()) - fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})") - } catch { - case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + retryOnDisconnectedException { + try { + TestUtils.grabConsoleOutputAndError(service.describeGroups()) + fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + } } } @@ -599,12 +600,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1") val service = getConsumerGroupService(cgcArgs) - try { - service.collectGroupOffsets(group) - fail("The consumer group command should fail due to low initialization timeout") - } catch { - case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + retryOnDisconnectedException { + try { + service.collectGroupOffsets(group) + fail("The consumer group command should fail due to low initialization timeout") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + } } } @@ -620,19 +622,21 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1") val service = getConsumerGroupService(cgcArgs) - try { - service.collectGroupMembers(group, false) - fail("The consumer group command should fail due to low initialization timeout") - } catch { - case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) - try { - service.collectGroupMembers(group, true) - fail("The consumer group command should fail due to low initialization timeout (verbose)") - } catch { - case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) - } + retryOnDisconnectedException { + try { + service.collectGroupMembers(group, false) + fail("The consumer group command should fail due to low initialization timeout") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + retryOnDisconnectedException { + try { + service.collectGroupMembers(group, true) + fail("The consumer group command should fail due to low initialization timeout (verbose)") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + } + } + } } } @@ -648,12 +652,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1") val service = getConsumerGroupService(cgcArgs) - try { - service.collectGroupState(group) - fail("The consumer group command should fail due to low initialization timeout") - } catch { - case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // Ignore occasional node disconnection - case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + retryOnDisconnectedException { + try { + service.collectGroupState(group) + fail("The consumer group command should fail due to low initialization timeout") + } catch { + case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass) + } } } @@ -688,5 +693,16 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.") } + private def retryOnDisconnectedException[T](fn: => T): T = { + while (true) { + try { + return fn + } catch { + case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // retry + } + } + throw new IllegalStateException + } + }