KAFKA-15859: Make RemoteListOffsets call an async operation #16602
chia7712 merged 7 commits into apache:trunk
Conversation
Test failures are unrelated.
Yes, this PR can be reviewed. There are no public API changes in this PR. To define the timeout for the delayed remote list-offsets operation, we reused the server request timeout, since tiered storage is not production-ready. If that is not acceptable, then we may have to wait for KIP-1075 approval.
Well, I'll review KIP-1075 first, then.
showuon left a comment
Had a quick pass; I understand more about what we're trying to achieve here. High-level comment: I saw that some newly added files are in Scala. Could we rewrite them in Java? You don't have to do that now; please wait until the KIP is accepted. Let's discuss more details in the KIP discussion. Thanks.
@kamalcph please fix conflicts, thanks :)
        delayedRemoteListOffsetsPurgatory.checkAndComplete(key);
    })
);
return new AsyncOffsetReadFutureHolder<>(jobFuture, taskFuture);
Pardon me, why do we need two futures here? Is CompletableFuture.supplyAsync unsuitable for this case? For example:
CompletableFuture<Optional<FileRecords.TimestampAndOffset>> taskFuture = CompletableFuture.supplyAsync(() -> {
    try {
        // If it is not found in remote storage, then search in the local storage starting with local log start offset.
        Optional<FileRecords.TimestampAndOffset> rval = findOffsetByTimestamp(topicPartition, timestamp, startingOffset, leaderEpochCache);
        if (rval.isPresent()) return rval;
        return OptionConverters.toJava(searchLocalLog.get());
    } catch (Exception e) {
        // NOTE: All the exceptions from the secondary storage are caught instead of only the KafkaException.
        LOGGER.error("Error occurred while reading the remote log offset for {}", topicPartition, e);
        throw new RuntimeException(e);
    }
}, remoteStorageReaderThreadPool);
Thanks for the review!
The reason for maintaining two futures, jobFuture and taskFuture: they are required to trigger the delayed operation completion (delayedRemoteListOffsetsPurgatory#checkAndComplete(key)) in the same remote-log-reader thread after the RemoteLogManager#findOffsetByTimestamp ++ searchLocalLog operation completes.
In the DelayedRemoteListOffsets purgatory, we return the result when all the partitions' results are received. Then, the delayedOperation gets completed.
We have the ActionQueue to complete pending actions, but the LIST_OFFSETS request can be served by any replica (least loaded node). If the node serving the request doesn't have leadership for any of the partitions, then the result might not be complete.
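The two-future hand-off described above can be sketched as follows. This is a minimal, self-contained illustration with hypothetical names, not the actual Kafka classes: jobFuture completes the result future first and only then triggers the purgatory check, so a tryComplete based on taskFuture observes it as already finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TwoFutureSketch {
    // Returns whether taskFuture was already done at the moment the
    // purgatory-check stand-in ran on the reader thread.
    public static boolean runOnce() throws Exception {
        ExecutorService readerPool = Executors.newSingleThreadExecutor();
        try {
            // taskFuture carries the offset-lookup result
            CompletableFuture<String> taskFuture = new CompletableFuture<>();
            CompletableFuture<Boolean> doneAtCheckTime = new CompletableFuture<>();

            // jobFuture completes taskFuture FIRST, then triggers the check,
            // so tryComplete() observes a finished taskFuture
            CompletableFuture<Void> jobFuture = CompletableFuture.runAsync(() -> {
                taskFuture.complete("offset-result");
                // stand-in for delayedRemoteListOffsetsPurgatory.checkAndComplete(key)
                doneAtCheckTime.complete(taskFuture.isDone());
            }, readerPool);

            jobFuture.join();
            return doneAtCheckTime.join();
        } finally {
            readerPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("taskFuture done at check time: " + runOnce());
    }
}
```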
Thanks for the explanation, but "trigger the delayed operation completion in the same thread" seems to work with CompletableFuture.supplyAsync, right?
CompletableFuture<Either<Exception, Option<FileRecords.TimestampAndOffset>>> taskFuture = CompletableFuture.supplyAsync(() -> {
    Either<Exception, Option<FileRecords.TimestampAndOffset>> result;
    try {
        // If it is not found in remote storage, then search in the local storage starting with local log start offset.
        Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
            OptionConverters.toScala(findOffsetByTimestamp(topicPartition, timestamp, startingOffset, leaderEpochCache))
                .orElse(searchLocalLog::get);
        result = Right.apply(timestampAndOffsetOpt);
    } catch (Exception e) {
        // NOTE: All the exceptions from the secondary storage are caught instead of only the KafkaException.
        LOGGER.error("Error occurred while reading the remote log offset for {}", topicPartition, e);
        result = Left.apply(e);
    } finally {
        TopicPartitionOperationKey key = new TopicPartitionOperationKey(topicPartition.topic(), topicPartition.partition());
        delayedRemoteListOffsetsPurgatory.checkAndComplete(key);
    }
    return result;
}, remoteStorageReaderThreadPool);

I notice there is another similar pattern in DelayedRemoteFetch, so it is OK to keep the current design for consistency. However, it would be great to let me know (for my own education) what side-effect happens when using CompletableFuture.supplyAsync :)
This won't work. When delayedRemoteListOffsetsPurgatory.checkAndComplete(key) is invoked, it calls DelayedRemoteListOffset#tryComplete, which checks whether the taskFuture has completed. It has not completed yet, so the delayedOperation won't complete.
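This ordering problem can be demonstrated in isolation. In the sketch below (hypothetical names; the finally block stands in for checkAndComplete), a future returned by supplyAsync is not yet done while its supplier, including the finally block, is still running, so an isDone-based tryComplete would see false and give up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class SingleFutureProblem {
    // Returns what a tryComplete() based on isDone() would observe if
    // checkAndComplete were invoked from inside the supplier's finally block.
    public static boolean observedDone() throws Exception {
        AtomicReference<CompletableFuture<String>> ref = new AtomicReference<>();
        CountDownLatch refSet = new CountDownLatch(1);
        AtomicBoolean seenDone = new AtomicBoolean();

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                return "offset-result";
            } finally {
                try {
                    refSet.await(); // wait until 'ref' points at our own future
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // stand-in for checkAndComplete(key): the future returned by
                // supplyAsync is NOT yet completed while the supplier runs,
                // so a tryComplete() checking isDone() would see false.
                seenDone.set(ref.get().isDone());
            }
        });
        ref.set(future);
        refSet.countDown();
        future.join();
        return seenDone.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("done inside finally: " + observedDone()); // prints false
    }
}
```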
It is not completed so the delayedOperation won't complete.
oh, you are totally right, thanks!!!
fetchOnlyFromLeader)

val status = resultHolder match {
  case OffsetResultHolder(Some(found), _) =>
The data structure gets complicated in this path. If those new structures serve "remote" only, could you please consider defining a subclass of TimestampAndOffset to hold the data used by remote only?
Or please add comments for those cases at least?
This PR is large. I'll address the refactoring in the next PR. Is that fine?
please add comments for those cases at least?
Added comments. Let me know if it needs to be improved.
.find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
.flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _,
  Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0))))
OffsetResultHolder(timestampAndOffsetOpt)
Pardon me, why does MAX_TIMESTAMP not consider the records in the remote storage?
Went over KIP-734; the purpose of MAX_TIMESTAMP is to get the offset of the record with the highest timestamp in the partition:
Used to retrieve the offset with the largest timestamp of a partition as message timestamps can be specified client side this may not match the log end offset returned by LatestSpec
With remote storage enabled, all the passive segments might be uploaded to remote and removed from local-log. The local-log might contain only one empty active segment. We have to handle the MAX_TIMESTAMP case for remote storage. Thanks for catching this! Filed KAFKA-17552 to track this issue separately.
I went over #15621 to see how we handle the MAX_TIMESTAMP case for normal topics.
Now, we maintain the shallowOffsetOfMaxTimestampSoFar in LogSegment instead of the real-max-timestamp-offset. While uploading the LogSegment to remote, we create the RemoteLogSegmentMetadata event which holds the metadata information about the segment.
Even if we pass the shallowOffsetOfMaxTimestampSoFar in the RemoteLogSegmentMetadata event, we have to download the remote-log-segment to find the real offsetOfMaxTimestampSoFar. This will increase the load on remote storage, assuming that the original intention of KIP-734 was confirming topic/partition "liveness", which means the Admin client will repeatedly invoke the list-offsets API for MAX_TIMESTAMP.
The predominant case for confirming topic/partition "liveness" is to query the local-log, which will work as expected. For MAX_TIMESTAMP, when enabled with remote storage, the results can go wrong when:
- Assuming the timestamps are monotonic and all the passive segments are uploaded to remote and deleted from local, the only active segment on local disk is empty (post the log roll), so the max-timestamp will be returned as "-1".
- Assuming the timestamps are non-monotonic, the (timestamp, offset) returned by the API may not be the TRUE (max-timestamp, max-timestamp-offset), as it considers only the segments in the local-log.
Should we handle or drop the MAX_TIMESTAMP case for topics enabled with remote storage? Handling it can cause high load:
- On the RemoteLogMetadataManager, as we have to scan all the uploaded segment events to find the max-timestamp, then compare it with the max-timestamp computed from the local-log segments. And the search should always proceed from remote to local storage.
- On the RemoteStorageManager, when the MAX_TIMESTAMP record exists in the remote storage, we have to download that segment (a few bytes) repeatedly to serve the query.
In KIP-734, can we make an addendum to say that MAX_TIMESTAMP is not supported for topics enabled with remote storage? Note that when the KIP was proposed, the intention was not to read from the disk:
Snippet from KIP-734:
LogSegments track the highest timestamp and associated offset so we don't have to go to disk to fetch this
Even, if we pass the shallowOffsetOfMaxTimestampSoFar in the RemoteLogSegmentMetadata event, we have to download the remote-log-segment to find the real offsetOfMaxTimestampSoFar.
IMHO, we don't need to pass shallowOffsetOfMaxTimestampSoFar to RemoteLogSegmentMetadata, as RemoteLogSegmentMetadata has maxTimestampMs already, so the basic behavior is listed below (same as you described, I'd say):
- find the max timestamp from the local segments
- query findOffsetByTimestamp by the max timestamp from step 1
- compare the timestamp of the record from remote to local to pick the correct offset
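The compare step in the list above might look roughly like the sketch below. TsOffset and pickMax are illustrative stand-ins under assumed semantics, not real Kafka APIs; the tie-breaking toward remote assumes the older (remote) offset should win when timestamps are equal.

```java
import java.util.Optional;

public class MaxTimestampSketch {
    // Stand-in for FileRecords.TimestampAndOffset.
    public record TsOffset(long timestamp, long offset) {}

    // Pick whichever of the remote/local candidates has the higher timestamp
    // (remote wins ties here, since remote segments hold the older offsets).
    public static Optional<TsOffset> pickMax(Optional<TsOffset> remote, Optional<TsOffset> local) {
        if (remote.isEmpty()) return local;
        if (local.isEmpty()) return remote;
        return remote.get().timestamp() >= local.get().timestamp() ? remote : local;
    }
}
```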
This will increase the load on remote storage assuming that the original intention of the KIP-734 is to Confirming topic/partition "liveness" which means the Admin client will repeatedly invoke the list-offsets API for MAX_TIMESTAMP.
The impl of KIP-734 was wrong because we don't loop over all records in all paths (because of the cost). Hence, we renamed offsetOfMaxTimestampSoFar to shallowOffsetOfMaxTimestampSoFar to match the true behavior.
Should we handle/drop the MAX_TIMESTAMP case for topics enabled with remote storage? This can cause high load:
That is an acceptable approach. We can REJECT the MAX_TIMESTAMP request for now as it is a rare operation. Or we can make the call an async op too, as it needs to iterate over all the metadata of remote segments.
override def tryComplete(): Boolean = {
  var completable = true
  metadata.statusByPartition.forKeyValue { (partition, status) =>
    if (!status.completed) {
Could we check the status of the futures instead? For example:
def completable = status.futureHolderOpt.isEmpty || status.futureHolderOpt.get.jobFuture.isDone
As mentioned in the other comment, we need the status.completed variable as it is volatile and accessed by multiple threads for inter-thread visibility.
def completable = status.futureHolderOpt.isEmpty || status.futureHolderOpt.get.jobFuture.isDone
This method seems to be thread-safe. We can use it instead of the completed variable. But I find using the variable makes the code clearer. I'm open to changing this; will take this refactoring in the next PR.
metadata.statusByPartition.forKeyValue { (topicPartition, status) =>
  status.completed = status.futureHolderOpt.isEmpty
  if (status.futureHolderOpt.isDefined) {
    status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition()))
Pardon me, why set responseOpt early? If we keep responseOpt as None, we can reuse responseOpt to evaluate "completed".
Yes, that's true. Assume that one LIST_OFFSETS request wants to query the offsetForTimestamp for 10 partitions; those 10 partitions are handled in a concurrent fashion, provided that the remote-log-reader threads are available. If any one thread completes, it marks that partition status as completed and checks the statuses of all the other partitions.
The variable completed is accessed by multiple remote-log-reader threads, so we marked it as volatile and use it for the computation instead of responseOpt.
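A minimal sketch of this per-partition tracking, with hypothetical names: each remote-log-reader thread flips a volatile flag for its partition, and the completion check scans all statuses. The whole operation is completable only when every partition's result has arrived.

```java
import java.util.Map;

public class CompletionSketch {
    public static class PartitionStatus {
        // written by one remote-log-reader thread, read by others;
        // volatile gives the inter-thread visibility discussed above
        public volatile boolean completed;
    }

    // Stand-in for DelayedRemoteListOffsets#tryComplete: true only when
    // every partition's result has arrived.
    public static boolean tryComplete(Map<String, PartitionStatus> statusByPartition) {
        return statusByPartition.values().stream().allMatch(s -> s.completed);
    }
}
```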
JDK11 test failures are unrelated to this PR; the tests timed out.
To find the offset for a given timestamp, the ListOffsets API is used by the client. When the topic is enabled with remote storage, we have to fetch the remote indexes, such as the offset-index and time-index, to serve the query. Also, the ListOffsets request can contain queries for multiple topics/partitions. The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are multiple LIST_OFFSETS queries and most of the request-handler threads are busy reading data from remote storage, then other high-priority requests such as PRODUCE and FETCH won't be handled and, in the worst case, can be dropped. In this patch, we have introduced a delayed operation for the remote list-offsets call. If the offsets need to be searched in the remote storage, the request-handler threads pass on the request to the remote-log-reader threads, and the request gets handled in an asynchronous fashion.
- Started the key-value pairs from 0 to match the offset numbers: (k0, v0) matches offset-0; this improves the test readability.
@kamalcph thanks for checking the failed tests. They pass locally. Will merge this PR.
@kamalcph any updates? Or just trigger QA again?
I rebased the branch against trunk to retrigger the tests.
Got it |
Thank you all for the reviews!
@kamalcph, the test introduced in this PR is flaky on trunk: https://ge.apache.org/scans/tests?search.names=CI%20workflow&search.rootProjectNames=kafka&search.tags=github%2Ctrunk&search.tasks=test&search.timeZoneId=America%2FNew_York&search.values=CI&tests.container=kafka.log.remote.RemoteLogOffsetReaderTest&tests.sortField=FLAKY Can you take a look? I have filed KAFKA-17559 in the meantime. Thanks!
@mumrah Sorry for the flakiness. I will take a look!
Opened #17214 to fix the flaky test. PTAL.
 */
@nowarn("cat=deprecation")
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = {
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
Could we change the description of the return value accordingly?
}

public void setDelayedOperationPurgatory(DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory) {
    this.delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatory;
delayedRemoteListOffsetsPurgatory is written and read by different threads. Does it need to be volatile?
The other purgatories are also accessed by multiple threads, but they don't use volatile, so I followed the same approach. Let me know whether it is required.
You are talking about the purgatories in ReplicaManager? They are set during the creation of ReplicaManager. Here, delayedRemoteListOffsetsPurgatory is not set during the creation of RemoteLogManager.
Added volatile to the delayedRemoteListOffsetsPurgatory in RemoteLogManager. My understanding was that we instantiate the dataPlaneRequestProcessor after calling ReplicaManager#startup, so there won't be an issue, but it is good to be on the safer side.
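The safe-publication pattern being discussed can be sketched as follows. Purgatory is a placeholder type, not the real DelayedOperationPurgatory: the field is set once during startup, possibly on a different thread than the request-handler/reader threads that read it later, so volatile guarantees visibility.

```java
public class PurgatoryHolder {
    public interface Purgatory { }

    // volatile: written during startup by one thread, read later by others
    private volatile Purgatory delayedRemoteListOffsetsPurgatory;

    public void setDelayedOperationPurgatory(Purgatory purgatory) {
        this.delayedRemoteListOffsetsPurgatory = purgatory;
    }

    // reader threads can safely observe whether the purgatory was published
    public boolean isReady() {
        return delayedRemoteListOffsetsPurgatory != null;
    }
}
```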
}
}

class DelayedRemoteListOffsets(delayMs: Long,
ReplicaManager calls completeDelayedFetchOrProduceRequests when a replica is removed from the broker or becomes a follower, to wake up pending produce/fetch requests early. Should we do the same for pending remoteListOffset requests?
Addressed this in #17487.
Retained the same method name. Shall we change the method name to completeDelayedFetchOrProduceOrRemoteListOffsetsRequests?
Perhaps naming it completeDelayedOperationsWhenNotPartitionLeader ?
Renamed the method.
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._

case class ListOffsetsPartitionStatus(var responseOpt: Option[ListOffsetsPartitionResponse] = None,
responseOpt can be written and read by different threads. Should it be volatile?
// create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
listOffsetsRequestKeys is a bit weird. It is based on a topicPartition and the purgatory is checked on that key every time a remote listOffset task completes. However, the completion of such a task has no impact on other pending listOffset requests on the same partition.
The only reason we need the purgatory is really just for the expiration logic after the timeout if we chain all the futures together. Perhaps, using the pattern of DelayedFuturePurgatory is more intuitive?
Went through the DelayedFuturePurgatory and understood the changes required. This is a big change, so we can take it separately:
- DelayedFuturePurgatory does not emit any request expiration metrics.
- When a replica for a partition moves away, we cannot complete the request for that partition, as the watchKey is unique for each request.
- Where is the request expiration metric emitted now?
- Good point. We could have a customized DelayedFuturePurgatory that also adds a delayed operation key per partition. But they are triggered for completion check when the replica is no longer the leader.
- The expiration metrics are emitted by the individual purgatories, e.g. DelayedRemoteListOffsetsMetrics#recordExpiration.
- Agree, this will improve the performance. Will take the custom DelayedFuturePurgatory changes separately as it is a big change.
}
}

case class ListOffsetsMetadata(statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus]) {
Hmm, why do we need this wrapper class? Could we just use the Map directly?
Thanks @junrao for the review comments! I will follow up on them.
…ca is removed from broker. - Removed the ListOffsetsMetadata wrapper class. - Addressed review comments from PR apache#16602
This is part 2 of KIP-1075.
To find the offset for a given timestamp, the ListOffsets API is used by the client. When the topic is enabled with remote storage, we have to fetch the remote indexes, such as the offset-index and time-index, to serve the query. Also, the ListOffsets request can contain queries for multiple topics/partitions.
The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are multiple LIST_OFFSETS queries and most of the request-handler threads are busy reading data from remote storage, then other high-priority requests such as FETCH and PRODUCE might starve and be queued. This can lead to higher latency in producing/consuming messages.
In this patch, we have introduced a delayed operation for the remote list-offsets call. If the timestamp needs to be searched in the remote storage, the request-handler threads pass on the request to the remote-log-reader threads, and the request gets handled in an asynchronous fashion.
Covered the patch with unit and integration tests.
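The hand-off described in the summary above can be sketched as follows. These are illustrative names only, not the actual Kafka classes: the request-handler thread submits the slow remote-index lookup to a reader pool and returns immediately, leaving the handler free for PRODUCE/FETCH.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncListOffsetsSketch {
    // stand-in for the remote-log-reader thread pool
    static final ExecutorService remoteLogReaderPool = Executors.newFixedThreadPool(2);

    // Called on a request-handler thread: delegate the slow remote-index
    // read and return a future immediately instead of blocking.
    static CompletableFuture<Long> handleListOffsets(long timestamp) {
        return CompletableFuture.supplyAsync(() -> lookupOffsetInRemote(timestamp), remoteLogReaderPool);
    }

    // Toy stand-in for reading the remote offset/time indexes.
    static long lookupOffsetInRemote(long timestamp) {
        return timestamp / 1000;
    }

    public static void main(String[] args) {
        System.out.println("offset = " + handleListOffsets(5000L).join()); // prints "offset = 5"
        remoteLogReaderPool.shutdown();
    }
}
```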
Committer Checklist (excluded from commit message)