Skip to content

KAFKA-7893 Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest#6238

Merged
hachikuji merged 9 commits intoapache:trunkfrom
stanislavkozlovski:KAFKA-7893-refactor-consumer-bounce-test
Apr 5, 2019
Merged

KAFKA-7893 Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest#6238
hachikuji merged 9 commits intoapache:trunkfrom
stanislavkozlovski:KAFKA-7893-refactor-consumer-bounce-test

Conversation

@stanislavkozlovski
Copy link
Copy Markdown
Contributor

@stanislavkozlovski stanislavkozlovski commented Feb 7, 2019

This PR should help address the flakiness in the ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup test (https://issues.apache.org/jira/browse/KAFKA-7965). I tested this locally and have verified it significantly reduces flakiness - 25/25 tests now pass. Running the test 25 times in trunk, I'd get 18/25 passes.

It does so by reusing the less-flaky consumer integration testing functionality inside BaseConsumerTest. Most notably, the test now makes use of the ConsumerAssignmentPoller class - each consumer now polls non-stop rather than the more batch-oriented polling we had in ConsumerBounceTest#waitForRebalance().

@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

retest this please

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice cleanup! Left a few comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this should be a requirement. I think validateGroupAssignment can still be satisfied as long as the subscription is non-empty.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isPartitionAssignmentValid(assignments, subscriptions)

* Check whether partition assignment is valid
   * Assumes partition assignment is valid if
   * 1. Every consumer got assigned at least one partition
   * 2. Each partition is assigned to only one consumer
   * 3. Every partition is assigned to one of the consumers

It checks

val allNonEmptyAssignments = assignments.forall(assignment => assignment.nonEmpty)
    if (!allNonEmptyAssignments) {
      // at least one consumer got empty assignment
      return false
    }

I don't think it will pass if that assertion doesn't itself

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we should take either topicsToSubscribe or partitionsToAssign, but not both. Maybe we can have two separate constructors. Also, subscribe below should probably throw an illegal state error if partitionsToAssign is not empty.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added two new secondary constructors - can you check?

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji Feb 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extending BaseConsumerTest means we inherit its test cases (i.e. testSimpleConsumption and testCoordinatorFailover). I wonder if we should move these tests somewhere else.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have 6 test classes that make use of those test cases. Only ConsumerBounceTest is redundant. I don't think it hurts that much.

We could create a new class BasicConsumerTests which defines those tests if you think that's a good approach

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is not that big of a deal. Just it's a bit annoying to extend the build time even more because of tests we know are redundant. I think I slightly prefer your suggestion for BasicConsumerTests. Or maybe we can call the new class BaseConsumerTest and rename the existing one to AbstractConsumerTest?

The use of inheritance in these test cases has been a bit problematic in general. It would probably be better to split out any shared utility logic into a separate utility class. We don't have to do that here though.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your thinking. Let's leave this as it is for the time being so we can merge this PR and reduce the flakiness of ConsumerBounceTest.

I've opened up a follow-up JIRA to track this - https://issues.apache.org/jira/browse/KAFKA-8176

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could probably get rid of this variable and just use kickedOutConsumerIdx.foreach.

@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

@hachikuji apologies for squashing all the commits into one - I had to rebase and did not want to deal fixing merge conflicts with each commit. I shouldn't have done that as it now makes your review not as straightforward

Inheritance is ConsumerBounceTest->BaseConsumerTest->BaseRequestTest->IntegrationTestHarness

Also refactored testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup to use consumer pollers from BaseConsumerTest

Refactor more tests to use ConsumerPollers

testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize and testClose specifically

Refactor rest of ConsumerBounceTest tests

Remove unused imports

Make the code a bit cleaner

Remove unused KafkaConfig import

Address PR comments
@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

retest this please

1 similar comment
@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

retest this please

@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

(I am also running the tests locally)

Adds a new protected method modifyConfigs, mainly to allow the inheriting BaseRequestTest to change the way it handles configs in a more succinct way and SaslPlaintextConsumerTest to override its wanted listeners

Also rename BaseRequestTest#numBrokers to serverCount. This consolidates the use of the same method in IntegrationTestHarness
@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

The SaslPlainPlaintextConsumerTest class had problems due to not initializing its config listeners properly. This is because the BaseRequestTest overrides generateConfigs.
I generalized the code a bit more to allow all classes to modify exactly what they want

@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

retest this please

@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

1 similar comment
@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 1, 2019

@stanislavkozlovski you should use merge instead of rebase.

override def serverCount: Int = 3

// If required, override properties by mutating the passed Properties object
protected def propertyOverrides(properties: Properties) {}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this name doesn't quite work generally since IntegrationTestHarness exposes brokers and clients. So it should probably be renamed to make it clear that it's only for broker properties.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great catch, thanks

@ijuma ijuma added the tests Test fixes (including flaky tests) label Mar 4, 2019
@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

retest this please

1 similar comment
@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

retest this please

@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

stanislavkozlovski commented Mar 11, 2019

Testing this locally, I see the ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup gets stuck at the very last sendRecords() call.
Each producer seems to throw:

org.apache.kafka.common.errors.TimeoutException: Topic group-max-size-test not present in metadata after 60000 ms.

The issue was that we would not call restartDeadBrokers() after we had found a kicked-out consumer and were essentially running a reduced cluster.

@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

stanislavkozlovski commented Mar 16, 2019

Strange, I could have sworn this built and passes checkstyle/findbugs locally. Sorry for the noise. I will rebase with master as well now as there seem to be conflicting files

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay. Left a few additional comments.

class AuthorizerIntegrationTest extends BaseRequestTest {

override def numBrokers: Int = 1
override def serverCount: Int = 1
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we're a little inconsistent with naming (broker vs server). Perhaps this should be brokerCount to go along with brokerPropertyOverrides?

val group = "my-test"
val producerClientId = "ConsumerTestProducer"
val consumerClientId = "ConsumerTestConsumer"
val GroupMaxSessionTimeoutMs = 30000L
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why is only this one capitalized?



protected def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little surprising to find that we mutate consumerConfig here. So if you called createConsumerWithGroupId first and then createConsumer, the first call would affect the second call. Maybe instead we could duplicate consumerConfig and override group id?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I'll make use of the configOverrides var in createConsumer()

* @param consumerPollers current consumer pollers
* @param topicsToSubscribe topics to which new consumers will subscribe to
* @param subscriptions set of all topic partitions
*/
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: misaligned

/**
* Check whether partition assignment is valid
* Assumes partition assignment is valid iff
* Assumes partition assignment is valid if
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original was probably intentional, if pedantic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is not that big of a deal. Just it's a bit annoying to extend the build time even more because of tests we know are redundant. I think I slightly prefer your suggestion for BasicConsumerTests. Or maybe we can call the new class BaseConsumerTest and rename the existing one to AbstractConsumerTest?

The use of inheritance in these test cases has been a bit problematic in general. It would probably be better to split out any shared utility logic into a separate utility class. We don't have to do that here though.

rename serverCount => brokerCount
fix var naming capitalization
fix javadoc indentation and typo
@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Merging to trunk.

@hachikuji hachikuji merged commit cc4fde3 into apache:trunk Apr 5, 2019
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* apache/trunk:
  MINOR: Add security considerations for remote JMX in Kafka docs (apache#6544)
  MINOR: Remove redundant access specifiers from metrics interfaces (apache#6527)
  MINOR: Correct KStream documentation (apache#6552)
  KAFKA-8013; Avoid underflow when reading a Struct from a partially correct buffer (apache#6340)
  KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (apache#6384)
  MINOR: Move common consumer tests out of abstract consumer class (apache#6548)
  KAFKA-8168; Add a generated ApiMessageType class
  KAFKA-7893; Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest (apache#6238)
  MINOR: Tighten up metadata upgrade test (apache#6531)
  KAFKA-8190; Don't update keystore modification time during validation (apache#6539)
  MINOR: Fixed a few warning in core and connects (apache#6545)
  KAFKA-7904; Add AtMinIsr partition metric and TopicCommand option (KIP-427)
  MINOR: fix throttling and status in ConnectionStressWorker
  KAFKA-8090: Use automatic RPC generation in ControlledShutdown
  KAFKA-6399: Remove Streams max.poll.interval override (apache#6509)
  KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (apache#6475)
  HOTFIX: Update unit test for KIP-443
  KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (apache#6511)
  KAFKA-8183: Add retries to WorkerUtils#verifyTopics (apache#6532)
  KAFKA-8181: Removed Avro topic from TOC on kafka (apache#6529)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…aseConsumerTest (apache#6238)

This PR should help address the flakiness in the ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup test (https://issues.apache.org/jira/browse/KAFKA-7965). I tested this locally and have verified it significantly reduces flakiness - 25/25 tests now pass. Running the test 25 times in trunk, I'd get `18/25` passes.

It does so by reusing the less-flaky consumer integration testing functionality inside `BaseConsumerTest`. Most notably, the test now makes use of the `ConsumerAssignmentPoller` class  - each consumer now polls non-stop rather than the more batch-oriented polling we had in `ConsumerBounceTest#waitForRebalance()`.

Reviewers: Jason Gustafson <jason@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants