KAFKA-15355: Message schema changes #14290
Conversation
Force-pushed from 0543605 to 0b93c37
@rondagostino could you have a look?
Because AbstractRequest.doParseRequest() and AbstractResponse.parseResponse() now exceed 150 lines.
Force-pushed from 780a836 to a28ef86
rondagostino
left a comment
Left a couple of comments, only got partway through.
I see that without this we get this error when trying to process the json:
Exception in thread "main" java.lang.RuntimeException: Exception while processing src/main/resources/common/metadata/PartitionChangeRecord.json
at org.apache.kafka.message.MessageGenerator.processDirectories(MessageGenerator.java:248)
at org.apache.kafka.message.MessageGenerator.main(MessageGenerator.java:367)
Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `org.apache.kafka.message.FieldSpec`, problem: Field Replicas specifies taggedVersions 0+, and versions 0. taggedVersions must be a subset of versions.
at [Source: (File); line: 36, column: 82] (through reference chain: org.apache.kafka.message.MessageSpec["fields"]->java.util.ArrayList[4])
However, if we try to specify it with "taggedVersions": "0", like this:
{ "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "0", "nullableVersions": "0+", "taggedVersions": "0", "tag": 2,
"about": "null if the replicas didn't change; the new replicas otherwise." },
We get a different error:
Exception in thread "main" java.lang.RuntimeException: Exception while processing src/main/resources/common/metadata/PartitionChangeRecord.json
at org.apache.kafka.message.MessageGenerator.processDirectories(MessageGenerator.java:248)
at org.apache.kafka.message.MessageGenerator.main(MessageGenerator.java:367)
Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `org.apache.kafka.message.FieldSpec`, problem: Field Replicas specifies taggedVersions 0, which is not open-ended. taggedVersions must be either none, or an open-ended range (that ends with a plus sign).
at [Source: (File); line: 36, column: 82] (through reference chain: org.apache.kafka.message.MessageSpec["fields"]->java.util.ArrayList[4])
The processing code is assuming that a tagged field can never be removed. I do not understand why this constraint holds. @cmccabe could you comment?
Tagged fields cannot be removed because reusing a tag ID would cause incompatibility. Eventually we should have a way of "burning" a tag, but we don't have it yet. So do not remove tagged fields for now.
We are not removing the tagged field. "tag": 2 stays reserved for the Replicas field, which prevents it from being reused. We're only closing the versions range.
Closing the version range has desirable consequences:
- removes the field from the generated schema
- prevents serializing the field outside the version range
- generates a runtime error if deserializing the field outside the version range
- marks deprecation of the field in the JSON definition
Even if a tagged field is no longer used, we still need to keep the tag reserved to prevent reuse. So the rule "taggedVersions must be a subset of versions" enforced by the removed validation above doesn't seem to make sense. Am I missing some downside to removing it?
Yeah, I think that's correct, the .json file still contains the now-closed tagged field and its tag. I wonder if that is good enough to "burn" that tag. Seems like it should be, but I am not familiar enough with these details to know. @cmccabe any thoughts?
Yes, this is a good point. I am OK with changing the code so that tagged fields can have an "end version". It would be good to know that all v1 PartitionChangeRecords must specify a storage directory array if they specify a replica array.
However, that isn't quite what the code above is enforcing ... the code above is enforcing that "taggedVersions must be a subset of versions" which does seem like an invariant we still want. I think the code that throws an exception if taggedVersions has an end version is somewhere else...
It would be good to know that all v1 PartitionChangeRecords must specify a storage directory array if they specify a replica array.
If the new field Assignment ([]ReplicaAssignment), which replaces Replicas ([]int32), is specified, it must include Broker (int32) and Directory (uuid). Directory is not a tagged field, so it must be specified.
I think the code that throws an exception if taggedVersions has an end version is somewhere else...
It makes sense to keep that, and the check you are referring to is still there: https://github.com/apache/kafka/blob/trunk/generator/src/main/java/org/apache/kafka/message/FieldSpec.java#L155-L159
the code above is enforcing that "taggedVersions must be a subset of versions" which does seem like an invariant we still want.
"taggedVersions must be a subset of versions" is essentially two checks:
1. the lowest taggedVersion is >= the lowest version; and
2. the highest taggedVersion is <= the highest version
Check 2 is at odds with the requirement that taggedVersions must not have an end version, but we can keep check 1. Instead of removing the code above, we can change it so that we verify that the lowest tagged version is equal to or lower than the highest version, e.g.

if (this.taggedVersions.lowest() > this.versions.highest()) {
    throw new RuntimeException("Field " + name + " specifies taggedVersions " +
        this.taggedVersions + ", and versions " + this.versions + ". " +
        "The lowest taggedVersion must be lower or equal to the highest version.");
}

WDYT?
I've made the change above. PTAL
It's best to figure out how to burn tagged fields in a separate change. I've reverted the changes to the generator and to the deprecation of the Replicas field so we can keep both of them in the new record versions for now.
Okay, thank you, I was just about to put some of my questions down about the burning of tags feature when I saw this comment!
I am putting myself as a reviewer because I would like to keep up to date with these changes. I will aim to provide my review tomorrow!
clolov
left a comment
Apologies for the delay! Overall the change makes sense to me, I just have a couple of small questions, otherwise I am happy to approve
Shouldn't this be 80 (due to line 1201)? Or are the request and response tests not connected with one another?
It doesn't really matter. This test suite doesn't test the semantics of the handling logic for each RPC, rather it focuses on aspects around serialization/deserialization. Wrt these changes, the only relevant test is testSerialization.
The KIP doesn't mention this to be a tagged field - I am happy to update the KIP for consistency
This is intentional. For brevity, the KIP describes the same change for both PartitionRecord and PartitionChangeRecord. With basic context of what each of the records are used for, I expect it to be implied that in PartitionChangeRecord, like every other field, Assignment is an optional change to make to a partition record.
Got it, okay, this makes sense
For my understanding, is there a benefit to updating the API version when you are adding a tagged (optional) field only?
I guess this is more confusing now after the discussion around whether to burn the tag for Replicas or not. So this is a good question.
KIP-482 does say part of the motivation for tagged fields is to add them without bumping the record version:
Finally, sometimes, we want to add extra information to a message without requiring a version bump for everyone who has to read that message. This is particularly important for metadata like consumer offsets.
I'd rather we don't have two different definitions for the same record version. That can lead to unnecessary confusion. It also feels weird to bump the version for PartitionRecord but not for PartitionChangeRecord. I also don't see any downside to bumping the version.
I wonder what @cmccabe 's take is on this.
I thought about this over the weekend and I am happy to approve the pull request in its current state and then circle back to this at a later point in time. I think that it won't break anything whichever way you decide to go forward.
Thanks for the review @clolov
Force-pushed from 21c3386 to 207f229
rondagostino
left a comment
Spotted a few things. Will review again shortly.
{ "name": "Assignment", "type": "[]ReplicaAssignment", "versions": "2+",
  "about": "The replica assignment for this partition, sorted by preferred order.", "fields": [
  { "name": "Broker", "type": "int32", "versions": "2+", "entityType": "brokerId",
    "about": "The broker ID hosting the replica." },
  { "name": "Directory", "type": "uuid", "versions": "2+",
    "about": "The log directory hosting the replica" }
]}
The Assignment field added to PartitionChangeRecord is added as a tagged field, yet the Assignment field added here in PartitionRecord is not. I think they should both be tagged fields?
Why should PartitionRecord.Assignment be a tagged field?
All fields in PartitionChangeRecord are tagged (except for topic and partition ID). It makes sense for a change record to have pretty much all fields optional, and Assignment - like Replicas - is no different.
A PartitionRecord however, without Assignment makes less sense to me. PartitionRecord.Assignment should replace PartitionRecord.Replicas which is not a tagged field. Any partition should have a set of replicas, now they need a different structure, but the list should still be there.
Fair. I'm wondering, though, if maybe we want to abandon this Assignment of type []ReplicaAssignment in favor of just an array of UUIDs. @cmccabe had originally expressed a preference for that in the VOTE thread:
I would prefer a new (tagged) array for replica UUIDs,
rather than creating the ReplicaAssignment array.
I had disagreed, and you had concurred with me, but that was based on an assumption that no longer holds since we now always specify the log directory UUID regardless of whether multiple log directories are in use or not:
Colin had mentioned that in PartitionRecord he would prefer a new
(tagged) array for replica UUIDs, rather than creating the
ReplicaAssignment array. While adding to an RPC is arguably less
intrusive than replacing, I am inclined to disagree with Colin's
suggestion for the following reason. We have agreed to not specify
the log dir uuid in the case where a replica only has one registered
log dir. If we were to add a new tagged array, it would become
necessary to specify all replica log dir uuids for all replicas in the
PartitionRecord if any one such replica had more than one log dir
configured. By creating the ReplicaAssignment array we can just
specify the uuid -- or not -- for each replica based on whether that
replica itself has multiple registered log dirs or not.
...
It is a bit of a pain to replace the field, but I agree that is
the best approach for the same reason you pointed out.
The ELR fields were just added as tagged fields on both records.
I also feel a bit uncomfortable with the Replicas field existing when it would not be used. Granted that it isn't used very much -- just 1 place -- but still, I worry about forcing any future uses to check to see if there are directory assignments and grab the replicas from there if they exist, otherwise grab them from the old location.
I thought perhaps we had discussed this in our Zoom meeting last week, but I could be wrong.
WDYT?
It's definitely a good shout – some of the assumptions have changed since then. I agree it's attractive to add a new orthogonal dir UUIDs field; that'll be an easier change to handle. And I'm also uncomfortable with not taking any action to deprecate the existing Replicas field.
My concerns are:
- It will be possible to construct a record that is semantically invalid, with a different number of entries in record.replicas and record.logDirs.
- We'll need extra care and it might be a bit awkward to keep the order of both lists in sync; any failures there will lead to issues that might be hard to detect or diagnose, as there won't be any hard link between both values.
- This problem stems mainly from our recent decision to postpone figuring out how to burn field tags. My understanding is that keeping the Replicas field unchanged is a temporary measure until then, whereas in the long term every replica should always have a log dir. In other words, I don't think we'd consider this if Replicas didn't already exist, so it feels like we're sacrificing the end design due to the existing limitation on tagged fields.
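To make the first concern concrete, a validation along these lines (a hypothetical sketch; the names and types are illustrative, not the generated record classes) would reject the semantically invalid record early:

```java
import java.util.Arrays;
import java.util.List;

public class ReplicaDirectoryCheck {
    // Hypothetical check: a record carrying separate replicas and logDirs arrays
    // is only coherent when both arrays have the same length.
    static void validate(List<Integer> replicas, List<String> logDirs) {
        if (logDirs != null && logDirs.size() != replicas.size()) {
            throw new IllegalArgumentException(
                "The lengths for replicas and directories do not match.");
        }
    }

    public static void main(String[] args) {
        validate(Arrays.asList(1, 2, 3), Arrays.asList("dirA", "dirB", "dirC")); // passes
        try {
            validate(Arrays.asList(1, 2, 3), Arrays.asList("dirA"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```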
I would say that this is all accurate, yes.
rondagostino
left a comment
Thank for the continued discussion and adjustments. We're close. I think coming to consensus on whether to actually keep the []ReplicaAssignment field or just use a []uuid instead is the only real point of discussion remaining.
The org.apache.kafka.metadata.PartitionRegistration class has a merge() method that will need some adjusting as well.
I had addressed that and lots of other references to Replicas in #14516
Would it be preferable to join the two PRs in a single patch?
Would it be preferable to join the two PRs in a single patch?
They are related for sure. I assume the changes specific to that other PR would be much smaller if we decided to abandon the Assignment of type []ReplicaAssignment in favor of just an array of UUIDs. Then the changes would just be specific to that which is required for JBOD as opposed to all the changes related to a wholesale replacement of Replicas []int32 with Assignment []ReplicaAssignment. My preference right now would be to abandon []ReplicaAssignment and include the then-smaller changes from that other PR here.
This should be by topic ID, not name.
Agreed, both here and in the response json. The KIP will also need an update since it indicates name in both places. We must have missed that on the discussion/vote threads.
Correct, changed. I've also updated the KIP.
Thanks for the PR, @soarez ! Do you have revisions ready for this one or do you want one of us to jump in and fix these comments like we talked about earlier? I feel like it's really close now 👍
Thanks for the review @cmccabe. Sorry it took me a bit longer to update this.
I'm not sure I understand what you're proposing, but in general: any help to get this over the line is welcome and much appreciated. 👍 I've switched the partition record schema changes to include a new
@rondagostino, @pprovenzano PTAL
rondagostino
left a comment
Thanks for the changes! Left a few comments/questions. We are so very close...
 * An exception that may indicate the client's metadata is out of date
 */
-public abstract class InvalidMetadataException extends RetriableException {
+public class InvalidMetadataException extends RetriableException {
Not sure why this is marked abstract. It's been like this practically since the inception of the project. I guess the intent was to never throw this specifically but to instead always throw a derived class. But then why are the constructors public instead of protected?
I see no harm in this change. The other possibility is to make the constructors protected, of course -- or to leave it alone, though that keeps the inconsistency.
Maybe @cmccabe has more historical context on this and can comment?
I had made this change to deal with a partition record with different lengths for replicas and directories. I've now realized that check hadn't been pushed.
If we need to keep this abstract for some reason we can create a new Exception type instead.
Hmm. It seems odd to make this non-abstract. Clearly the intention was to have a more specific class describe why the metadata was invalid. Why would we not follow this pattern here?
I have no good explanation for this, just laziness on my part. I've now created a specific subclass.
public static DirectoryId fromUuid(Uuid uuid) {
    return new DirectoryId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
}
Noting that while there is no direct test to ensure that the most/least significant bits are copied correctly, this is tested indirectly via existing tests.
I've added a test to verify the reserved values, using MSB/LSB as somewhat less indirect test.
for (int i = 0; i < newReplicas.size(); i++) {
    int newReplica = newReplicas.get(i);
Best to do for (int newReplica: newReplicas) { in case newReplicas is something like a linked list that is O(n) for random access.
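A minimal illustration of the point (a hypothetical standalone example, not Kafka code): with a LinkedList, get(i) walks the list from the head on every call, so the indexed loop is O(n^2) overall, while the enhanced for loop iterates via the list's iterator in O(n):

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class LoopStyle {
    // Indexed access: on a LinkedList each get(i) is O(n), so the loop is O(n^2) total.
    static int sumIndexed(List<Integer> newReplicas) {
        int sum = 0;
        for (int i = 0; i < newReplicas.size(); i++) {
            sum += newReplicas.get(i);
        }
        return sum;
    }

    // Enhanced for: uses the list's iterator, O(n) total regardless of list type.
    static int sumForEach(List<Integer> newReplicas) {
        int sum = 0;
        for (int newReplica : newReplicas) {
            sum += newReplica;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> replicas = new LinkedList<>(Arrays.asList(1, 2, 3));
        System.out.println(sumIndexed(replicas) + " " + sumForEach(replicas)); // prints "6 6"
    }
}
```

Both loops compute the same result; only the access pattern differs.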
 * @throws IllegalArgumentException If currentReplicas and currentDirectories have different lengths,
 *                                  or if there are duplicate broker IDs in the replica lists
 */
public static List<Uuid> update(int[] currentReplicas, DirectoryId[] currentDirectories, List<Integer> newReplicas) {
Can we have a better name than update() since this method is returning something new as opposed to updating something? Maybe createDirectoriesFrom() or similar?
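For illustration, the renamed method might look roughly like this (a simplified sketch with a String stand-in for DirectoryId; the helper names are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DirectoryAssignments {
    static final String UNASSIGNED = "UNASSIGNED"; // stand-in for DirectoryId.UNASSIGNED

    // Sketch of the renamed method: build the directory list for newReplicas by
    // carrying over known assignments and marking previously unknown replicas unassigned.
    static List<String> createDirectoriesFrom(int[] currentReplicas,
                                              String[] currentDirectories,
                                              List<Integer> newReplicas) {
        if (currentReplicas.length != currentDirectories.length) {
            throw new IllegalArgumentException(
                "currentReplicas and currentDirectories have different lengths");
        }
        Map<Integer, String> assigned = new HashMap<>();
        for (int i = 0; i < currentReplicas.length; i++) {
            if (assigned.put(currentReplicas[i], currentDirectories[i]) != null) {
                throw new IllegalArgumentException(
                    "Duplicate broker ID " + currentReplicas[i] + " in the replica list");
            }
        }
        List<String> directories = new ArrayList<>(newReplicas.size());
        for (int newReplica : newReplicas) { // enhanced for, cheap even on linked lists
            directories.add(assigned.getOrDefault(newReplica, UNASSIGNED));
        }
        return directories;
    }
}
```

The name makes it clearer that a new list is returned rather than anything being mutated.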
private static Stream<Arguments> partitionChangeRecordVersions() {
-    return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version));
+    return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1)
+        .filter(v -> v != PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION) // TODO test latest record version in KAFKA-15514
I believe we are going to close the current PR associated with KAFKA-15514, #14516, which I assume means we should remove this filter and make the tests pass in this PR.
Good catch, I forgot this. Removed and fixed failing tests.
{ "name": "Directories", "type": "[]uuid", "versions": "2+",
  "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."}
Not sure, but wondering if this should be a tagged field like the ELR fields. I don't see any specific advantage to doing so -- we won't ever use this version unless all nodes in the cluster support it, in which case making it mandatory actually makes sense -- but figured I would mention it in case anybody had additional thoughts or context around the ELR choice to tag the new fields in version 1.
I would prefer to make it mandatory to make it clear that it's always present in version 2.
if (directories == null) {
    directories = DirectoryId.unassignedArray(replicas.length);
}
This makes sense to me -- set them as unassigned when they aren't specified. Note that this could be the case due to the feature not being enabled.
However, I suspect we should do the same thing in public PartitionRegistration(PartitionRecord record), otherwise sometimes we can have PartitionRegistration records with non-null but "unassigned" directory IDs and sometimes we can have null. It feels to me that it would be easier to reason about if we always have something there.
Good point. I've moved this to the underlying constructor to deal with both cases.
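A sketch of what moving the default into the one underlying constructor could look like (simplified, with a String stand-in for DirectoryId and a hypothetical class name):

```java
import java.util.Arrays;

public class PartitionRegistrationSketch {
    static final String UNASSIGNED = "UNASSIGNED"; // stand-in for DirectoryId.UNASSIGNED

    final int[] replicas;
    final String[] directories;

    // Normalizing here means every construction path, including the one taking a
    // PartitionRecord, yields a non-null directories array of the right length.
    PartitionRegistrationSketch(int[] replicas, String[] directories) {
        if (directories == null) {
            directories = new String[replicas.length];
            Arrays.fill(directories, UNASSIGNED);
        }
        this.replicas = replicas;
        this.directories = directories;
    }
}
```

With this shape, callers never have to handle a null directories array.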
import java.util.Set;

-public class DirectoryId {
+public class DirectoryId extends Uuid {
I don't see why we should extend Uuid rather than just having a utility class that handles Uuids in the way that we want. Inheritance is almost always a mistake (unless it's of an interface)
This seems to add a lot of boilerplate to / from code compared with just using Uuid directly.
I don't disagree with the general subclassing rule of thumb, but I can't see why it's bad in this case.
On the contrary, having a specific separate type helps enforce that the IDs are created from the DirectoryId class, not from the UUID class. This is useful because the names and reserved values are different. e.g. without a separate type, there are no compiler errors/warnings if:
- Uuid.ZERO_UUID is used to directly replace DirectoryId.UNASSIGNED, creating confusion with the nomenclature
- Uuid.randomUuid() is used to directly replace DirectoryId.random(), silently ignoring the possibility that one of the reserved values may be incorrectly used.
I don't think this is a huge deal either way though, so if you still feel strongly about this I'm ready – albeit somewhat reluctantly, I'll admit – to change this and follow your suggestion.
Alternatively we could also keep a separate type, but use composition instead of inheritance. That will require a bit more boilerplate, and AFAICT bring little extra value, but we'd still get the benefits of having a separate type.
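For reference, the composition alternative mentioned here could look roughly like this (a hypothetical sketch using java.util.UUID as a stand-in for org.apache.kafka.common.Uuid; the reserved value shown is an assumption):

```java
import java.util.Objects;
import java.util.UUID; // stand-in for org.apache.kafka.common.Uuid

public final class DirectoryIdSketch {
    // Assumed reserved value; the real class defines its own reserved range.
    public static final DirectoryIdSketch UNASSIGNED =
        new DirectoryIdSketch(new UUID(0L, 0L));

    private final UUID uuid;

    private DirectoryIdSketch(UUID uuid) {
        this.uuid = Objects.requireNonNull(uuid);
    }

    public static DirectoryIdSketch fromUuid(UUID uuid) {
        return new DirectoryIdSketch(uuid);
    }

    // The cost of composition: serialization boundaries must unwrap explicitly.
    public UUID toUuid() {
        return uuid;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof DirectoryIdSketch && uuid.equals(((DirectoryIdSketch) o).uuid);
    }

    @Override
    public int hashCode() {
        return uuid.hashCode();
    }

    @Override
    public String toString() {
        return uuid.toString();
    }
}
```

The distinct type still blocks accidental mixing with plain UUIDs at compile time, at the price of a toUuid()/fromUuid() pair at every record or RPC boundary, which is the boilerplate objection raised above.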
It's a Uuid in every single way other than having 100 reserved entries at the beginning. I don't think that is enough to motivate a new type. A new type would be inconsistent with how we do everything else (cluster IDs, incarnation IDs, topic IDs, etc.) We use Uuid for all of them and there has never been any confusion.
(Although cluster ID is kind of a miss because technically it's a string and doesn't have to be a Uuid! But it was created before my time :) )
Having to copy between types is messy and confusing. For example, we'd have to copy between types every time we serialize to a record or to an RPC. This also removes a lot of the advantages you are touting (like "can't use Uuid.randomUuid" since we have to deal with Uuids anyway.)
we'd have to copy between types every time we serialize to a record or to an RPC. This also removes a lot of the advantages you are touting
No, I don't think so, because direct access to the metadata record fields is isolated to PartitionRegistration and PartitionChangeBuilder, so we'd get the benefits everywhere above that abstraction layer.
But I do agree with your point on inconsistency, so I've pushed 04a79cd.
rondagostino
left a comment
LGTM! Just one nit, and we can discuss it when we meet and then either make the change or merge as-is.
Thanks for the excellent work and discussion!
/**
 * A record was encountered where the number of directories does not match the number of replicas.
 */
public class InvalidReplicaDirectoriesException extends InvalidMetadataException {
nit: constructors in InvalidMetadataException can be made protected to eliminate the confusion that we encountered previously in this PR
Makes sense. I've changed this 👍
} else if (directories != null && directories.length != replicas.length) {
    throw new IllegalStateException("The lengths for replicas and directories do not match.");
Noting that we don't throw InvalidReplicaDirectoriesException because this is in the Builder as opposed to in an actual record.
LGTM too!
Failing tests are unrelated.
Should have included the information below as additional description in the commit "Schema changes in API messages and metadata records for KIP-858".
"listeners": ["controller"],
"name": "BrokerRegistrationRequest",
-"validVersions": "0-2",
+"validVersions": "0-3",
What's the reasoning behind doing two bumps of this API version within the same Kafka release? 3.6 is currently on "validVersions": "0-1", so it looks like Kafka 3.7.0 will introduce both versions 2 and 3 of this API.
Hi @aiven-anton. The API protocol may change more than once within the same release, and the RPC version is bumped for each change. Refer to MetadataVersion and its documentation.
Reviewers: Christo Lolov <lolovc@amazon.com>, Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rdagostino@confluent.io>
{ "name": "WantShutDown", "type": "bool", "versions": "0+",
-  "about": "True if the broker wants to be shut down, false otherwise." }
+  "about": "True if the broker wants to be shut down, false otherwise." },
+{ "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0",
Should this "tag": "0" be "tag": 0?
Schema changes in API messages and metadata records for KIP-858.
https://issues.apache.org/jira/browse/KAFKA-15355