KAFKA-10580: Add topic ID support to Fetch request #9944
junrao merged 50 commits into apache:trunk
Conversation
Added fetch session components. Will add some versioning tests and final cleanups, then open for review.
…rrect fetch request is sent from consumers and replicas
I'm aware that the latest changes to ensure the correct fetch version is sent seem to be causing more test timeouts. Will need to investigate further and hopefully decrease the time needed for fetch requests. I may need to check some other flaky tests related to my changes.
…ifferentIbpTest and fixed typo in file name
rajinisivaram left a comment
@jolshan Thanks for the PR. I left a few comments on the implementation, but will need to go through it again to follow the whole logic. Haven't reviewed tests yet.
public synchronized Map<Uuid, String> topicNames() {
    return cache.topicNames();
}
There are several places where we use this combination of two maps, should we create a class that maintains a bidirectional map?
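A minimal sketch of such a bidirectional map (hypothetical class and method names; plain String stands in for Kafka's Uuid so the snippet is self-contained):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suggested bidirectional map; not actual Kafka API.
class TopicIdBiMap {
    private final Map<String, String> nameToId = new HashMap<>();
    private final Map<String, String> idToName = new HashMap<>();

    // Keep both directions in sync on every insert.
    synchronized void put(String name, String id) {
        String oldId = nameToId.put(name, id);
        if (oldId != null)
            idToName.remove(oldId); // topic was re-mapped to a new ID
        idToName.put(id, name);
    }

    synchronized String getId(String name) { return nameToId.get(name); }
    synchronized String getName(String id) { return idToName.get(id); }
}
```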
junrao left a comment
@jolshan : Thanks for the PR. Made a pass of all non-testing files. Overall, the new logic seems to make sense. It does add additional complexity on the already complicated existing logic. So, it would be useful to spend a bit more time to make the additional logic easier to understand (e.g., naming more properly, adding comments where needed). A few detailed comments below.
Also, it seems that for more partitions, the performance with this PR is noticeably worse than trunk?
this PR (Fetch branch):
FetchRequestBenchmark.testSerializeFetchRequestForReplica 20 500 avgt 15 671785.374 ± 2631.210 ns/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica 20 1000 avgt 15 1235326.349 ± 6101.974 ns/op
trunk:
FetchRequestBenchmark.testSerializeFetchRequestForReplica 20 500 avgt 15 1407744.600 ± 10321.934 ns/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica 20 1000 avgt 15 2901845.093 ± 59557.128 ns/op
  session.partitionMap.mustAdd(cachedPart)
}
session.partitionMap.remove(cachedPart)
iter.remove()
This excludes the partition in the response. However, it seems we need to send an error back for this partition?
If we run into this scenario, does it make sense to always return with an UNKNOWN_TOPIC_ID error? Sometimes partitions will be skipped over anyway when mustRespond is false, so should those also return UNKNOWN_TOPIC_ID?
Just to clarify this, the top is the Fetch branch, so I think it is better than trunk.
Waiting on #9758 before proceeding since this PR touches a lot of the same files.
Currently working on merge conflicts. Should have a first pass out in the next day or so. There are a few changes that don't work with the previous refactor. @chia7712, can you take a look when I have the commit ready?
Sure :)
Left a few todos. Still need to clean up, optimize code, and fix flaky and broken tests in FetchRequestBetweenDifferentIbpTest
…etadata cache. Also added code to close session when we get a fetch session ID error
    }
    final FetchRequest.Builder request = FetchRequest.Builder
-       .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
+       .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend(), metadata.topicIds())
I noticed that I get topic IDs from metadata here and in the replica fetcher thread, I get from the metadata cache. I don't think it is a big deal since we add to the fetchData using the same source, but it might make sense to use FetchRequestData's topicIds() instead.
 * localLogStartOffset is the log start offset of the partition on this broker.
 */
class CachedPartition(val topic: String,
                      val topicId: Uuid,
Should we include topicId in hashCode() and equals()?
I thought about this, and originally we compared the topic IDs in the session by grabbing cached partitions and comparing to the IDs in the request. Since we have a new mechanism (the topic ID map) we may no longer need to do this and I can add the ID to the hashcode and equals methods.
Ah, I found another use -- we lookup partitions toForget using the hashCode. Right now, toForget is a list of topic partitions and we don't directly use the ID provided in the request. We could look up the topic ID from the topic ID map and use it (we could also remove from the session map if we do remove the topic)
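For illustration, folding the topic ID into equals()/hashCode() might look like the following Java sketch. The field names are illustrative only; the real CachedPartition is Scala and carries more state, and String stands in for Kafka's Uuid.

```java
import java.util.Objects;

// Illustrative key class showing topicId participating in equality:
// a recreated topic (same name, new ID) no longer matches a stale session entry.
class CachedPartitionKey {
    final String topic;
    final String topicId;
    final int partition;

    CachedPartitionKey(String topic, String topicId, int partition) {
        this.topic = topic;
        this.topicId = topicId;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CachedPartitionKey)) return false;
        CachedPartitionKey that = (CachedPartitionKey) o;
        return partition == that.partition
            && topic.equals(that.topic)
            && topicId.equals(that.topicId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, topicId, partition);
    }
}
```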
    "expected to persist.")
  partitionsWithError += topicPartition

case Errors.INCONSISTENT_TOPIC_ID =>
Do we need to handle UNKNOWN_TOPIC_ID here too?
This is no longer a partition level error. We can only get it as a top level error. If it is a top level error, I believe we return an empty map and do not go down this code path.
 * This method returns a map from topic names to IDs and a map from topic IDs to names
 */
def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
  (topicNamesToIds(), topicIdsToNames())
Since metadataSnapshot could change anytime, it's more consistent if we save a copy of metadataSnapshot and derive both maps from the same cached value.
One option is to do what the RaftMetadataCache does and simply create a copy of the maps themselves in topicNamesToIds() and topicIdsToNames()
Could we first save metadataSnapshot to a local val and then derive both maps so that they can be consistent?
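The "derive both maps from one snapshot read" idea could be sketched roughly as below. All names (MetadataCacheSketch, Snapshot, topicIdInfo) are hypothetical stand-ins for the Scala metadata cache being discussed; the point is that both maps come from a single read of the snapshot reference, so they cannot disagree even if the snapshot is replaced concurrently.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Kafka code: both maps derived from one snapshot read.
class MetadataCacheSketch {
    private static final class Snapshot {
        final Map<String, String> nameToId;
        Snapshot(Map<String, String> nameToId) {
            this.nameToId = Collections.unmodifiableMap(new HashMap<>(nameToId));
        }
    }

    private final AtomicReference<Snapshot> snapshot =
        new AtomicReference<>(new Snapshot(new HashMap<>()));

    void update(Map<String, String> nameToId) {
        snapshot.set(new Snapshot(nameToId));
    }

    // Returns (nameToId, idToName) derived from a single snapshot read.
    Map.Entry<Map<String, String>, Map<String, String>> topicIdInfo() {
        Snapshot snap = snapshot.get(); // single read of the volatile reference
        Map<String, String> idToName = new HashMap<>();
        snap.nameToId.forEach((name, id) -> idToName.put(id, name));
        return new AbstractMap.SimpleImmutableEntry<>(snap.nameToId, idToName);
    }
}
```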
 * From version 3 or later, the authorized and existing entries in `FetchRequest.fetchData` should be in the same order in `responseData`.
 * Version 13 introduces topic IDs which mean there may be unresolved partitions. If there is any unknown topic ID in the request, the
 * response will contain a top-level UNKNOWN_TOPIC_ID error and UNKNOWN_TOPIC_ID errors on all the partitions.
 * We may also return UNKNOWN_TOPIC_ID for a given partition when that partition in the session has a topic ID inconsistent with the broker.
Should UNKNOWN_TOPIC_ID be FETCH_SESSION_TOPIC_ID_ERROR now?
I think I just have the wrong things here completely. There should be INCONSISTENT_TOPIC_ID here as well.
I was just thinking about this and realized we may send new error types to clients that may not be able to handle them. I need to review this code again.
Ok. Just went through the logic for old clients:
- UNKNOWN_TOPIC_ID should not be returned since we won't ever use topic IDs (version 12 requests and below)
- FETCH_SESSION_TOPIC_ID_ERROR should not be returned since we won't send version 13+ in a session and will always have zero uuids
- INCONSISTENT_TOPIC_ID should not be returned, as we won't have topic IDs in the request/session.
The only thing I can think of is downgrading a client while a session is open. I'm not sure if this can happen.
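The version reasoning above can be restated as a small illustrative function. This is not actual Kafka code; the error names mirror the discussion, and the method name is hypothetical.

```java
import java.util.Collections;
import java.util.Set;

// Illustrative restatement of which topic-ID errors a client can ever see.
class TopicIdErrorVisibility {
    static Set<String> possibleTopicIdErrors(int fetchVersion) {
        // v12 and below carry no topic IDs on the wire, so none of the
        // topic-ID errors should ever reach such a client.
        if (fetchVersion <= 12)
            return Collections.emptySet();
        return Set.of("UNKNOWN_TOPIC_ID", "FETCH_SESSION_TOPIC_ID_ERROR", "INCONSISTENT_TOPIC_ID");
    }
}
```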
    return new FetchResponse(new FetchResponseData(new ByteBufferAccessor(buffer), version));
}

private LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> toResponseDataMap(Map<Uuid, String> topicIdToNameMap, short version) {
We choose to cache responseData here but not in FetchRequest. Is there a particular reason for this inconsistency?
I think this inconsistency existed before I touched the code. 😅
case unknownId: UnknownTopicIdException =>
  throw unknownId
case sessionUnknownId: FetchSessionTopicIdException =>
  throw sessionUnknownId
Got it. Could we clean the existing code up a bit? Since fetchSessionHandler.handleResponse() already handles the closing of the session on error, it seems that we could get rid of fetchSessionHandler.handleError(t). Also, it seems that if fetchResponse.error() != None, we want to throw the error as an exception. Finally, if fetchSessionHandler.handleResponse() returns false, we probably want to throw an exception too?
next.put(topicPartition, data);
// topicIds do not change between adding partitions and building, so we can use putIfAbsent
if (!topicId.equals(Uuid.ZERO_UUID)) {
    topicIds.putIfAbsent(topicPartition.topic(), topicId);
Got it. We can keep the code as it is then.
if (id != Uuid.ZERO_UUID) {
  val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)
  if (prevSessionTopicId != null && prevSessionTopicId != id)
    inconsistentTopicIds.add(topicPart)
If we are switching from version 12 to version 13 for a session, prevSessionTopicId will be null. Should we also populate inconsistentTopicIds in this case to force a new session in the client?
If we switch from 12 to 13, we will not get to this point. We will throw a FETCH_SESSION_ID_ERROR before we get here.
    return sessionTopicNames;
}

private boolean canUseTopicIds = false;
I don't think we can calculate on a request basis since we may respond with topics that did not have IDs associated.
I added another comment in FetchSession. If the session starts with no topicId and a fetch request switches to using topicId, could the server just return an error to force a new session? Will this avoid the need to track canUseTopicIds as a state? Overall, it's probably a bit better to add a bit of complexity on the server to simplify development on the client, since we implement the client multiple times in different languages.
-        if (!handler.handleResponse(response)) {
+        if (!handler.handleResponse(response, maxVersion)) {
+            if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR || response.error() == Errors.UNKNOWN_TOPIC_ID) {
+                metadata.requestUpdate();
while (partitionIter.hasNext) {
  partitionIter.next()
  if (partitionIter.next().getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code())
    error = Errors.INCONSISTENT_TOPIC_ID
The INCONSISTENT_TOPIC_ID check in ReplicaManager is not very precise since the topicId could change immediately after the check. I am thinking that another way to do this is to validate the topicId in the session again when we are generating the fetch response. We could pass in the latest topicNameToId mapping from the metadata Cache to updateAndGenerateResponseData(). If the topicId is different from those in the fetch session, we could generate a top level INCONSISTENT_TOPIC_ID error. We could then get rid of the INCONSISTENT_TOPIC_ID check in ReplicaManager.
@lbradstreet and I discussed this a bit. It seems that the metadata cache may be less accurate than the log itself and that is why we did away with the metadata check. I am also a little unsure (I'd have to check the code) but I'm not sure if the topicId can change. Are we saying that the partition and/or the underlying log can change in this code block? I think we can say we will read from the partition with that ID.
val partition = getPartitionOrException(tp)
val fetchTimeMs = time.milliseconds
// Check if topic ID from the fetch request/session matches the ID in the log
if (!hasConsistentTopicId(topicIdFromSession(partition.topic), partition.topicId))
throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.")
.
.
.
val readInfo: LogReadInfo = partition.readRecords(
lastFetchedEpoch = fetchInfo.lastFetchedEpoch,
fetchOffset = fetchInfo.fetchOffset,
currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
maxBytes = adjustedMaxBytes,
fetchIsolation = fetchIsolation,
fetchOnlyFromLeader = fetchOnlyFromLeader,
minOneMessage = minOneMessage)
Ok, this is fine.
I was thinking that when topicId changes, a pending fetch request could still reference the outdated Partition object and therefore miss the topicId change. This is unlikely and can be tightened up by clearing the segment list when a partition is deleted.
Regarding the metadata propagation, it's true that right now, we propagate the LeaderAndIsrRequest before the UpdateMetadataRequest. With Raft, the topicId will always flow through metadata update first, followed by the ReplicaManager. When we get there, maybe we could simplify the logic a bit.
I'm still not sure I follow "pending fetch request could still reference the outdated Partition object and therefore miss the topicId change" My understanding is that the log is the source of truth and we will either read from the log if it matches and not read if it doesn't. I see we could get an error erroneously if the partition didn't update in time, but I don't see us being able to read from the log due to a stale partition.
Or are you referring to the getPartitionOrException(tp) call picking up a stale partition and both the request and the partition are stale? In this case, we will read from the log, but will identify it with its correct ID. The client will handle based on this.
    INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new),
-   TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new);
+   TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new),
+   FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new);
Is there a benefit to have FETCH_SESSION_TOPIC_ID_ERROR in addition to INCONSISTENT_TOPIC_ID? Could we just always use INCONSISTENT_TOPIC_ID?
I think the main reason why I made the session ID error was that the inconsistent topic ID error's message was too specific for this use case. I suppose we could just make all the errors here session errors. I do like the inconsistent ID error specifying the log (and being on the partition with the issue), but we can change this.
    return sessionTopicNames;
}

private boolean canUseTopicIds = false;
In the latest PR, it seems that canUseTopicIds is updated on every build() call and can be a local val?
val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
val newCachedPart = new CachedPartition(topicPart, id, reqData)
if (id != Uuid.ZERO_UUID) {
  val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)
It seems that we should never change the topicId in sessionTopicIds? Perhaps we should use putIfAbsent.
Similarly, if the topicId changes, I am not sure if we should update partitionMap below.
If a topic ID changes, the FetchSession will become a FetchErrorSession and close. I can change to putIfAbsent if it makes things clearer, but all this state will go away upon an error + session close.
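For illustration, the putIfAbsent variant could surface a conflicting ID explicitly rather than silently overwriting session state. This is a hypothetical sketch (names are illustrative; String stands in for Uuid), not the FetchSession implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the first ID registered for a topic wins; a later different ID
// is reported as a conflict the caller can turn into a session error.
class SessionTopicIds {
    private final Map<String, String> nameToId = new HashMap<>();

    // Returns true if the given ID is consistent with the session state.
    boolean register(String topic, String id) {
        String prev = nameToId.putIfAbsent(topic, id);
        return prev == null || prev.equals(id);
    }

    String idFor(String topic) {
        return nameToId.get(topic);
    }
}
```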
val privileged: Boolean,
val partitionMap: FetchSession.CACHE_MAP,
val usesTopicIds: Boolean,
val sessionTopicIds: FetchSession.TOPIC_ID_MAP,
Do we need to include the new fields in toString()?
I suppose it won't hurt :)
Taking a second look, seems like we just use partitionMap.size. Not sure if it is useful to have sessionTopicIds size (and if the whole map is too much). I'm thinking maybe just including the usesTopicIds boolean.
      debug(s"Full fetch context with session id $responseSessionId returning " +
        s"${partitionsToLogString(updates.keySet)}")
-     FetchResponse.of(Errors.NONE, 0, responseSessionId, updates)
+     FetchResponse.of(error, 0, responseSessionId, updates, topicIds)
Typically, if there is a topic level error, we set the same error in every partition through FetchRequest.getErrorResponse(). Should we do the same thing here? Ditto for IncrementalFetchContext.updateAndGenerateResponseData().
I think this goes back to the question of whether it is useful for us to have information on the specific partition that failed. If we do this, should we also return the error values for the other fields as we do in FetchRequest.getErrorResponse?
I guess the only issue with using FetchRequest.getErrorResponse is that we may have different topics in the response than in the request. SessionErrorContext deals with this by simply having an empty response besides the top level error. I'm wondering if we should do something like this. (Likewise, with the UNKNOWN_TOPIC_ID error, should we also just send back an empty response?)
        .setTopicId(topic.topicId())
        .setPartitions(partitionResponses));
});
// Since UNKNOWN_TOPIC_ID is a new error type only returned when topic ID requests are made (from newer clients),
We need to do something like this to easily get the top level error with no partition response for UNKNOWN_TOPIC_ID. I think this works, but we may want a version check as well just to be safe.
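A hedged sketch of the version check mentioned here (constant and method names are hypothetical, and the generic fallback error for old clients is an assumption, not something stated in the PR):

```java
// Illustrative gate: only fetch versions that carry topic IDs (v13+) may see
// UNKNOWN_TOPIC_ID as a top-level error; older clients get a generic error
// instead (the UNKNOWN_SERVER_ERROR fallback is an assumption).
class TopicIdErrorGate {
    static final short FIRST_TOPIC_ID_VERSION = 13;

    static String topLevelError(short requestVersion, boolean unknownTopicId) {
        if (!unknownTopicId)
            return "NONE";
        return requestVersion >= FIRST_TOPIC_ID_VERSION
            ? "UNKNOWN_TOPIC_ID"
            : "UNKNOWN_SERVER_ERROR";
    }
}
```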
      while (partitionIter.hasNext) {
-       partitionIter.next()
+       val entry = partitionIter.next()
        if (entry.getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) {
> I'm still not sure I follow "pending fetch request could still reference the outdated Partition object and therefore miss the topicId change" My understanding is that the log is the source of truth and we will either read from the log if it matches and not read if it doesn't. I see we could get an error erroneously if the partition didn't update in time, but I don't see us being able to read from the log due to a stale partition.
> Or are you referring to the getPartitionOrException(tp) call picking up a stale partition and both the request and the partition are stale? In this case, we will read from the log, but will identify it with its correct ID. The client will handle based on this.

A fetch request may pass the topicId check in ReplicaManager and is about to call log.read(), when the topicId changes. I was wondering in that case, if log.read() could return data that corresponds to the old topicId. It seems that's not possible since Log.close() closes all segments.
The topic ID should not change in the log once it is set. I think what you said in the last sentence is correct. My understanding is that if the log is closed, it can not read from it anymore.
  val id: Int = 34
}

case object KAFKA_3_0_IV2 extends DefaultApiVersion {
If we can't merge this in 3.0, we will need to change the tag to 3.1.
chia7712 left a comment
Sorry for the noise. I'm testing the trunk branch with our application. The logs started printing errors/warnings after this PR, and subscription also got slower. I left two questions below; a response from the author would help me dig into it :)
Updated FetchRequest and FetchResponse to use topic IDs rather than topic names. Some of the complicated code is found in FetchSession and FetchSessionHandler.
We need to be able to store topic IDs and maintain a cache on the broker for IDs that may not have been resolved. On incremental fetch requests, we will try to resolve them or remove them if in toForget.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
We also need to check the topic IDs of the cachedPartitions. If the topic ID changed from when the partition was cached, the strategy here is to remove it from the session since we are operating on a stale partition.
I have added tests to handle some of the edge cases where brokers have different IBP versions and some may have topic IDs while others do not.
Newest benchmarks can be found below.