KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs #11331

Merged
dajac merged 37 commits into apache:trunk from jolshan:KAFKA-13111
Nov 15, 2021

@jolshan jolshan commented Sep 16, 2021

With the changes for topic IDs, fetch requests follow a different flow. When a broker receives a request, it uses a map to convert topic IDs to topic names. If a topic ID is not found in the map, we return a top-level error and close the session. This decision was motivated by the difficulty of storing “unresolved” partitions in the session. In earlier iterations we stored an “unresolved” partition object in the cache, but it was somewhat hard to reason about and required extra logic to try to resolve the topic ID on each incremental request and add it to the session. It also required extra logic to forget the topic (either by topic ID if the topic name was never known, or by topic name if it had been resolved by the time we wanted to remove it from the session).

One helpful simplifying factor is that we only allow one type of request (uses topic ID or does not use topic ID) in the session. That means we can rely on a session continuing to have the same information. We don’t have to worry about converting topics only known by name to topic ID for a response and we won’t need to convert topics only known by ID to name for a response.

This PR changes that by storing "unresolved" partitions in the cached partition object. If a version 13+ request is sent with an unknown topic ID, a cached partition is created with that fetch request data and a null topic name. On subsequent incremental requests, unresolved partitions may be resolved once their IDs appear in the metadata cache. When handling the request, getting all partitions returns TopicIdPartition objects that are used to handle the request and build the response. Since a session contains only one type of request (with IDs or without), the cached partitions map has different keys depending on which fetch request version is in use.
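To make the idea concrete, here is a minimal sketch of a cached partition whose topic name starts out null and is filled in later. The class `CachedPartitionSketch` and its method `maybeResolve` are hypothetical names chosen for illustration, and the ID-to-name map stands in for the metadata cache. It is not the code in FetchSession.scala.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in for a cached partition in a fetch session; not Kafka's actual class. */
final class CachedPartitionSketch {
    final UUID topicId;
    final int partition;
    private String topicName; // null while the topic ID is unresolved

    CachedPartitionSketch(UUID topicId, String topicName, int partition) {
        this.topicId = topicId;
        this.topicName = topicName;
        this.partition = partition;
    }

    boolean isResolved() {
        return topicName != null;
    }

    String topicName() {
        return topicName;
    }

    /** Fills in the name if it is still unknown and the map now contains the ID. */
    void maybeResolve(Map<UUID, String> topicNames) {
        if (topicName == null) {
            topicName = topicNames.get(topicId); // stays null if the ID is still unknown
        }
    }

    public static void main(String[] args) {
        UUID fooId = UUID.randomUUID();
        Map<UUID, String> topicNames = new HashMap<>(); // assumed view of the metadata cache

        // First request: the broker does not know the ID yet, so the name is null.
        CachedPartitionSketch cached = new CachedPartitionSketch(fooId, topicNames.get(fooId), 0);
        System.out.println(cached.isResolved()); // prints false

        // Later incremental request: the metadata cache has caught up.
        topicNames.put(fooId, "foo");
        cached.maybeResolve(topicNames);
        System.out.println(cached.topicName()); // prints foo
    }
}
```

Keeping the topic ID immutable and the name mutable reflects the point above that, within an ID-based session, the ID is always known while the name may arrive later.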

This PR involves changes both in FetchSessionHandler and FetchSession. Some major changes are outlined below.

  1. FetchSessionHandler: Forgetting a topic and adding a new topic with the same name - We may have a case where topic foo with ID 1 is in the session and, after a subsequent metadata update, topic foo has ID 2. This means that topic foo has been deleted and recreated. When sending fetch requests of version 13+, we will send a request that adds foo ID 2 to the session and removes foo ID 1. Otherwise, we fall back to the existing behavior for versions 12 and below.

  2. FetchSession: Resolving in Incremental Sessions - Incremental sessions contain two distinct sets of partitions: the partitions sent in the latest request (new, updated, or forgotten) and the partitions already in the session. To resolve unknown topic IDs we need to handle both sets.

    • Partitions in the request - These partitions are either new or updating/forgetting previous partitions in the session. The new partitions are trivial. We either have a resolved partition or create a partition that is unresolved. For the other cases, we need to be a bit more careful.
      • For updated partitions we have a few cases – keep in mind, we may not programmatically know if a partition is an update:
        1. partition in session is resolved, update is resolved: trivial

        2. partition in session is unresolved, update is unresolved: in code, this is equivalent to the case above, so trivial as well

        3. partition in session is unresolved, update is resolved: this means the partition in the session does not have a name, but the metadata cache now contains the name – to fix this we can check if there exists a cached partition with the given ID and update it both with the partition update and with the topic name.

        4. partition in session is resolved, update is unresolved: this means the partition in the session has a name, but the update could not be resolved (i.e., the topic was deleted). This is the odd case. We look up the partition using the ID and find the old version with a name, but we do not replace the name. This leads to an UNKNOWN_TOPIC_OR_PARTITION or INCONSISTENT_TOPIC_ID error, which is handled with a metadata update. A future request will likely forget the partition, and we will be able to do so by ID.

        5. Two partitions in the session have IDs, but they are different: only one topic ID should exist in the metadata at a time, so likely only one topic ID is in the fetch set. The other one should be in toForget, which lets us remove it from the session. If for some reason we don't try to forget this partition, one of the partitions in the session will cause an inconsistent topic ID error and the metadata for this partition will be refreshed, which should result in the old ID being removed from the session. This should not happen if the FetchSessionHandler is correctly in sync.

      • For the forgotten partitions we have the same cases:
        1. partition in session is resolved, forgotten is resolved: trivial

        2. partition in session is unresolved, forgotten is unresolved: in code, this is equivalent to the case above, so trivial as well

        3. partition in session is unresolved, forgotten is resolved: this means the partition in the session does not have a name, but the metadata cache now contains the name – to fix this we can check if there exists a cached partition with the given ID and try to forget it before we check the resolved name case.

        4. partition in session is resolved, forgotten is unresolved: this means the partition in the session has a name, but the forgotten partition could not be resolved (i.e., the topic was deleted). We look up the partition using the ID, find the old version with a name, and are able to delete it.

        5. both partitions in the session have IDs, but they are different: This should be the same case as described above. If we somehow do not have the ID in the session, no partition will be removed. This should not happen unless the FetchSessionHandler is out of sync.

    • Partitions in the session - there may be some partitions in the session already that are unresolved. We can resolve them in forEachPartition using a method that checks if the partition is unresolved and tries to resolve it using a topicName map from the request. The partition will be resolved before the function using the cached partition is applied.
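The update and forget cases above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the FetchSession implementation: `IdKeyedSessionSketch`, `Key`, `Entry`, `addOrUpdate`, and `forget` are hypothetical names, and the session is assumed to be keyed only by topic ID and partition index. A null name in a request stands for an ID the broker could not resolve.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/** Hypothetical model of a topic-ID-based fetch session; names are illustrative, not Kafka's. */
final class IdKeyedSessionSketch {

    /** Session key: topic ID plus partition index. The topic name is deliberately not part of it. */
    static final class Key {
        final UUID topicId;
        final int partition;

        Key(UUID topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key other = (Key) o;
            return partition == other.partition && topicId.equals(other.topicId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicId, partition);
        }
    }

    static final class Entry {
        String topicName; // null while unresolved
        long fetchOffset;

        Entry(String topicName, long fetchOffset) {
            this.topicName = topicName;
            this.fetchOffset = fetchOffset;
        }
    }

    private final Map<Key, Entry> partitions = new HashMap<>();

    /** Adds a new partition or updates an existing one, looked up by ID. */
    void addOrUpdate(UUID topicId, int partition, String nameFromRequest, long fetchOffset) {
        Key key = new Key(topicId, partition);
        Entry existing = partitions.get(key);
        if (existing == null) {
            // New partition: resolved or unresolved, it is stored either way.
            partitions.put(key, new Entry(nameFromRequest, fetchOffset));
            return;
        }
        existing.fetchOffset = fetchOffset;
        if (existing.topicName == null && nameFromRequest != null) {
            // Update case 3: session entry was unresolved, the update carries the name.
            existing.topicName = nameFromRequest;
        }
        // Update case 4: session entry has a name, the update does not; keep the old name.
    }

    /** Forgets by ID, so it works whether or not either side knows the name. */
    boolean forget(UUID topicId, int partition) {
        return partitions.remove(new Key(topicId, partition)) != null;
    }

    String nameOf(UUID topicId, int partition) {
        Entry e = partitions.get(new Key(topicId, partition));
        return e == null ? null : e.topicName;
    }

    int size() {
        return partitions.size();
    }
}
```

Because lookups go through the ID, cases 1 to 4 collapse into the same code path in this sketch. Case 5 (two different IDs for the same name) shows up as two separate entries, and a forget request for an ID that is not in the session removes nothing.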

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dajac dajac self-requested a review September 16, 2021 18:31

@dajac dajac left a comment

@jolshan I have left a few high-level comments/questions. I think that we are missing a few things on the fetcher side as well (e.g. topic ID errors must be handled differently).

ijuma commented Oct 2, 2021

Thanks for the PR. A high-level question, what are we trying to optimize for here?

  1. Requests that don't include topic ids
  2. Requests that include topic ids
  3. Both
  4. Some kind of balance of both where we compromise a bit to keep the code maintainable

@ijuma ijuma mentioned this pull request Oct 2, 2021
3 tasks

jolshan commented Oct 4, 2021

@ijuma

> Thanks for the PR. A high-level question, what are we trying to optimize for here?
>
> 1. Requests that don't include topic ids
> 2. Requests that include topic ids
> 3. Both
> 4. Some kind of balance of both where we compromise a bit to keep the code maintainable

The goal of this PR is to gracefully handle the new topic case. Currently in Kafka, when we create a new topic, the LeaderAndIsr request is sent first, then the UpdateMetadata request. This means that we will often encounter transient "unknown_topic_or_partition" errors. In the new world of topic IDs, we will see these as "unknown topic ID" errors. The current logic returns a top-level error and delays all partitions. This is a regression from previous behavior, so this PR's goal is to return to the behavior where we store the unknown partition in the session until it can be resolved. See https://issues.apache.org/jira/browse/KAFKA-13111 for more information.
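As a rough illustration of the difference, the sketch below contrasts a top-level error (one unknown ID fails every topic in the request) with per-topic errors (only the unresolved topic is delayed). `PartitionLevelErrorSketch` and its methods are hypothetical, the error names are plain strings rather than Kafka's error codes, and the map is an assumed stand-in for the metadata cache.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical comparison of top-level versus per-topic error handling; not KafkaApis code. */
final class PartitionLevelErrorSketch {

    /** Flow before this PR, as described above: one unknown ID fails the whole request. */
    static String topLevelError(List<UUID> requested, Map<UUID, String> topicNames) {
        for (UUID id : requested) {
            if (!topicNames.containsKey(id)) {
                return "UNKNOWN_TOPIC_ID";
            }
        }
        return "NONE";
    }

    /** Flow this PR aims for: only the unresolved topics carry an error. */
    static Map<UUID, String> perTopicErrors(List<UUID> requested, Map<UUID, String> topicNames) {
        Map<UUID, String> errors = new LinkedHashMap<>();
        for (UUID id : requested) {
            errors.put(id, topicNames.containsKey(id) ? "NONE" : "UNKNOWN_TOPIC_ID");
        }
        return errors;
    }
}
```

With one known topic and one newly created topic whose ID has not yet reached the metadata cache, the first method fails the whole request while the second still lets the known topic make progress.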


@dajac dajac left a comment

@jolshan Thanks for the update. I left some comments.


@dajac dajac left a comment

Thanks for the updates. Left a few more comments.


jolshan commented Nov 10, 2021

System test results: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-11-10--001.system-test-kafka-branch-builder--1636543025--jolshan--KAFKA-13111--165a106bf3/report.html

A previous run was all green, so will need to confirm the 3 failed tests are unrelated to this change.


jolshan commented Nov 10, 2021

Looks like the topic id partition changes broke the build. I'll probably need to pull the latest version.


dajac commented Nov 11, 2021

> System test results: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-11-10--001.system-test-kafka-branch-builder--1636543025--jolshan--KAFKA-13111--165a106bf3/report.html
>
> A previous run was all green, so will need to confirm the 3 failed tests are unrelated to this change.

@jolshan Have you been able to triage these failures?

@dajac dajac left a comment

LGTM! Thanks @jolshan for your effort on this one. We can merge the PR once the system tests status is clarified.

@dajac dajac merged commit e8818e2 into apache:trunk Nov 15, 2021
dajac pushed a commit that referenced this pull request Nov 15, 2021
Reviewers: David Jacot <djacot@confluent.io>

dajac commented Nov 15, 2021

System test failures are not related. Merged to trunk and to 3.1.

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021