KAFKA-9579: Fetch implementation for records in the remote storage through a specific purgatory (#13535)
Conversation
Force-pushed from ca16e79 to 9ba10cf
showuon left a comment:
Thanks for the PR. Left some comments.
Force-pushed from 9acff1f to fb613c8
    RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);

    if (firstBatch == null)
        return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,

I think we need to log something in this case.
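To illustrate the suggestion, here is a hedged, simplified sketch (the `Batch` record and `read` helper are stand-ins, not Kafka's `RecordBatch`/`FetchDataInfo` API): when no batch at or after the target offset exists in the remote segment, the code returns empty records, and a log line would make that silent case observable.

```java
import java.util.Iterator;
import java.util.List;

public class RemoteReadSketch {
    // Stand-in for a record batch with its offset range.
    record Batch(long baseOffset, long lastOffset) {}

    // Mirrors the idea of findFirstBatch: skip batches entirely below the target offset.
    static Batch findFirstBatch(Iterator<Batch> batches, long offset) {
        while (batches.hasNext()) {
            Batch b = batches.next();
            if (b.lastOffset() >= offset) return b;
        }
        return null;
    }

    static String read(List<Batch> segment, long offset) {
        Batch first = findFirstBatch(segment.iterator(), offset);
        if (first == null) {
            // The reviewer's point: surface this case instead of silently
            // returning empty records.
            System.out.println("Could not find a batch at or after offset " + offset
                    + " in the remote segment; returning empty records");
            return "EMPTY";
        }
        return "batch@" + first.baseOffset();
    }

    public static void main(String[] args) {
        List<Batch> segment = List.of(new Batch(0, 9), new Batch(10, 19));
        System.out.println(read(segment, 5));   // batch@0
        System.out.println(read(segment, 25));  // EMPTY (with a log line)
    }
}
```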
Force-pushed from fb613c8 to c2873f5
    // The 1st topic-partition that has to be read from remote storage
    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

I understand a new PR will come to overcome this, but could we provide further context (in the source code or the PR) about the implications of using the first topic-partition only?

Agreed - there are consumption patterns which diverge from the local case with this approach (that is, uneven progress across the partitions consumed from a topic, even when said partitions are of the same nature w.r.t. record batch size and overall size). It may be preferable not to diverge from the local approach and to read from all the remote partitions found in the fetchInfos. Then, a different read pattern which provides greater performance for a specific operational environment and workload could be enforced via a configuration property.

As I already called out in the PR description, this will be followed up with another PR. We will describe the config options with their respective scenarios there. The default will be to fetch from multiple partitions, as is done with local log segments.
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)

    if (remoteFetchInfo.isPresent) {

In line 1082, we should further test !remoteFetchInfo.isPresent, right?

I am not sure line 1082 still points at what you meant, since the file could have been updated. Please clarify.

In the following code, we should go into that branch only if remoteFetchInfo is empty, right? Otherwise, we could get into a situation where a remote partition is never served because the fetch request is always satisfied with new local data on other partitions.

    if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
        hasDivergingEpoch || hasPreferredReadReplica) {

Do you mean to say that we should not return immediately if remoteFetchInfo exists, because that fetch should be served; otherwise remote fetches may starve as long as there is enough local data immediately available to be sent? So the condition becomes:

    if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty
        || bytesReadable >= params.minBytes || errorReadingData || hasDivergingEpoch
        || hasPreferredReadReplica))

Sure, that check was missed while pulling in the changes. Good catch. Updated it with the latest commit.
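The starvation concern above can be captured in a small, hedged sketch (the method and parameter names are illustrative stand-ins, not ReplicaManager's actual signature): a request with a pending remote fetch must not be completed immediately from local data, even when enough local bytes are readable.

```java
public class FetchCompletionSketch {
    // Simplified form of the completion check discussed in the thread: the
    // immediate-response conditions only apply when no remote fetch is pending.
    static boolean completeImmediately(boolean remoteFetchPresent,
                                       long maxWaitMs,
                                       boolean fetchInfosEmpty,
                                       int bytesReadable,
                                       int minBytes,
                                       boolean errorReadingData,
                                       boolean hasDivergingEpoch,
                                       boolean hasPreferredReadReplica) {
        return !remoteFetchPresent
                && (maxWaitMs <= 0 || fetchInfosEmpty || bytesReadable >= minBytes
                    || errorReadingData || hasDivergingEpoch || hasPreferredReadReplica);
    }

    public static void main(String[] args) {
        // Enough local bytes, but a remote fetch is pending: do NOT respond yet,
        // or the remote partition could be starved forever by busy local partitions.
        System.out.println(completeImmediately(true, 500, false, 2048, 1, false, false, false));  // false
        // No remote fetch and enough local bytes: respond immediately.
        System.out.println(completeImmediately(false, 500, false, 2048, 1, false, false, false)); // true
    }
}
```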
    InputStream remoteSegInputStream = null;
    try {
        // Search forward for the position of the last offset that is greater than or equal to the target offset
        remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);

Would it be possible to send the endOffset as well? Without it, the input stream will potentially contain the whole log and not be consumed till the end. In the case of S3, when the input stream is not consumed till the end, the HTTP connection is aborted.

We will look into it in a followup PR.
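A hedged sketch of the idea behind the suggestion (the helper and its parameters are hypothetical, not the RemoteStorageManager API): when the caller knows how many bytes it needs, it can bound how much of the stream is consumed instead of pulling, or abandoning, the rest of a whole-segment stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class BoundedSegmentRead {
    // Read at most `length` bytes from the stream, handling short reads.
    static byte[] readRange(InputStream in, int length) {
        byte[] buf = new byte[length];
        int read = 0;
        try {
            while (read < length) {
                int n = in.read(buf, read, length - read);
                if (n < 0) break; // segment ended before the requested range
                read += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Arrays.copyOf(buf, read);
    }

    public static void main(String[] args) {
        // Stand-in for a remote segment stream that could span the whole log.
        byte[] segment = new byte[1024];
        InputStream in = new ByteArrayInputStream(segment);
        // Only the first 16 bytes are needed; the remaining bytes of the
        // stream are never pulled.
        byte[] range = readRange(in, 16);
        System.out.println(range.length); // 16
    }
}
```

Passing an endOffset down to fetchLogSegment, as suggested, would let the storage plugin request only that byte range in the first place (e.g. an HTTP Range request), avoiding the aborted-connection cost entirely.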
Force-pushed from b9c6ef8 to 666fd8d
    }

    if (searchInLocalLog) {
        txnIndexOpt = (localLogSegments.hasNext()) ? Optional.of(localLogSegments.next().txnIndex()) : Optional.empty();

Right, it can have duplicates. But the consumer already handles duplicate aborted transactions. Updated the code to remove duplicates in case any consumer implementation cannot handle duplicate aborted transactions.
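The de-duplication described can be as simple as an order-preserving set, sketched here with a hypothetical stand-in for the aborted-transaction entry (not Kafka's actual AbortedTxn class): entries collected from overlapping local and remote txn index files may repeat, so collapse them before building the response.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class AbortedTxnDedup {
    // Stand-in for an aborted-transaction index entry; records get
    // equals/hashCode for free, which the set relies on.
    record AbortedTxn(long producerId, long firstOffset) {}

    // LinkedHashSet drops duplicates while preserving collection order.
    static List<AbortedTxn> dedup(List<AbortedTxn> txns) {
        return new ArrayList<>(new LinkedHashSet<>(txns));
    }

    public static void main(String[] args) {
        List<AbortedTxn> collected = List.of(
                new AbortedTxn(1L, 100L),
                new AbortedTxn(1L, 100L), // same entry seen in two index files
                new AbortedTxn(2L, 250L));
        System.out.println(dedup(collected).size()); // 2
    }
}
```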
Force-pushed from 4a8f67f to b8a3c83
@Hangleton @junrao @jeqo, any other comments on this PR? We hope we can merge it in the early stage of a release, so that we have enough time to test its stability and make further improvements. Thanks.
divijvaidya left a comment:
Overall this looks good to me. One major comment about correctly shutting down the delayed fetch thread pool; otherwise it looks good.
Is this RejectedExecutionException propagated to the consumer fetch? If yes, is this a change in the existing interface with the consumer? (Please correct me if I am wrong, but I am not aware of the consumer handling or expecting RejectedExecutionException today.)

This error is propagated as an unexpected error (UnknownServerException) to the consumer client, and it is already handled.

Thank you. That answers my question.

Should we add a log if the nature of the error is not propagated?
    // The 1st topic-partition that has to be read from remote storage
    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

nit: instead of NOT_AVAILABLE, maybe the message could report that the log start offset is strictly greater than the fetch offset?
divijvaidya left a comment:
Thank you for addressing the previous comments, Satish. I have some additional ones about how we are handling shutdown.
How did we decide on 2 min. here? I don't think we should block shutdown of the broker on this, because there are other limits associated with clean vs unclean shutdown. If we do plan to block, we should tie it to the overall shutdown timeout. As an example, clean shutdown is expected to complete in 5 min.; see lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES) in BrokerServer.scala.

It does not require that to be completed in 5 mins. lifecycleManager.controlledShutdownFuture is more about processing the controlled shutdown event on the controller for that broker. It will wait for 5 mins before proceeding with the other sequence of actions, but that will not be affected by the code introduced here.
The logging subsystem handles unclean shutdown for log segments, and it will have already finished before RemoteLogManager is closed, so it will not be affected by this timeout either. But we can use a short duration here, like 10 secs, and revisit introducing a config if it is really needed for closing the remote log subsystem.
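As a hedged sketch of the shutdown pattern being discussed (the pool name and the 10-second timeout are illustrative, not the actual RemoteLogManager values): bound how long broker shutdown waits for in-flight remote-fetch tasks, then interrupt stragglers instead of blocking indefinitely.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RemoteFetchPoolShutdown {
    static boolean closeQuietly(ExecutorService pool, long timeoutSec) {
        pool.shutdown(); // stop accepting new fetch tasks
        try {
            if (!pool.awaitTermination(timeoutSec, TimeUnit.SECONDS)) {
                // Timeout elapsed: interrupt remaining tasks so broker
                // shutdown is not blocked indefinitely.
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return pool.isShutdown();
    }

    public static void main(String[] args) {
        ExecutorService remoteFetchPool = Executors.newFixedThreadPool(2);
        remoteFetchPool.submit(() -> { /* in-flight remote read */ });
        System.out.println(closeQuietly(remoteFetchPool, 10)); // true
    }
}
```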
divijvaidya left a comment:
This change looks good to me! (assuming tests will be merged in a separate PR)
@junrao We are not sure whether those failures are related to this change. They do not fail on the laptop or other hosts. We are looking into those failures.
…ate the test failure in Jenkins.
It is not because of the changes in this PR. #10389 attempted to stabilize this test, but it can still fail if the machine is slow.
junrao left a comment:
@kamalcph: Thanks for the investigation. The PR LGTM. Just a couple of minor comments. Also, should we reopen https://issues.apache.org/jira/browse/KAFKA-12384 since it's still flaky?
Thanks @junrao for the updated review. Addressed your latest minor review comments.

Thanks @junrao for the latest comments; addressed them with the latest commit.

@satishd

@dajac It passed locally on my laptop.

@satishd Weird... It fails all the time on my laptop.
    quota: ReplicaQuota,
    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
    ): Unit = {
    def fetchMessages(params: FetchParams,

A small comment: we should avoid completely changing the code style without reason. The format of the method was not a mistake; it is the format that we mainly use in this class nowadays.
    private def handleOffsetOutOfRangeError(tp: TopicIdPartition, params: FetchParams, fetchInfo: PartitionData,
                                            adjustedMaxBytes: Int, minOneMessage: Boolean, log: UnifiedLog,
                                            fetchTimeMs: Long, exception: OffsetOutOfRangeException): LogReadResult = {

We usually don't format methods like this. Could we put one argument per line?
    val fetchDataInfo =
        new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),

nit: new FetchDataInfo should be on the previous line or indented.
This PR includes:
We have an extended version of remote fetch that can fetch from multiple remote partitions in parallel, which we will raise as a followup PR.
A few tests for the newly introduced changes are added in this PR. There are some tests available for these scenarios in 2.8.x; we will refactor them against the trunk changes and add them in followup PRs.
Other contributors:
kamal.chandraprakash@gmail.com - Further improvements and adding a few tests
showuon@gmail.com - Added a few test cases for these changes.
PS: This functionality is pulled out of internal branches along with other functionality related to the feature in 2.8.x. The reason for not pulling in all the changes is that it would make the PR huge and burdensome to review, and it also needs other metrics, minor enhancements (including perf), and minor changes done for tests. So we will try to have followup PRs to cover all of those.
Committer Checklist (excluded from commit message)