
KAFKA-9261; Client should handle unavailable leader metadata #7770

Merged
hachikuji merged 12 commits into apache:trunk from hachikuji:KAFKA-9261
Feb 5, 2020

Conversation

@hachikuji
Contributor

@hachikuji hachikuji commented Dec 3, 2019

The client caches metadata fetched from Metadata requests. Previously, each metadata response overwrote all of the metadata from the previous one, so we could rely on the expectation that the broker only returned the leaderId for a partition if it had connection information available. This behavior changed with KIP-320 since having the leader epoch allows the client to filter out partition metadata which is known to be stale. However, because of this, we can no longer rely on the request-level guarantee of leader availability. There is no mechanism similar to the leader epoch to track the staleness of broker metadata, so we still overwrite all of the broker metadata from each response, which means that the partition metadata can get out of sync with the broker metadata in the client's cache. Hence it is no longer safe to validate inside the `Cluster` constructor that each leader has an associated `Node`.
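The failure mode above can be sketched in a few lines. This is a hypothetical, heavily simplified model (the names `nodesById`, `onResponse`, and `leaderNode` are illustrative, not Kafka's actual API): broker metadata is always overwritten, while a partition entry survives if the incoming epoch is older, so the surviving leaderId can point at a broker the cache no longer knows.

```java
import java.util.*;

// Illustrative sketch only: a cache that mixes partition metadata from one
// response with broker metadata from a later one, as described above.
class StaleLeaderSketch {
    // Broker metadata: fully overwritten by every response.
    static Map<Integer, String> nodesById = new HashMap<>();
    // Partition metadata: retained when the incoming epoch is stale (KIP-320).
    static int cachedLeaderId = -1;
    static int cachedEpoch = -1;

    static void onResponse(Map<Integer, String> brokers, int leaderId, int leaderEpoch) {
        nodesById = brokers;              // brokers always replaced wholesale
        if (leaderEpoch >= cachedEpoch) { // partition entry updated only if not stale
            cachedLeaderId = leaderId;
            cachedEpoch = leaderEpoch;
        }
    }

    // May be empty even though a leaderId is cached: the broker entry is gone.
    static Optional<String> leaderNode() {
        return Optional.ofNullable(nodesById.get(cachedLeaderId));
    }
}
```

After a fresh response (leader 1, epoch 5) followed by a stale one (leader 2, epoch 4, listing only broker 2), the cache keeps leader 1 but has no node for it, which is exactly why the `Cluster`-constructor validation had to go.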

Fixing this issue was unfortunately not straightforward because the cache was built to maintain references to broker metadata through the `Node` object at the partition level. In order to keep the state consistent, each `Node` reference would need to be updated based on the new broker metadata. Instead of doing that, this patch changes the cache so that it is structured more closely with the Metadata response schema. Broker node information is maintained at the top level in a single collection and cached partition metadata only references the id of the broker. To accommodate this, we have removed `PartitionInfoAndEpoch` and we have altered `MetadataResponse.PartitionMetadata` to eliminate its `Node` references.
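The restructured cache can be pictured like this. The shape mirrors the Metadata response schema: one top-level map of nodes, partition entries that hold only broker ids, and leader lookup resolving the id lazily. All names here (`PartitionEntry`, `ClusterSketch`, `leaderFor`) are hypothetical simplifications, not the classes in the patch.

```java
import java.util.*;

// A partition entry that references its leader and replicas by broker id only,
// never by Node — mirroring the wire schema as described above.
class PartitionEntry {
    final String topic;
    final int partition;
    final Optional<Integer> leaderId; // may be unknown
    final List<Integer> replicaIds;

    PartitionEntry(String topic, int partition,
                   Optional<Integer> leaderId, List<Integer> replicaIds) {
        this.topic = topic; this.partition = partition;
        this.leaderId = leaderId; this.replicaIds = replicaIds;
    }
}

class ClusterSketch {
    final Map<Integer, String> nodesById;            // single top-level collection
    final Map<String, PartitionEntry> partitions = new HashMap<>();

    ClusterSketch(Map<Integer, String> nodesById, List<PartitionEntry> entries) {
        this.nodesById = nodesById;
        for (PartitionEntry e : entries)
            partitions.put(e.topic + "-" + e.partition, e);
    }

    // Resolves the leader id at lookup time; legitimately empty when the
    // broker metadata for that id is missing or stale.
    Optional<String> leaderFor(String topic, int partition) {
        PartitionEntry e = partitions.get(topic + "-" + partition);
        if (e == null) return Optional.empty();
        return e.leaderId.flatMap(id -> Optional.ofNullable(nodesById.get(id)));
    }
}
```

Because nothing holds a `Node` reference, a new broker map never needs to be threaded back through every cached partition entry.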

Note that one of the side benefits of the refactor here is that we virtually eliminate one of the hotspots in Metadata request handling in `MetadataCache.getEndpoints` (which was renamed to `maybeFilterAliveReplicas`). The only reason this was expensive was because we had to build a new collection for the `Node` representations of each of the replica lists. This information was doomed to just get discarded on serialization, so the whole effort was wasteful. Now, we work with the lower level id lists and no copy of the replicas is needed (at least for all versions other than 0).
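The optimization amounts to filtering replica *ids* against the alive-broker set instead of materializing a `Node` list per replica list only to throw it away on serialization. A hedged sketch (signature and names are illustrative, not the actual broker code):

```java
import java.util.*;
import java.util.stream.Collectors;

// Sketch of the id-based filtering described above (hypothetical names).
class ReplicaFilterSketch {
    static List<Integer> maybeFilterAliveReplicas(List<Integer> replicaIds,
                                                  Set<Integer> aliveBrokers,
                                                  boolean filterUnavailable) {
        if (!filterUnavailable)
            return replicaIds; // common case: return the cached list, no copy at all
        // Only old protocol versions need unavailable replicas filtered out.
        return replicaIds.stream()
                .filter(aliveBrokers::contains)
                .collect(Collectors.toList());
    }
}
```

In the common case the cached list is returned as-is, so the per-request allocation disappears entirely.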

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji
Contributor Author

Note: given the size/risk of this patch, I am planning to submit a smaller fix for 2.4.

ijuma pushed a commit that referenced this pull request Dec 4, 2019
This is a reduced scope fix for KAFKA-9261. The purpose of this patch is to ensure that
partition leader state is kept in sync with broker metadata in MetadataCache and
consequently in Cluster. Due to the possibility of metadata event reordering, it was
possible for this state to be inconsistent which could lead to an NPE in some cases. The
test case here provides a specific scenario where this could happen.

Also see #7770 for additional detail.

Reviewers: Ismael Juma <ismael@juma.me.uk>
ijuma pushed a commit that referenced this pull request Dec 4, 2019
This is a reduced scope fix for KAFKA-9261. The purpose of this patch is to ensure that
partition leader state is kept in sync with broker metadata in MetadataCache and
consequently in Cluster. Due to the possibility of metadata event reordering, it was
possible for this state to be inconsistent which could lead to an NPE in some cases. The
test case here provides a specific scenario where this could happen.

Also see #7770 for additional detail.

Reviewers: Ismael Juma <ismael@juma.me.uk>
@hachikuji
Contributor Author

retest this please

@dengziming
Member

hi, @hachikuji
I'm still confused about why we can no longer rely on the request-level guarantee of leader availability. I checked the code: if stale partition metadata is detected, the client ignores it and keeps the last known metadata. But that last known metadata also came from an earlier MetadataResponse, so it should contain leader info as well. Why was this fine before KIP-320 but a problem when mixing multiple metadata responses?

efeg pushed a commit to efeg/kafka that referenced this pull request Dec 12, 2019
…data (apache#7772)

TICKET = 
LI_DESCRIPTION = 

This is a reduced scope fix for KAFKA-9261. The purpose of this patch is to ensure that
partition leader state is kept in sync with broker metadata in MetadataCache and
consequently in Cluster. Due to the possibility of metadata event reordering, it was
possible for this state to be inconsistent which could lead to an NPE in some cases. The
test case here provides a specific scenario where this could happen.

Also see apache#7770 for additional detail.

Reviewers: Ismael Juma <ismael@juma.me.uk>
EXIT_CRITERIA = MANUAL [""]
xiowu0 pushed a commit to linkedin/kafka that referenced this pull request Dec 12, 2019
…9261. (#63)

TICKET =[KAFKA-9212, KAFKA-9261]
LI_DESCRIPTION =

1. Rollback hotfix for "Rollback KAFKA-7440 as a workaround for KAFKA-9212"

2. KAFKA-9261; Client should handle inconsistent leader metadata (apache#7772)

This is a reduced scope fix for KAFKA-9261. The purpose of this patch is to ensure that
partition leader state is kept in sync with broker metadata in MetadataCache and
consequently in Cluster. Due to the possibility of metadata event reordering, it was
possible for this state to be inconsistent which could lead to an NPE in some cases. The
test case here provides a specific scenario where this could happen.

Also see apache#7770 for additional detail.

Reviewers: Ismael Juma <ismael@juma.me.uk>

3. KAFKA-9212; Ensure LeaderAndIsr state updated in controller context during reassignment

KIP-320 improved fetch semantics by adding leader epoch validation. This relies on
reliable propagation of leader epoch information from the controller. Unfortunately, we
have encountered a bug during partition reassignment in which the leader epoch in the
controller context does not get properly updated. This causes UpdateMetadata requests
to be sent with stale epoch information which results in the metadata caches on the
brokers falling out of sync.

This bug has existed for a long time, but it is only a problem due to the new epoch
validation done by the client. Because the client includes the stale leader epoch in its
requests, the leader rejects them, yet the stale metadata cache on the brokers prevents
the consumer from getting the latest epoch. Hence the consumer cannot make progress
while a reassignment is ongoing.

Although it is straightforward to fix this problem in the controller for the new releases
(which this patch does), it is not so easy to fix older brokers which means new clients
could still encounter brokers with this bug. To address this problem, this patch also
modifies the client to treat the leader epoch returned from the Metadata response as
"unreliable" if it comes from an older version of the protocol. The client in this case will
discard the returned epoch and it won't be included in any requests.

Also, note that the correct epoch is still forwarded to replicas correctly in the
LeaderAndIsr request, so this bug does not affect replication.

Reviewers: Jun Rao <junrao@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>
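The client-side workaround described in the commit above can be sketched as follows. The idea is that a leader epoch returned by an old broker is treated as unreliable and simply dropped, so it is never echoed back in subsequent requests. The version cutoff and names below are illustrative assumptions, not the exact constants in Kafka:

```java
import java.util.*;

// Hedged sketch: discard leader epochs from Metadata responses of old
// protocol versions, as described in KAFKA-9212 above. The cutoff version
// used here is an assumption for illustration.
class EpochSketch {
    static final short FIRST_RELIABLE_EPOCH_VERSION = 9; // assumed cutoff

    static Optional<Integer> leaderEpoch(short responseVersion, int rawEpoch) {
        if (responseVersion < FIRST_RELIABLE_EPOCH_VERSION || rawEpoch < 0)
            return Optional.empty(); // unreliable: drop it, never send it back
        return Optional.of(rawEpoch);
    }
}
```

With the epoch absent, the client falls back to plain fetching and cannot get stuck rejecting requests against a stale cached epoch.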
@hachikuji
Contributor Author

@dengziming The point I was making is that the cache on the client is no longer guaranteed to contain only metadata from a single response. Because of this, the request-level guarantee on the availability of leader connection information cannot be assumed to hold for the cache in general.

@ijuma
Member

ijuma commented Jan 23, 2020

retest this please

Member

@ijuma ijuma left a comment


Thanks for the PR. Very happy to see the efficiency improvement in the MetadataCache. I took an initial pass at about half of the files. Will continue later today, but thought I'd share the comments in the meantime.

Comment thread core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala Outdated
Comment thread core/src/main/scala/kafka/server/KafkaApis.scala Outdated
Comment thread clients/src/test/java/org/apache/kafka/test/TestUtils.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java Outdated
Contributor

@rajinisivaram rajinisivaram left a comment


@hachikuji Thanks for the PR, changes look good. I had a couple of questions on the test changes, but Ismael has already asked those. I wasn't entirely sure of the full impact of storing TopicPartition and integer ids instead of partition id and Node. I guess the overall effect is an improvement in performance as you mentioned in the description since we avoid creating unnecessary Nodes. The changes do make the code neater, so LGTM.

Member

@ijuma ijuma left a comment


Thank you for the clarifications. I went through all of the code and left a few comments. Most of them are minor, the main question is if we have all the changes (including tests) from #7772 (which we only merged to older branches).

Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/Cluster.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java Outdated
Comment thread core/src/main/scala/kafka/server/MetadataCache.scala Outdated
Comment thread core/src/main/scala/kafka/server/MetadataCache.scala Outdated
Comment thread core/src/main/scala/kafka/server/MetadataCache.scala Outdated
Member

@ijuma ijuma left a comment


Thanks for the updates, looking good. Just a few minor comments below that should hopefully be easy to address or reject.

Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread core/src/main/scala/kafka/server/MetadataCache.scala
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Member

@ijuma ijuma left a comment


Thanks for the updates, LGTM.

@hachikuji hachikuji merged commit ae0c6e5 into apache:trunk Feb 5, 2020
hachikuji added a commit that referenced this pull request Feb 5, 2020
The client caches metadata fetched from Metadata requests. Previously, each metadata response overwrote all of the metadata from the previous one, so we could rely on the expectation that the broker only returned the leaderId for a partition if it had connection information available. This behavior changed with KIP-320 since having the leader epoch allows the client to filter out partition metadata which is known to be stale. However, because of this, we can no longer rely on the request-level guarantee of leader availability. There is no mechanism similar to the leader epoch to track the staleness of broker metadata, so we still overwrite all of the broker metadata from each response, which means that the partition metadata can get out of sync with the broker metadata in the client's cache. Hence it is no longer safe to validate inside the `Cluster` constructor that each leader has an associated `Node`

Fixing this issue was unfortunately not straightforward because the cache was built to maintain references to broker metadata through the `Node` object at the partition level. In order to keep the state consistent, each `Node` reference would need to be updated based on the new broker metadata. Instead of doing that, this patch changes the cache so that it is structured more closely with the Metadata response schema. Broker node information is maintained at the top level in a single collection and cached partition metadata only references the id of the broker. To accommodate this, we have removed `PartitionInfoAndEpoch` and we have altered `MetadataResponse.PartitionMetadata` to eliminate its `Node` references.

Note that one of the side benefits of the refactor here is that we virtually eliminate one of the hotspots in Metadata request handling in `MetadataCache.getEndpoints` (which was renamed to `maybeFilterAliveReplicas`). The only reason this was expensive was because we had to build a new collection for the `Node` representations of each of the replica lists. This information was doomed to just get discarded on serialization, so the whole effort was wasteful. Now, we work with the lower level id lists and no copy of the replicas is needed (at least for all versions other than 0).

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 7, 2020
Conflicts:
* build.gradle: moved avro plugin definition below
newly added test retry plugin.

* apache-github/trunk:
  MINOR: further InternalTopologyBuilder cleanup  (apache#8046)
  MINOR: Add timer for update limit offsets (apache#8047)
  HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051)
  KAFKA-9447: Add new customized EOS model example (apache#8031)
  KAFKA-8164: Add support for retrying failed (apache#8019)
  HOTFIX: checkstyle for newly added unit test
  KAFKA-9261; Client should handle unavailable leader metadata (apache#7770)
  MINOR: Fix typos introduced in KIP-559 (apache#8042)
  MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679)
  KAFKA-9113: Clean up task management and state management (apache#7997)
  MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038)
  KAFKA-9491; Increment high watermark after full log truncation (apache#8037)
  KAFKA-9477 Document RoundRobinAssignor as an option for partition.assignment.strategy (apache#8007)
  KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal (apache#7568)
  KAFKA-9492; Ignore record errors in ProduceResponse for older versions (apache#8030)
stanislavkozlovski pushed a commit to stanislavkozlovski/kafka that referenced this pull request Feb 18, 2020
…7770)

The client caches metadata fetched from Metadata requests. Previously, each metadata response overwrote all of the metadata from the previous one, so we could rely on the expectation that the broker only returned the leaderId for a partition if it had connection information available. This behavior changed with KIP-320 since having the leader epoch allows the client to filter out partition metadata which is known to be stale. However, because of this, we can no longer rely on the request-level guarantee of leader availability. There is no mechanism similar to the leader epoch to track the staleness of broker metadata, so we still overwrite all of the broker metadata from each response, which means that the partition metadata can get out of sync with the broker metadata in the client's cache. Hence it is no longer safe to validate inside the `Cluster` constructor that each leader has an associated `Node`

Fixing this issue was unfortunately not straightforward because the cache was built to maintain references to broker metadata through the `Node` object at the partition level. In order to keep the state consistent, each `Node` reference would need to be updated based on the new broker metadata. Instead of doing that, this patch changes the cache so that it is structured more closely with the Metadata response schema. Broker node information is maintained at the top level in a single collection and cached partition metadata only references the id of the broker. To accommodate this, we have removed `PartitionInfoAndEpoch` and we have altered `MetadataResponse.PartitionMetadata` to eliminate its `Node` references.

Note that one of the side benefits of the refactor here is that we virtually eliminate one of the hotspots in Metadata request handling in `MetadataCache.getEndpoints` (which was renamed to `maybeFilterAliveReplicas`). The only reason this was expensive was because we had to build a new collection for the `Node` representations of each of the replica lists. This information was doomed to just get discarded on serialization, so the whole effort was wasteful. Now, we work with the lower level id lists and no copy of the replicas is needed (at least for all versions other than 0).

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
ijuma added a commit to ijuma/kafka that referenced this pull request Apr 28, 2020
…t-for-generated-requests

* apache-github/trunk: (410 commits)
  KAFKA-8843: KIP-515: Zookeeper TLS support
  MINOR: Add missing quote for malformed line content (apache#8070)
  MINOR: Simplify KafkaProducerTest (apache#8044)
  KAFKA-9507; AdminClient should check for missing committed offsets (apache#8057)
  KAFKA-9519: Deprecate the --zookeeper flag in ConfigCommand (apache#8056)
  KAFKA-9509; Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication (apache#8048)
  HOTFIX: Fix two test failures in JDK11 (apache#8063)
  DOCS - clarify transactionalID and idempotent behavior (apache#7821)
  MINOR: further InternalTopologyBuilder cleanup  (apache#8046)
  MINOR: Add timer for update limit offsets (apache#8047)
  HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051)
  KAFKA-9447: Add new customized EOS model example (apache#8031)
  KAFKA-8164: Add support for retrying failed (apache#8019)
  HOTFIX: checkstyle for newly added unit test
  KAFKA-9261; Client should handle unavailable leader metadata (apache#7770)
  MINOR: Fix typos introduced in KIP-559 (apache#8042)
  MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679)
  KAFKA-9113: Clean up task management and state management (apache#7997)
  MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038)
  KAFKA-9491; Increment high watermark after full log truncation (apache#8037)
  ...