Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import joptsimple.OptionException
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.{DisconnectException, TimeoutException}
import org.junit.Assert._
import org.junit.Test

Expand Down Expand Up @@ -578,11 +578,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--timeout", "1", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)

try {
TestUtils.grabConsoleOutputAndError(service.describeGroups())
fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
retryOnDisconnectedException {
try {
TestUtils.grabConsoleOutputAndError(service.describeGroups())
fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
}
}

Expand All @@ -598,11 +600,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)

try {
service.collectGroupOffsets(group)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
retryOnDisconnectedException {
try {
service.collectGroupOffsets(group)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
}
}

Expand All @@ -618,17 +622,21 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)

try {
service.collectGroupMembers(group, false)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
try {
service.collectGroupMembers(group, true)
fail("The consumer group command should fail due to low initialization timeout (verbose)")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
retryOnDisconnectedException {
try {
service.collectGroupMembers(group, false)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
retryOnDisconnectedException {
try {
service.collectGroupMembers(group, true)
fail("The consumer group command should fail due to low initialization timeout (verbose)")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
}
}
}
}

Expand All @@ -644,11 +652,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)

try {
service.collectGroupState(group)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
retryOnDisconnectedException {
try {
service.collectGroupState(group)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e: ExecutionException => assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
}
}

Expand Down Expand Up @@ -683,5 +693,16 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.")
}

private def retryOnDisconnectedException[T](fn: => T): T = {
while (true) {
try {
return fn
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[DisconnectException] => // retry
}
}
throw new IllegalStateException
}

}