Skip to content

KAFKA-19634: Formalize nullable and non-nullable type distinctions in protocol specification#20614

Merged
chia7712 merged 24 commits intoapache:trunkfrom
DL1231:KAFKA-19634
Nov 28, 2025
Merged

KAFKA-19634: Formalize nullable and non-nullable type distinctions in protocol specification#20614
chia7712 merged 24 commits intoapache:trunkfrom
DL1231:KAFKA-19634

Conversation

@DL1231
Copy link
Copy Markdown
Collaborator

@DL1231 DL1231 commented Sep 30, 2025

This patch introduces a clear separation between nullable and
non-nullable data structures. The key changes include:

  1. Differentiates between nullable and non-nullable versions of
    RECORDS, COMPACT_RECORDS, and Schema types.
  2. Adds explicit nullable type names for ArrayOf and CompactArrayOf.
  3. Introduces a new, concise syntax for representing types:
    • {} for struct, ?{} for nullable struct
    • [T] for array, ?[T] for nullable array
    • (T) for compact array, ?(T) for nullable compact array
  4. Declares shared schemas as non-nullable Schema by default. A field
    that references a shared schema and is nullable must be explicitly
    declared as a new NullableSchema(X).
  5. Add UTs to verify the consistency between schema and message
    serialization.

Reviewers: Jun Rao junrao@gmail.com, Chia-Ping Tsai
chia7712@gmail.com

@github-actions github-actions Bot added triage PRs from the community clients small Small PRs labels Sep 30, 2025
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java Outdated
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the PR. This is a bit more complicated than I originally thought. Left a few comments.

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java Outdated
@github-actions github-actions Bot removed the triage PRs from the community label Oct 1, 2025
@github-actions github-actions Bot added streams generator RPC and Record code generator and removed small Small PRs labels Oct 9, 2025
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. A few more comments.

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java Outdated
Comment thread clients/src/main/resources/common/message/README.md
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java Outdated
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. A few more comments.

  1. Regarding the implementation of the nullable vs non-nullable types. We use 3 different approaches. (a) For bytes, we implement two independent classes BYTES and NULLABLE_BYTES. (b) For array, we use one class ArraryOf, which takes a nullable param. (c) For schema, we implement NULLABLE_SCHEMA as a subclass of SCHEMA. Is it possible to pick one approach to implement all nullable types in a consistent way? Perhaps (b) or (c) is a bit better since it allows more code sharing.

  2. In the generated html, could we introduce notations for 4 different types of arrays (nullable vs non-nullable, compact vs non-compact)?

  3. This is an existing issue and can probably be done in a separate PR. All static classes in Field except TaggedFieldsSection are not really being used. We can probably remove them.

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java Outdated
@DL1231
Copy link
Copy Markdown
Collaborator Author

DL1231 commented Oct 22, 2025

@junrao : Thanks for your review.

  1. Is it possible to pick one approach to implement all nullable types in a consistent way?

I think (c) might be more suitable, as it not only allows for more code reuse but also enables better separation of logic between nullable and non-nullable types.
What do you think about addressing this issue in a separate PR? The changes required to modify the implementation of all nullable types might be a bit more involved.

  1. In the generated html, could we introduce notations for 4 different types of arrays (nullable vs non-nullable, compact vs non-compact)?

How about adding the array type after the []? For example:

ConsumerGroupHeartbeat Response (Version: 0) => throttle_time_ms error_code error_message member_id member_epoch heartbeat_interval_ms assignment _tagged_fields 
  throttle_time_ms => INT32
  error_code => INT16
  error_message => COMPACT_NULLABLE_STRING
  member_id => COMPACT_NULLABLE_STRING
  member_epoch => INT32
  heartbeat_interval_ms => INT32
  assignment => NULLABLE_STRUCT [topic_partitions]COMPACT_ARRAY _tagged_fields 
    topic_partitions => STRUCT topic_id [partitions]COMPACT_ARRAY _tagged_fields 
      topic_id => UUID
      partitions => INT32
  1. All static classes in Field except TaggedFieldsSection are not really being used. We can probably remove them.

Filed KAFKA-19822 to track this case.

Should we add STRUCT to the top level? I guess the top level struct can never be null.

I agree that we probably don't need to. As you rightly pointed out, an empty request or response serves no purpose.

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 thanks for this patch

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java Outdated
Comment thread generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java Outdated
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated. PR.

I think (c) might be more suitable, as it not only allows for more code reuse but also enables better separation of logic between nullable and non-nullable types.
What do you think about addressing this issue in a separate PR? The changes required to modify the implementation of all nullable types might be a bit more involved.

Sounds good.

topic_partitions => STRUCT topic_id [partitions]COMPACT_ARRAY _tagged_fields 

How about we use [T], [T]?, (T) and (T)? to represent array, nullable array, compacted array and nullable compacted array, respectively?

Also, could we add the STRUCT keyword to the top level schema in the generated html?

Finally, could you rebase the PR to pick up a fix for flaky test #20713?

@DL1231
Copy link
Copy Markdown
Collaborator Author

DL1231 commented Oct 25, 2025

@junrao : Thanks for your review.

Filed KAFKA-19833 to track this issue.

How about we use [T], [T]?, (T) and (T)? to represent array, nullable array, compacted array and nullable compacted array, respectively?
Also, could we add the STRUCT keyword to the top level schema in the generated html?

The generated HTML looks like this:

ConsumerGroupHeartbeat Response (Version: 0) => STRUCT throttle_time_ms error_code error_message member_id member_epoch heartbeat_interval_ms assignment 
  throttle_time_ms => INT32
  error_code => INT16
  error_message => COMPACT_NULLABLE_STRING
  member_id => COMPACT_NULLABLE_STRING
  member_epoch => INT32
  heartbeat_interval_ms => INT32
  assignment => NULLABLE_STRUCT (topic_partitions) 
    topic_partitions => STRUCT topic_id (partitions) 
      topic_id => UUID
      partitions => INT32

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. A few more comments.

Comment thread generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java Outdated
for (Iterator<StructSpec> iter = structRegistry.commonStructs(); iter.hasNext(); ) {
StructSpec struct = iter.next();
generateSchemas(struct.name(), struct, message.struct().versions());
generateSchemas(struct.name(), struct, message.struct().versions(), Versions.NONE);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit problematic. A shared schema could be used by multiple fields. Some of them can be nullable and some others can be non-nullable. Not sure what's the best approach to address this issue. One potential way is to only support Schema for now. The generated code already handles null just with Schema. So far, for non-generated code usage, it seems that there hasn't been a need for a nullable schema. So, we could punt on that until there is a need.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. Filed KAFKA-19870 to track it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : This one is important. So, I think we need to get this part right in this PR, instead of a followup one.

Copy link
Copy Markdown
Collaborator Author

@DL1231 DL1231 Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we could punt on that until there is a need.

Sorry, I misunderstood your point earlier. I will address this issue asap.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A shared schema could be used by multiple fields. Some of them can be nullable and some others can be non-nullable.

@junrao Pardon me, I may be misunderstanding you comment, but IIRC, the common struct does not support nullable property. So using Version.None should be good in this case

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao If Y is nullable, then declare that field as new NullableSchema(X);
if Z is non-nullable, then reference X directly.
X, by default, should be declared as new Schema(). WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, we could reject the json file if the common struct is used in both nullable and non-nullable definition. I think this may be reasonable since it should not be “common” if it has different definitions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, we could reject the json file if the common struct is used in both nullable and non-nullable definition. I think this may be reasonable since it should not be “common” if it has different definitions.

This seems arbitrary. If we allow a struct field to be null, it seems that we should allow it regardless of how the struct is defined.

If Y is nullable, then declare that field as new NullableSchema(X);

This feels awkward to me. The generated code explicitly generates code that handles nulls. So, NullableSchema(X) is unnecessary and will likely confuse people.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current method parameter uses Version.None by default, indicating that the common struct only supports Schema.
Should we keep the existing logic unchanged?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Y is nullable, then declare that field as new NullableSchema(X);
if Z is non-nullable, then reference X directly.
X, by default, should be declared as new Schema().

@DL1231 : Thinking a bit more. I feel the above solution that you proposed probably works the best. We will need to change the constructor of NullableSchema to take a Schema. We will use this approach for both shared and non-shared schema when generating the classes.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Added a couple of more comments.

for (Iterator<StructSpec> iter = structRegistry.commonStructs(); iter.hasNext(); ) {
StructSpec struct = iter.next();
generateSchemas(struct.name(), struct, message.struct().versions());
generateSchemas(struct.name(), struct, message.struct().versions(), Versions.NONE);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Y is nullable, then declare that field as new NullableSchema(X);
if Z is non-nullable, then reference X directly.
X, by default, should be declared as new Schema().

@DL1231 : Thinking a bit more. I feel the above solution that you proposed probably works the best. We will need to change the constructor of NullableSchema to take a Schema. We will use this approach for both shared and non-shared schema when generating the classes.

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java Outdated
@DL1231
Copy link
Copy Markdown
Collaborator Author

DL1231 commented Nov 19, 2025

@junrao Thanks for the review. I have updated the PR. The generated HTML looks like this:

ConsumerGroupHeartbeat Response (Version: 0) => { throttle_time_ms error_code error_message member_id member_epoch heartbeat_interval_ms assignment }
  throttle_time_ms => INT32
  error_code => INT16
  error_message => COMPACT_NULLABLE_STRING
  member_id => COMPACT_NULLABLE_STRING
  member_epoch => INT32
  heartbeat_interval_ms => INT32
  assignment => ?{ (topic_partitions) }
    topic_partitions => { topic_id (partitions) }
      topic_id => UUID
      partitions => INT32

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. A few more comments. Also, could you summarize the changes to generated code and the html doc?

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java Outdated
Comment thread streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java Outdated
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. A few more comments.

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java Outdated
@DL1231 DL1231 requested a review from junrao November 24, 2025 11:45
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. A few more comments.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. A few more comments.

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/types/NullableSchema.java Outdated
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. A few more comments.

RECORDS, COMPACT_RECORDS, new ArrayOf(STRING), new CompactArrayOf(COMPACT_STRING)};
RECORDS, COMPACT_RECORDS, NULLABLE_RECORDS, COMPACT_NULLABLE_RECORDS,
new ArrayOf(STRING), new CompactArrayOf(COMPACT_STRING), ArrayOf.nullable(STRING), CompactArrayOf.nullable(STRING),
new Schema(), new NullableSchema(new Schema())};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an existing issue. For COMPACT_BYTES and COMPACT_NULLABLE_BYTES, could you add a space in front of "Then N bytes follow. ?

Also, for all Array types, could we add a period at the end of the documentation to be consistent?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks very much for your detailed and patient review. I have updated the PR—please take another look when you have time.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 : Thanks for the updated PR. LGTM. Since the PR has evolved quite a bit from the original goal, could you adjust the title of the jira/PR to reflect the actual changes?

@chia7712 : Do you want to take another look at the PR?

@chia7712
Copy link
Copy Markdown
Member

Do you want to take another look at the PR?

yes, will take a look later!

@DL1231 DL1231 changed the title KAFKA-19634: Document the encoding of nullable struct KAFKA-19634: Formalize nullable and non-nullable type distinctions in protocol specification Nov 28, 2025
Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 thanks for this great patch. overall LGTM. I have just one small comment remaining


@Override
public String leftBracket() {
return "?{";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the Array types documentation include the symbol, should it also be included in the documentation for consistency?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for Schema

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, I have updated the PR. PTAL

@chia7712
Copy link
Copy Markdown
Member

the flaky is already traced by https://issues.apache.org/jira/browse/KAFKA-18952

@chia7712 chia7712 merged commit 58d62d1 into apache:trunk Nov 28, 2025
22 of 24 checks passed
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Dec 3, 2025
…protocol specification (apache#20614)

This patch introduces a clear separation between nullable and
non-nullable data structures. The key changes include:

1. Differentiates between nullable and non-nullable versions of
`RECORDS`, `COMPACT_RECORDS`, and `Schema` types.
2. Adds explicit nullable type names for `ArrayOf` and `CompactArrayOf`.
3. Introduces a new, concise syntax for representing types:
   - `{}` for struct, `?{}` for nullable struct
   - `[T]` for array, `?[T]` for nullable array
   - `(T)` for compact array, `?(T)` for nullable compact array
4. Declares shared schemas as non-nullable `Schema` by default. A field
that references a shared schema and is nullable must be explicitly
declared as a new `NullableSchema(X)`.
5. Add UTs to verify the consistency between schema and message
serialization.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
shashankhs11 pushed a commit to shashankhs11/kafka that referenced this pull request Dec 15, 2025
…protocol specification (apache#20614)

This patch introduces a clear separation between nullable and
non-nullable data structures. The key changes include:

1. Differentiates between nullable and non-nullable versions of
`RECORDS`, `COMPACT_RECORDS`, and `Schema` types.
2. Adds explicit nullable type names for `ArrayOf` and `CompactArrayOf`.
3. Introduces a new, concise syntax for representing types:
   - `{}` for struct, `?{}` for nullable struct
   - `[T]` for array, `?[T]` for nullable array
   - `(T)` for compact array, `?(T)` for nullable compact array
4. Declares shared schemas as non-nullable `Schema` by default. A field
that references a shared schema and is nullable must be explicitly
declared as a new `NullableSchema(X)`.
5. Add UTs to verify the consistency between schema and message
serialization.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants