Skip to content

[KAFKA-13369] Follower fetch protocol changes for tiered storage.#11390

Merged
junrao merged 46 commits intoapache:trunkfrom
satishd:tier-follower-fetch
Dec 17, 2022
Merged

[KAFKA-13369] Follower fetch protocol changes for tiered storage.#11390
junrao merged 46 commits intoapache:trunkfrom
satishd:tier-follower-fetch

Conversation

@satishd
Copy link
Copy Markdown
Member

@satishd satishd commented Oct 12, 2021

This PR implements the follower fetch protocol as mentioned in KIP-405.

Added a new version for ListOffsets protocol to receive local log start offset on the leader replica. This is used by follower replicas to find the local log star offset on the leader.

Added a new version for FetchRequest protocol to receive OffsetMovedToTieredStorageException error. This is part of the enhanced fetch protocol as described in KIP-405.

We introduced a new field locaLogStartOffset to maintain the log start offset in the local logs. Existing logStartOffset will continue to be the log start offset of the effective log that includes the segments in remote storage.

When a follower receives OffsetMovedToTieredStorage, then it tries to build the required state from the leader and remote storage so that it can be ready to move to fetch state.

Introduced RemoteLogManager which is responsible for

  • initializing RemoteStorageManager and RemoteLogMetadataManager instances.
  • receives any leader and follower replica events and partition stop events and act on them
  • also provides APIs to fetch indexes, metadata about remote log segments.

Followup PRs will add more functionality like copying segments to tiered storage, retention checks to clean local and remote log segments. This will change the local log start offset and make sure the follower fetch protocol works fine for several cases.

You can look at the detailed protocol changes in KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication

Authors: satishd@apache.org, kamal.chandraprakash@gmail.com, yingz@uber.com

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@satishd satishd force-pushed the tier-follower-fetch branch 4 times, most recently from bc2983c to 270988f Compare October 14, 2021 08:41
@satishd satishd force-pushed the tier-follower-fetch branch from 270988f to de8ef09 Compare October 19, 2021 11:58
@kowshik
Copy link
Copy Markdown
Contributor

kowshik commented Nov 1, 2021

@satishd It'd be helpful if you could please update the PR description explaining the scope of the draft PR (in its current form) and what's remaining to be done.

@satishd satishd force-pushed the tier-follower-fetch branch 5 times, most recently from f0e0b61 to 9d6e2a9 Compare November 17, 2021 09:53
@satishd satishd marked this pull request as ready for review November 17, 2021 10:33
@satishd
Copy link
Copy Markdown
Member Author

satishd commented Nov 17, 2021

@junrao @kowshik This PR is ready for review. Please take a look.

Comment thread core/src/main/scala/kafka/log/CleanableIndex.scala Outdated
Comment thread core/src/main/scala/kafka/server/BrokerServer.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
@satishd satishd force-pushed the tier-follower-fetch branch from 9d6e2a9 to 1436fb3 Compare November 18, 2021 04:14
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@satishd : Thanks for the PR. Made a pass of all non-testing files. A few comments below.

Comment thread .gitignore
Comment thread clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java Outdated
Comment thread clients/src/main/resources/common/message/ListOffsetsRequest.json Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
Comment thread core/src/main/scala/kafka/log/ProducerStateManager.scala
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
Comment thread core/src/main/scala/kafka/api/ApiVersion.scala Outdated
Comment thread clients/src/main/resources/common/message/ListOffsetsRequest.json Outdated
@satishd satishd force-pushed the tier-follower-fetch branch 3 times, most recently from 611244c to eabd343 Compare December 13, 2021 12:23
@satishd
Copy link
Copy Markdown
Member Author

satishd commented Dec 13, 2021

Thanks @junrao for the review. Please find inline replies, addressed most of them with latest commits.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@satishd : Thanks for the updated PR. A few more comments below.

Comment thread clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java Outdated

case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
// No need to retry this as it indicates that the requested offset is moved to tiered storage.
// Check whether topicId is available here.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this comment addressed?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value is not very intuitive. I'd expect a true return value to indicate that the request is handled successfully.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inline with handleOutOfRangeError contract. I am fine with the suggested change but it is good to have similar semantics to handleOutOfRangeError method too for uniformity.

Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Outdated
@satishd satishd force-pushed the tier-follower-fetch branch 3 times, most recently from baa5b72 to 0c0b477 Compare December 21, 2021 13:42
@satishd
Copy link
Copy Markdown
Member Author

satishd commented Dec 21, 2021

@junrao : Thanks for the review. Please find inline replies, updated the PR addressing them with the latest commit.

Comment thread core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java Outdated
kamalcph and others added 21 commits December 16, 2022 15:27
Summary: Added more UTs for RemoteLogManager

Test Plan: UT

Reviewers: abhijeek, satishd

Reviewed By: satishd

Revert Plan: git revert

API Changes: NA

Monitoring and Alerts: NA
Summary:
- The latest metadata version IBP_3_3_IV4 was not referred on the existing tests which lead to the failures.
- After the local and remote leader endpoint changes, the fetcher thread tests related to remote storage become stale, fixed them.
- Fixed the issue while renaming the parent directory of the index files.

Test Plan:
./gradlew :core:test
./gradlew :metadata:test

Reviewers: abhijeek, satishd

Revert Plan: git revert

API Changes: NA

Monitoring and Alerts: NA
- Added loading of the existing indexes in the disk and added respective test.
- Addressed other minor comments.
Resolved a few test cases related to new versions.
…or getIndex if it is already closed.

Addressed review comments
Updated highwatermark when producerstate is rebuilt in fetcher.
Throwing RemoteStorageException from follwer replica when remtoe stoage is not yet enabled.
@satishd
Copy link
Copy Markdown
Member Author

satishd commented Dec 16, 2022

Thanks @junrao for your updated review. Addressed them with inline comments and updated with the latest commits.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@satishd : Thanks for the updated PR. Just one more comment. Also, there are quite a few test failures. Are they related to this PR?

def loadProducerState(lastOffset: Long): Unit = lock synchronized {
rebuildProducerState(lastOffset, producerStateManager)
maybeIncrementFirstUnstableOffset()
updateHighWatermark(localLog.logEndOffset)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logEndOffset still uses offset. We want to use logEndOffsetMetadata.

@satishd
Copy link
Copy Markdown
Member Author

satishd commented Dec 17, 2022

Thanks @junrao for the review, addressed it with the latest commit.

There are a few tests that are failed but they do not seem to be related to this PR.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@satishd : Thanks for the latest PR. LGTM

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Dec 17, 2022

Hey @satishd, I wanted to let you know about KAFKA-14470 as I think it affects some of the future KIP-405 PRs. Can we align these efforts so that we can get to the desired end state faster? For example, once the PRs that have been submitted are merged, we can move RemoteIndexCache to the storage module too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

tiered-storage Related to the Tiered Storage feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

10 participants