KAFKA-17331: Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec #16873
Conversation
junrao
left a comment
@FrankYang0529 : Thanks for the PR. Left a comment.
} else if (partition.timestamp() == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && version < 8) {
  throw new UnsupportedVersionException(s"apiVersion must be >=8 for EARLIEST_LOCAL_TIMESTAMP")
} else if (partition.timestamp() == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP && version < 9) {
  throw new UnsupportedVersionException(s"apiVersion must be >=9 for LATEST_TIERED_TIMESTAMP")
This covers the case where a client sets a timestamp constant that is not supported in a version. It only happens if the client is incorrectly implemented; it could happen, but is less common.
A more common case is that a correctly implemented client sends a new version of the request to an old server that doesn't understand the new version. To cover this case, we probably could change the following method in UnifiedLog.
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = {
maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
...
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
...
} else {
// We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
}
} else {
...
}
}
Currently, if the code falls into the last else, it assumes that the timestamp is positive. However, this won't be the case if the request is on a newer version including a new negative constant. To be more defensive, we could throw an UnsupportedVersionException if the timestamp is negative in the else clause.
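A defensive guard along those lines might look like the following sketch. This is plain Java, not the actual UnifiedLog code: the constant values mirror ListOffsetsRequest but are inlined here, and IllegalArgumentException stands in for Kafka's UnsupportedVersionException.

```java
import java.util.Set;

public class TimestampGuard {
    // Known negative "flag" timestamps; values match the constants in
    // org.apache.kafka.common.requests.ListOffsetsRequest.
    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long MAX_TIMESTAMP = -3L;

    static final Set<Long> KNOWN_FLAGS =
        Set.of(LATEST_TIMESTAMP, EARLIEST_TIMESTAMP, MAX_TIMESTAMP);

    // Returns true when the timestamp is a plain, searchable timestamp; returns
    // false for a known flag; throws for an unrecognized negative constant,
    // which likely came from a newer request version than this broker knows.
    static boolean checkSearchable(long targetTimestamp) {
        if (targetTimestamp < 0 && !KNOWN_FLAGS.contains(targetTimestamp))
            throw new IllegalArgumentException(
                "Unknown negative timestamp " + targetTimestamp
                    + ", likely from a newer request version");
        return targetTimestamp >= 0;
    }
}
```

The point of the guard is that the final `else` branch never silently searches on a flag value the broker does not understand.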
Actually, my suggestion is not needed since it's covered in RequestContext.parseRequest.
My main question is #16841 (comment). It's not clear to me why there is no test failure when we set a stable MV depending on an unstable ListOffset. It would be useful to understand the testing gap and see if we could improve the test coverage.
A more common case is that a correctly implemented client sends a new version of the request to an old server that doesn't understand the new version. To cover this case, we probably could change the following method in UnifiedLog.
It seems to me the "new client" should throw an exception when building the request [0]. The other requests have similar protection [1][2][3]. This part will be implemented by @frankvicky
My main question is #16841 (comment). It's not clear to me why there is no test failure when we set a stable MV depending on an unstable ListOffset. It would be useful to understand the testing gap and see if we could improve the test coverage.
The MV-related test will be addressed by https://issues.apache.org/jira/browse/KAFKA-17336 rather than this PR. This PR is about returning the "correct" response (unsupported version) to a client which sends an old version with a "new" timestamp. I have left a summary on #16841 (comment). In short, we try to make sure all paths (broker-to-broker, old-client-to-new-broker, new-client-to-old-broker) work well and get the correct error.
[0]
[1]
[2]
[3]
chia7712
left a comment
@FrankYang0529 thanks for this patch
  s"failed because the partition is duplicated in the request.")
buildErrorResponse(Errors.INVALID_REQUEST, partition)
} else if (partition.timestamp() == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && version < 8) {
  throw new UnsupportedVersionException(s"apiVersion must be >=8 for EARLIEST_LOCAL_TIMESTAMP")
Could you please build an error response for the partition that has unsupported content? Throwing an exception will break the whole request.
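A minimal sketch of that per-partition approach, in plain Java: instead of throwing and failing the whole request, each invalid partition gets its own error entry. The `PartitionResult` record and string error names here are illustrative stand-ins for Kafka's ListOffsets response types, and the `-4L`/`-5L` constants are inlined from the timestamp flags discussed above.

```java
import java.util.ArrayList;
import java.util.List;

public class PerPartitionErrors {
    record PartitionResult(int partition, String error) {}

    // Each partition is validated independently: an unsupported timestamp flag
    // produces an UNSUPPORTED_VERSION entry while the other partitions proceed.
    static List<PartitionResult> handle(List<Long> timestamps, short version) {
        List<PartitionResult> results = new ArrayList<>();
        for (int p = 0; p < timestamps.size(); p++) {
            long ts = timestamps.get(p);
            if ((ts == -4L && version < 8) || (ts == -5L && version < 9))
                results.add(new PartitionResult(p, "UNSUPPORTED_VERSION"));
            else
                results.add(new PartitionResult(p, "NONE"));
        }
        return results;
    }
}
```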
Thank you. Updated it.
Force-pushed 5776711 to 81389cf
junrao
left a comment
@FrankYang0529 : Just one more comment.
val listOffsetRequest = ListOffsetsRequest.Builder
  .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
  .setTargetTimes(targetTimes).build(version)
With #16876, we can't build an incorrect request like that, right?
Thanks for catching this. I should be more careful about it. Updated it.
Force-pushed 81389cf to e7b5a3b
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
  s"failed because the partition is duplicated in the request.")
buildErrorResponse(Errors.INVALID_REQUEST, partition)
} else if (timestampMinSupportedVersion.contains(partition.timestamp()) && version < timestampMinSupportedVersion(partition.timestamp())) {
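As a standalone sketch, the map-based check quoted above could work as follows. This is an assumed Java translation, not the actual KafkaApis code: the `-4L`/`-5L` to version-8/9 mapping comes from this PR's commit message, and the names are illustrative.

```java
import java.util.Map;

public class ListOffsetsVersionCheck {
    // Timestamp flag -> minimum ListOffsets request version that defines it.
    static final Map<Long, Short> TIMESTAMP_MIN_SUPPORTED_VERSION = Map.of(
        -4L, (short) 8,  // EARLIEST_LOCAL_TIMESTAMP
        -5L, (short) 9   // LATEST_TIERED_TIMESTAMP
    );

    // True when the partition-level timestamp is a known flag that the given
    // request version is too old to support; the broker would then build an
    // UNSUPPORTED_VERSION partition-level error response.
    static boolean unsupported(long timestamp, short version) {
        Short min = TIMESTAMP_MIN_SUPPORTED_VERSION.get(timestamp);
        return min != null && version < min;
    }
}
```

A map keeps the check data-driven: adding a future flag means adding one entry rather than another if/else branch.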
Personally, I love this strict check to avoid "undefined" behavior. However, let me also raise a possible objection.
Should we view a "negative timestamp" as user-defined data?
I have seen users who use negative timestamps (yes, the protocol does not disallow that) in their data. That means a "negative timestamp" could be either user-defined data or a flag. Hence, the new flags (and version check) can break the behavior of finding the offset of a user-defined negative timestamp. We could fix it by adding a new field, timestampType, to the LIST_OFFSETS request to indicate the "type" of the timestamp.
And another one:
Should we keep the behavior of timestamps with unmatched versions?
An older broker which does not know either EARLIEST_LOCAL_TIMESTAMP or LATEST_TIERED_TIMESTAMP will process the timestamp "flag" as a normal timestamp (and do the search). That means the new broker will return a "different" result, which seems to be a kind of behavior break: a request with an old version should be handled with the "old" behavior.
It seems we tried to keep the old behavior only for version_0 [0]. After that, we just added more if-else branches to handle each timestamp flag without addressing the above issues. If that is the expected change, maybe the Jira is overkill and we can close this PR to keep the broker's behavior simple.
@FrankYang0529 @junrao WDYT?
Hi @chia7712, thanks for the great questions! First, we should reach consensus on how to handle newly defined behavior that was previously undefined, so we know how to answer the further questions.
IIRC, negative timestamps are not supported in Kafka. I found KIP-228 about them, but it looks like it is still under discussion. IMHO, if a behavior is unexpected, we may need to add some limitations to it in a newer version, so we can minimize unexpected results.
IIRC, negative timestamps are not supported in Kafka. I found KIP-228 about them, but it looks like it is still under discussion. IMHO, if a behavior is unexpected, we may need to add some limitations to it in a newer version, so we can minimize unexpected results.
Thanks for sharing that. It means we do NOT care about the case of negative timestamps in records. Hence, we can define explicit behavior for each version and "reject" requests which use an "unknown" negative timestamp, for example:
else if (partition.timestamp() < 0 && !timestampMinSupportedVersion.contains(partition.timestamp())) {
  buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)
}
@FrankYang0529 WDYT?
Hi @FrankYang0529 and @chia7712,
In ListOffsetsRequest, we have the forConsumer method, which was recently updated to handle different timestamps. Perhaps we could define the behavior based on this logic.
public static Builder forConsumer(boolean requireTimestamp,
                                  IsolationLevel isolationLevel,
                                  boolean requireMaxTimestamp,
                                  boolean requireEarliestLocalTimestamp,
                                  boolean requireTieredStorageTimestamp) {
    short minVersion = 0;
    if (requireTieredStorageTimestamp)
        minVersion = 9;
    else if (requireEarliestLocalTimestamp)
        minVersion = 8;
    else if (requireMaxTimestamp)
        minVersion = 7;
    else if (isolationLevel == IsolationLevel.READ_COMMITTED)
        minVersion = 2;
    else if (requireTimestamp)
        minVersion = 1;
    return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
}
WDYT?
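That version cascade can be exercised in isolation. The sketch below copies just the min-version selection; the enum and the static-method shape are simplified stand-ins, not the real ListOffsetsRequest.Builder API.

```java
public class MinVersionResolver {
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Mirrors the cascade in ListOffsetsRequest.Builder#forConsumer: each newer
    // timestamp capability bumps the oldest allowed request version, so the
    // builder refuses to produce a request the capability cannot ride on.
    static short minVersion(boolean requireTimestamp,
                            IsolationLevel isolationLevel,
                            boolean requireMaxTimestamp,
                            boolean requireEarliestLocalTimestamp,
                            boolean requireTieredStorageTimestamp) {
        if (requireTieredStorageTimestamp) return 9;
        if (requireEarliestLocalTimestamp) return 8;
        if (requireMaxTimestamp) return 7;
        if (isolationLevel == IsolationLevel.READ_COMMITTED) return 2;
        if (requireTimestamp) return 1;
        return 0;
    }
}
```

Ordering matters: the strictest requirement must win, so the checks run from the newest capability down.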
Hi @chia7712 and @frankvicky, thanks for the information. Updated it. Could you take a look again when you have time? Thank you.
Force-pushed b678a18 to ec26872
val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
try {
  val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
This is another interesting issue.
#11390 added support for EARLIEST_LOCAL_TIMESTAMP to the v0 path. That seems intended to make the server-to-server replica sync path work (LeaderEndPoint#fetchEarliestLocalOffset). However, a LIST_OFFSETS request sent by a server will use version_0 only if the MV is smaller than 0.10.2. In what scenario would we set the MV smaller than 0.10.2? When the cluster has some brokers running a distribution < 0.10.1.
However, those older brokers can never handle EARLIEST_LOCAL_TIMESTAMP correctly, as they have no idea what EARLIEST_LOCAL_TIMESTAMP is.
IMHO, we should revert the support of EARLIEST_LOCAL_TIMESTAMP from v0 path. Also, LeaderEndPoint#fetchEarliestLocalOffset should require the min version >=8
@jolshan @FrankYang0529 @junrao WDYT?
IMHO, we should revert the support of EARLIEST_LOCAL_TIMESTAMP from v0 path.
Sounds good to me.
LeaderEndPoint#fetchEarliestLocalOffset should require the min version >=8
LeaderEndPoint#fetchEarliestLocalOffset is only used in tier storage, which is released in 3.6.0. An MV of 3.6.0 implies that the ListOffset request will use version >=8. So, it seems this is already covered?
LeaderEndPoint#fetchEarliestLocalOffset is only used in tier storage, which is released in 3.6.0. An MV of 3.6.0 implies that the ListOffset request will use version >=8. So, it seems this is already covered?
Still trying to understand everything here, but just because it was released in 3.6 doesn't mean the MV is 3.6, unless tiered storage requires that as the minimum MV to enable. If so, then I agree this is covered, but maybe we should include a comment so it is not used for other things.
I do see this (3.5-IV0 marked as the MV for tiered storage it seems), but not sure if there is other gating to enable tiered storage:
Still trying to understand everything here, but just because it was released in 3.6 doesn't mean the MV is 3.6, unless tiered storage requires that as the minimum MV to enable. If so, then I agree this is covered, but maybe we should include a comment so it is not used for other things.
I agree with @jolshan. Also, we can't guarantee the method is always used by tiered storage. At the least, we should add a comment to that method to remind us to use it carefully.
3.5-IV0 marked as the MV for tiered storage it seems
Thanks @chia7712 for the summary. I'll go with (1) or (3).
1 as implemented in this PR doesn't do what's expected, right? See #16873 (comment)
1 as implemented in this PR doesn't do what's expected
I assumed the check happens when creating the LIST_OFFSETS/FETCH request, for example:
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
val metadataVersion = metadataVersionSupplier()
if (!metadataVersion.isEarliestLocalOffsetSupported)
throw new UnsupportedVersionException("Metadata Version is unmatched. It requires 3.5 but current is " + metadataVersion)
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
}
Dear all, the MV validation is not a regression, and we did not reach consensus before the 3.9 code freeze. IMHO, we should move the validation discussion to https://issues.apache.org/jira/browse/KAFKA-17405, and then @FrankYang0529 can focus on the LIST_OFFSETS validation, which returns unsupported version if a timestamp flag is used with an incorrect version.
Force-pushed ec26872 to 40d6eab
junrao
left a comment
@FrankYang0529 : Thanks for the updated PR. Left a couple more comments. Also, in the future, it would be useful to preserve the commit history when pushing new changes. That makes incremental reviewing easier.
}

public static Builder forReplica(short allowedVersion, int replicaId, boolean requireEarliestLocalTimestamp) {
    short minVersion = requireEarliestLocalTimestamp ? (short) 8 : (short) 0;
I understand the intention of this logic, but it doesn't seem to do what you want. The problem is the following. The client determines the version for a request using the following logic in NetworkClient.
try {
    NodeApiVersions versionInfo = apiVersions.get(nodeId);
    short version;
    // Note: if versionInfo is null, we have no server version information. This would be
    // the case when sending the initial ApiVersionRequest which fetches the version
    // information itself. It is also the case when discoverBrokerVersions is set to false.
    if (versionInfo == null) {
        version = builder.latestAllowedVersion();
        if (discoverBrokerVersions && log.isTraceEnabled())
            log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
    } else {
        version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
            builder.latestAllowedVersion());
    }
As you can see, builder.oldestAllowedVersion() is only used for determining the request version when versionInfo is not null. However, in BrokerBlockingSender, NetworkClient is created with discoverBrokerVersions=false, which means in NetworkClient, apiVersions is not populated and versionInfo is always null.
To me, if we can gate tier storage with MV, gating ListOffset here is less important.
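For reference, the negotiation rule used in the non-null branch above (pick the highest version inside the intersection of the client's and the broker's allowed ranges) can be sketched as follows; the names are illustrative, not the real NodeApiVersions API.

```java
public class VersionNegotiation {
    // Picks the highest version both sides support, in the spirit of
    // NodeApiVersions#latestUsableVersion; throws when the ranges do not
    // overlap, which is the UnsupportedVersionException case in the client.
    static short latestUsable(short clientOldest, short clientLatest,
                              short brokerOldest, short brokerLatest) {
        short usable = (short) Math.min(clientLatest, brokerLatest);
        if (usable < Math.max(clientOldest, brokerOldest))
            throw new IllegalStateException("No usable version: client ["
                + clientOldest + "," + clientLatest + "] vs broker ["
                + brokerOldest + "," + brokerLatest + "]");
        return usable;
    }
}
```

This is exactly the step that never runs when discoverBrokerVersions is false, which is why raising oldestAllowedVersion in the builder has no effect on the BrokerBlockingSender path.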
Hi @junrao, thanks for your comment! Although we don't do the check on the broker sender side, the receiver does the check.
kafka/core/src/main/scala/kafka/network/SocketServer.scala
Lines 1121 to 1128 in 62dc982
@FrankYang0529 : Yes, the receiver does have the check. I am just saying there is no need to add the code here since it's intended for the sender, but the sender doesn't execute the added logic.
val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
try {
  val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
Regarding gating tier storage on MV 3.6, we could potentially pass in a remoteLogManagerSupplier instead of a direct remoteLogManager to ReplicaManager. The supplier will instantiate remoteLogManager on first usage based on the remote storage config and the MV setting at that time. The MV initialization happens before the opening of socket server. So, by the time the supplier is called, we can be sure that MV has been initialized. This still doesn't support enabling remote storage dynamically, but is probably good enough in 3.9.
Force-pushed 40d6eab to 0dc7739
chia7712
left a comment
@FrankYang0529 thanks for the updated PR
return this.featureLevel > 0;
}

public boolean isEarliestLocalOffsetSupported() {

@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testIsEarliestLocalOffsetSupported(MetadataVersion metadataVersion) {

override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
  val metadataVersion = metadataVersionSupplier()
  if (!metadataVersion.isEarliestLocalOffsetSupported)

maxNumOffsets = partition.maxNumOffsets,
isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID,
fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID)
if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
Hi @chia7712, thanks for the review. I addressed all comments.
Force-pushed 0dc7739 to 8c6eea5
I will merge this PR to trunk and 3.9 tomorrow if no objection
junrao
left a comment
@FrankYang0529 : Thanks for the updated PR. Just a minor comment.
maxNumOffsets = partition.maxNumOffsets,
isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID,
fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID)
if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
Should we remove the code related to EARLIEST_LOCAL_TIMESTAMP in UnifiedLog.legacyFetchOffsetsBefore?
Hi @junrao, thanks for the suggestion. I removed the EARLIEST_LOCAL_TIMESTAMP case in UnifiedLog.legacyFetchOffsetsBefore.
…OT support EarliestLocalSpec and LatestTieredSpec Signed-off-by: PoAn Yang <payang@apache.org>
Force-pushed 8c6eea5 to 9138d9c
I ran all core e2e tests, and the flaky/failed ones are shown below.
BTW, they are also flaky/failing on trunk, so I feel they are unrelated to this PR.
…T support EarliestLocalSpec and LatestTieredSpec (#16873) Add the version check to server side for the specific timestamp: - the version must be >=8 if timestamp=-4L - the version must be >=9 if timestamp=-5L Reviewers: Chia-Ping Tsai <chia7712@gmail.com>