KAFKA-8885: The Kafka Protocol should Support Optional Tagged Fields #7325

Merged
hachikuji merged 19 commits into apache:trunk from cmccabe:KIP-482-III
Oct 7, 2019

@cmccabe
Contributor

@cmccabe cmccabe commented Sep 11, 2019

No description provided.

@cmccabe cmccabe mentioned this pull request Sep 11, 2019
@cmccabe cmccabe force-pushed the KIP-482-III branch 9 times, most recently from 0616758 to ef30342 on September 13, 2019 22:20
twmb added a commit to twmb/franz-go that referenced this pull request Sep 22, 2019
WithClientID was annoying by having to specify a string pointer; instead
this adds a second opt to nil the client id if necessary. Doing so is
likely highly uncommon.

Adds controlled shutdown v0 correct encoding; noticed when scanning
apache/kafka#7325.
Comment thread clients/src/main/resources/common/message/README.md
* "string": a string. Strings are serialized as a length followed by the
contents as UTF-8. The contents must be less than 64kb in size. In
non-flexible versions, the string length will always be 2 bytes. In flexible
versions, the length is a variable-length integer.

KIP-482 indicates that flexible versions will actually use unsigned variable length numbers, offset by 1 to reserve 0 indicating null. If that's the case, this should change from variable-length integer, which implies the old zig-zag varint, to unsigned variable-length integer offset by 1, with 0 indicating null (or something like that).

Also, it might be worth mentioning that the numbers are always offset by one, even if the field is non-nullable.
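The encoding described in this comment can be sketched as follows. This is a minimal illustration, not Kafka's actual serializer: the class and method names are hypothetical, and it assumes the scheme exactly as the comment states it (an unsigned varint holding the UTF-8 length plus one, with 0 reserved for null, applied whether or not the field is nullable).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a "compact string": unsigned varint of (length + 1), 0 means null.
final class CompactStringSketch {
    // Unsigned varint: 7 payload bits per byte, high bit set on every byte except the last.
    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
        return value | (b << shift);
    }

    static void writeCompactString(String s, ByteBuffer buf) {
        if (s == null) {
            writeUnsignedVarint(0, buf); // 0 is reserved for null
            return;
        }
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeUnsignedVarint(utf8.length + 1, buf); // always offset by one
        buf.put(utf8);
    }

    static String readCompactString(ByteBuffer buf) {
        int lengthPlusOne = readUnsignedVarint(buf);
        if (lengthPlusOne == 0) {
            return null;
        }
        byte[] utf8 = new byte[lengthPlusOne - 1];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        writeCompactString("abc", buf);
        writeCompactString("", buf);
        writeCompactString(null, buf);
        buf.flip();
        System.out.println(readCompactString(buf));
        System.out.println(readCompactString(buf).isEmpty());
        System.out.println(readCompactString(buf));
    }
}
```

Note that the empty string and null stay distinguishable: the empty string is written as the single byte 1, null as the single byte 0.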

Contributor Author

I'm going to split the documentation part off into a separate PR, since there are some tricky questions about what should go in what documentation section. Let's discuss it there.

@cmccabe
Contributor Author

cmccabe commented Sep 23, 2019

Hi all, I have split this PR up into several other PRs to be more reviewable, including #7372, #7344, #7340, and more to come. I'm going to leave this one up for now so that people can see the bigger context of some of the changes, however. Maybe I will rebase this one and use it for the last part once the other ones are in. Thanks.

Member

@dajac dajac left a comment

Thanks for the PR, @cmccabe. Overall, it looks good to me. I have left a few minor comments.

Comment thread generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java Outdated
//
// Version 3 is the first flexible version.
"validVersions": "0-3",
"flexibleVersions": "3+",
Member

Leaving the comment here because I haven't found a better place. It would be great if you could add tests in RequestResponseTest#testSerialization to cover all the versions which have been bumped.

Contributor Author

Hmm. We don't generally do exhaustive testing of all versions in RequestResponseTest. There would be a lot of entries! But I agree we should add more coverage in RequestResponseTest. Let's think about it in a follow-on PR.

Contributor

+1 for additional tests. Testing every version is not crazy. We have been bitten a few times already due to untested version bumps.

Comment thread generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java Outdated
Member

@ijuma ijuma left a comment

Thanks for the PR. I took an initial pass and left some high level questions and a couple of nits.

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/ObjectSizeCache.java Outdated
Comment thread clients/src/main/resources/common/message/README.md
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/ObjectSizeCache.java Outdated
default List<RawTaggedField> readRawTaggedField(List<RawTaggedField> unknowns, int tag, int size) {
if (unknowns == null) {
unknowns = new ArrayList<>();
}
Member

Are we doing this to avoid the allocation of unknowns unless there is at least one unknown?

Contributor Author

yes
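The lazy-allocation pattern confirmed here can be sketched in isolation. This is a simplified illustration only: `RawField` is a hypothetical stand-in for `RawTaggedField`, and `addUnknown` is not the signature used in the PR. The caller keeps a null reference until the first unknown field arrives, so messages without unknown tagged fields never allocate a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: allocate the "unknowns" list only when the first unknown field is seen.
final class LazyUnknownsSketch {
    // Stand-in for RawTaggedField: a tag number plus its undecoded bytes.
    static final class RawField {
        final int tag;
        final byte[] data;

        RawField(int tag, byte[] data) {
            this.tag = tag;
            this.data = data;
        }
    }

    // Returns the list to keep using; callers pass null until something has been added.
    static List<RawField> addUnknown(List<RawField> unknowns, int tag, byte[] data) {
        if (unknowns == null) {
            unknowns = new ArrayList<>();
        }
        unknowns.add(new RawField(tag, data));
        return unknowns;
    }

    public static void main(String[] args) {
        List<RawField> unknowns = null; // nothing allocated yet
        unknowns = addUnknown(unknowns, 5, new byte[] {1, 2});
        unknowns = addUnknown(unknowns, 9, new byte[] {3});
        System.out.println(unknowns.size());
    }
}
```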

}

@Test
public void testInvalidFieldName() {
Member

It would be helpful to indicate what is invalid about the name. Is it the underscore at the start?

Contributor Author

I added some JavaDoc

" { \"name\": \"_badName\", \"type\": \"[]int32\", \"versions\": \"0+\" }",
" ]",
"}")), MessageSpec.class);
fail("Expected MessageDataGenerator constructor to fail");
Member

I would suggest using assertThrows in this and other tests added in this PR that validate that an exception is thrown.
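For readers unfamiliar with the pattern being suggested, here is a dependency-free sketch of what an `assertThrows`-style check replaces. The local `assertThrows` helper below is a stand-in written for this example, not the JUnit method, and `validateFieldName` is hypothetical: its rule (reject a leading underscore) is only a guess taken from the reviewer's question earlier in this thread, not something the PR confirms.

```java
// Hypothetical sketch: try/fail/catch versus an assertThrows-style helper.
final class AssertThrowsSketch {
    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    // Local stand-in for a test framework's assertThrows: returns the exception if it matches.
    static <T extends Throwable> T assertThrows(Class<T> expected, ThrowingRunnable body) {
        try {
            body.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Expected " + expected.getName() + " but got " + t, t);
        }
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }

    // Hypothetical validator; the leading-underscore rule is an assumption for illustration.
    static void validateFieldName(String name) {
        if (name.startsWith("_")) {
            throw new IllegalArgumentException("Invalid field name: " + name);
        }
    }

    public static void main(String[] args) {
        // Old style: the expectation is spread across try, fail and catch.
        try {
            validateFieldName("_badName");
            throw new AssertionError("Expected validateFieldName to fail");
        } catch (IllegalArgumentException e) {
            // expected
        }

        // assertThrows style: one expression, and the exception is available for further checks.
        IllegalArgumentException e =
            assertThrows(IllegalArgumentException.class, () -> validateFieldName("_badName"));
        System.out.println(e.getMessage());
    }
}
```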

* A compact array represents its length with a varint rather than a
* fixed-length field.
*/
public class CompactArrayOf extends DocumentedType {
Member

We also talked about arrays of primitives having a compact representation where tags are not needed per element. How would we describe such arrays, are they packed arrays versus compact arrays?

Contributor Author

They could be either ArrayOf or CompactArrayOf. We don't have a separate array type for arrays of objects vs. arrays of non-object types.

@cmccabe
Contributor Author

cmccabe commented Oct 3, 2019

  • I have split the changes to message/README.md into a follow-on PR.

  • I rebased on trunk and fixed a few minor conflicts.

  • Split headerVersion into requestHeaderVersion and responseHeaderVersion.

@ijuma
Member

ijuma commented Oct 3, 2019

There are some compiler errors after the latest updates.

@cmccabe
Contributor Author

cmccabe commented Oct 3, 2019

The compiler errors should be fixed now.

Contributor

@hachikuji hachikuji left a comment

Thanks, looking good overall. I left a few comments.

Contributor

Seems a bit messy to support different value types in the same map. Are we saving that much by not having separate maps?

Contributor Author

The memory overhead of having a separate map would be pretty large in the common case where objects are small.

Contributor

I couldn't find any uses for this code in any of the generated classes. Do we have test cases which exercise this logic?

Contributor Author

The test message file SimpleExampleMessage.json contains a tagged array, which will use this logic. I will add a test that uses that field.

Contributor

Maybe readUnknownTaggedField?

Contributor

nit: use vararg constructor. A couple below as well

Contributor Author

ok

Contributor

nit: would be nice to document the expected type of fields

Contributor

Could probably use Collections.emptyList() here

Contributor Author

The problem is that this field is mutable, and Collections.emptyList() returns an immutable list.
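The point about immutability is easy to check in isolation. The sketch below uses only standard library calls; the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Demonstrates why Collections.emptyList() cannot back a field that is later mutated.
final class EmptyListSketch {
    // Returns true if the list accepts an element, false if it rejects mutation.
    static boolean canAdd(List<Integer> list) {
        try {
            list.add(1);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canAdd(Collections.emptyList())); // prints false
        System.out.println(canAdd(new ArrayList<>()));       // prints true
    }
}
```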

Contributor

This is nifty

Contributor

I wonder if you have given any thought to limiting allocations like this. For example, in the case of the byte array, we may be able to validate the size using the available bytes in the request

Contributor Author

I do think we're kind of goofy to allow arrays with 2**31 elements. There must be a reasonable maximum we could set lower than that. But there will probably be some compatibility implications to this, so it will take time to impose a reasonable limit now....
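The validation idea raised in this exchange, checking a declared size against the bytes actually available before allocating, might look like the sketch below. It is illustrative only: the names are hypothetical, it uses a fixed 4-byte length prefix for simplicity, and it is not what the PR implements.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: refuse to allocate more than the request could possibly contain.
final class BoundedReadSketch {
    static byte[] readBytes(ByteBuffer buf) {
        int length = buf.getInt();
        // A declared length larger than what is left in the buffer can never be satisfied,
        // so reject it before allocating anything.
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException(
                "Declared length " + length + " exceeds the " + buf.remaining() + " bytes remaining");
        }
        byte[] out = new byte[length];
        buf.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer bad = ByteBuffer.allocate(4);
        bad.putInt(Integer.MAX_VALUE); // claims ~2 GiB but carries no payload
        bad.flip();
        try {
            readBytes(bad);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The same bound extends to arrays of fixed-size elements: an `[]int32` with a declared count of n needs at least 4n remaining bytes, so a count above `remaining / 4` can be rejected up front.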

Contributor

Not a big deal, but there are a few cases where we could use a null check of _taggedField instead of a version check. Might make the generated code a little more readable.

Contributor Author

It would be a bit complex to change now, since we're also filtering out versions that aren't present at all, and so on.

Contributor

I think there's a bug in the handling of nullable arrays when the default is not null. For example, consider the following field:

    { "name": "field2", "type": "[]BlahType",
      "versions": "1+", "taggedVersions": "1+", "tag": 1,
      "nullableVersions": "1+",
      "fields": [
          { "name": "wootId", "versions": "1+", "type": "int32" }
      ]
    }

This results in the following code:

        if (_version >= 1) {
            if (!field2.isEmpty()) {
                if (field2 == null) {
                    _taggedFields.put(1, null);
                } else {
                    Struct[] _nestedObjects = new Struct[field2.size()];
                    int i = 0;
                    for (BlahType element : this.field2) {
                        _nestedObjects[i++] = element.toStruct(_version);
                    }
                    _taggedFields.put(1, _nestedObjects);
                }
            }
        }

The null check should come first. Seems like the default value optimization needs to take into account nullable values. The same bug affects size.

In general, we probably need more testing, especially for default value handling.
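The ordering problem described above can be reproduced outside the generator. The sketch below is one possible shape for the corrected check, not the fix that was eventually merged: names are hypothetical, a `List<Integer>` stands in for the nested struct list, and it assumes the field's default is the empty list, so null is a non-default value that must be written.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: test for null before touching the value, then skip the default.
final class NullableTaggedArraySketch {
    static void putTaggedArray(Map<Integer, Object> taggedFields, int tag, List<Integer> value) {
        if (value == null) {
            // Null differs from the (assumed) empty default, so it has to be serialized.
            taggedFields.put(tag, null);
        } else if (!value.isEmpty()) {
            taggedFields.put(tag, value.toArray(new Integer[0]));
        }
        // An empty list equals the default and is omitted entirely.
    }

    public static void main(String[] args) {
        Map<Integer, Object> tagged = new TreeMap<>();
        putTaggedArray(tagged, 1, null);              // no NullPointerException
        putTaggedArray(tagged, 2, Arrays.asList());   // default, omitted
        putTaggedArray(tagged, 3, Arrays.asList(42)); // written
        System.out.println(tagged.keySet());
    }
}
```

With the original ordering, calling `isEmpty()` on a null list throws before the null branch is ever reached, which is why the null test has to come first.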

Contributor Author

Thanks for finding this. It might be better to address it in a follow-on, since the fix could get complicated. I'll push what I have for now.

* Fix code generation for tagged array fields

* Rename TestUUID to SimpleExampleMessage and add some tests for tagged fields there.

* Fix a bug in generating the code for tagged array fields
@cmccabe
Contributor Author

cmccabe commented Oct 4, 2019

  • Responded to all comments
  • I rebased on trunk to catch up with the KIP-511 changes.
  • Fixed the bugs in ApiMessageTypeTest and ApiMessageType.
  • Fixed some other miscellaneous issues with ApiVersionsRequest.

I filed some follow-on JIRAs:

  • KAFKA-8984: Improve tagged fields documentation
  • KAFKA-8985: Use flexibleVersions with LeaderAndIsr, and improve RequestResponseTest coverage
  • KAFKA-8986: Allow null as a valid default for tagged fields

@hachikuji
Contributor

Still at least one test failure. This one is reproducible locally:

12:22:00 kafka.server.ApiVersionsRequestTest > testApiVersionsRequest STARTED
12:22:06 kafka.server.ApiVersionsRequestTest.testApiVersionsRequest failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/core/build/reports/testOutput/kafka.server.ApiVersionsRequestTest.testApiVersionsRequest.test.stdout
12:22:06 
12:22:06 kafka.server.ApiVersionsRequestTest > testApiVersionsRequest FAILED
12:22:06     java.lang.OutOfMemoryError: Java heap space
12:22:06         at org.apache.kafka.common.utils.ImplicitLinkedHashCollection.clear(ImplicitLinkedHashCollection.java:566)
12:22:06         at org.apache.kafka.common.utils.ImplicitLinkedHashCollection.<init>(ImplicitLinkedHashCollection.java:530)
12:22:06         at org.apache.kafka.common.utils.ImplicitLinkedHashMultiCollection.<init>(ImplicitLinkedHashMultiCollection.java:52)
12:22:06         at org.apache.kafka.common.message.ApiVersionsResponseData$ApiVersionsResponseKeyCollection.<init>(ApiVersionsResponseData.java:615)
12:22:06         at org.apache.kafka.common.message.ApiVersionsResponseData.read(ApiVersionsResponseData.java:137)
12:22:06         at org.apache.kafka.common.message.ApiVersionsResponseData.<init>(ApiVersionsResponseData.java:87)
12:22:06         at org.apache.kafka.common.requests.ApiVersionsResponse.parse(ApiVersionsResponse.java:88)
12:22:06         at kafka.server.ApiVersionsRequestTest.sendApiVersionsRequest(ApiVersionsRequestTest.scala:81)
12:22:06         at kafka.server.ApiVersionsRequestTest.testApiVersionsRequest(ApiVersionsRequestTest.scala:48)

@ijuma
Member

ijuma commented Oct 6, 2019

retest this please

@ijuma
Member

ijuma commented Oct 7, 2019

Failures are all flakes.

@ijuma
Member

ijuma commented Oct 7, 2019

retest this please

@ijuma
Member

ijuma commented Oct 7, 2019

Tests passed locally:

BUILD SUCCESSFUL in 42m 58s
150 actionable tasks: 139 executed, 11 up-to-date

@hachikuji
Contributor

I will go ahead and merge. The failing tests are known to be flaky prior to this patch.

@hachikuji hachikuji merged commit 0de61a4 into apache:trunk Oct 7, 2019
@cmccabe
Contributor Author

cmccabe commented Oct 7, 2019

Thanks, @ijuma and @hachikuji! And thanks to all the other reviewers who helped with this.

@twmb

twmb commented Oct 8, 2019

Edit: concern retracted; after consideration, I think that having tags on every struct level is fine.

@cmccabe
Contributor Author

cmccabe commented Oct 9, 2019

Hi @twmb, thanks for looking at this. As you probably figured out (I see you edited your comment a bit), it's important to allow tagged fields to be added without a version bump. Otherwise we don't get a lot of the benefits of a flexible schema. This does require an extra byte per struct. There was a lot of discussion about this on the mailing list. The discussion period was actually much longer than the implementation period and definitely was not done at the last minute. I looked for alternate solutions that didn't require the extra byte, but they were all very awkward and complex.

To counteract the extra space taken, we implemented more efficient serialization for strings, bytes, and arrays. In the common case where these fields are small, we save between 1 and 3 bytes per object. So if the objects in your hypothetical array of objects contain any of these things, the overhead is already cancelled out.

I agree that it is annoying that an []int32 is now different from []MySingleMemberStruct which contains an int32 each. The issue here is that there are a bunch of places where we pass around a short list of int32s to represent nodes, and it seemed excessive to add a byte of overhead to each. Another issue is that there were optimizations for calculating the message size based on the fact that each entry was the same length. I wanted to keep those optimizations. So therefore, although it was a judgement call, I proposed a design where arrays of primitives could be serialized in the old, simpler way.
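The size trade-off discussed in the last two paragraphs can be made concrete with a little arithmetic. The sketch below is a back-of-the-envelope model with hypothetical names, not Kafka's size calculation. It assumes that a compact array is prefixed by an unsigned varint of the element count plus one, that the older encoding uses a 4-byte count, and that each struct in a flexible version carries one extra byte for an empty tagged-field section.

```java
// Hypothetical size model for []int32 versus an array of single-int32 structs.
final class FlexibleSizeSketch {
    // Number of bytes an unsigned varint needs for the given value.
    static int unsignedVarintSize(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    // Older encoding: 4-byte count, then 4 bytes per element.
    static int legacyInt32ArraySize(int n) {
        return 4 + 4 * n;
    }

    // Flexible encoding of a primitive array: varint(n + 1), then 4 bytes per element.
    // Every element has the same length, so the size is computed without iterating.
    static int compactInt32ArraySize(int n) {
        return unsignedVarintSize(n + 1) + 4 * n;
    }

    // Flexible encoding of structs holding one int32 each: every element also carries
    // one byte for its (empty) tagged-field section.
    static int compactSingleInt32StructArraySize(int n) {
        return unsignedVarintSize(n + 1) + n * (4 + 1);
    }

    public static void main(String[] args) {
        int n = 3;
        System.out.println(legacyInt32ArraySize(n));              // prints 16
        System.out.println(compactInt32ArraySize(n));             // prints 13
        System.out.println(compactSingleInt32StructArraySize(n)); // prints 16
    }
}
```

Under these assumptions a short list of three node IDs shrinks by 3 bytes as a primitive array, while wrapping each ID in a struct would spend that saving on per-element tag sections.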

I hope this answers all the questions (and potential ones?) :) There is more discussion about this on the mailing list if you want to go in depth. I always appreciate feedback and I made a point of pulling in some Kafka client authors before this was finalized.

5 participants