KAFKA-9629 Use generated protocol for Fetch API#9008
Conversation
|
Thanks for the PR. Since this affects the fetch path, let's make sure we benchmark this. cc @lbradstreet |
|
I agree that some benchmarks would be useful. One of the key differences is how the |
|
I agree that it’d be great to have a benchmark on both the request and response side. |
| this.responseData = responseData; | ||
| this.throttleTimeMs = throttleTimeMs; | ||
| this.sessionId = sessionId; | ||
| this.fetchResponseData = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); |
There was a problem hiding this comment.
Probably better to save for a follow-up, but potentially we can get rid of this conversion by using FetchablePartitionResponse directly in the broker.
| field.camelCaseName(), field.camelCaseName()); | ||
| } | ||
| } else if (field.type().isRecords()) { | ||
| // TODO is this valid for record instances? |
There was a problem hiding this comment.
I don't think FileRecords and MemoryRecords instances can be compared directly, if that's what the question is about.
There was a problem hiding this comment.
No I don't think they are designed to be compared. My main question was whether we can compare the same type (MemoryRecords to MemoryRecords). I think it should work in the case of Objects.equals since it first checks if the instances are the same. I don't think we have any use cases where we have equivalent instances of records that are actual separate objects.
I have a similar question about hashCode down below. Records doesn't implement either of these, but we have to include them for all fields in the generated message classes for completeness. I think it's probably fine.
@cmccabe, any insight here?
There was a problem hiding this comment.
The hashCode of MemoryRecords takes into account the buffer position, so it's kind of useless. FileRecords doesn't even define it. We should consider defining the hashCode and equals of Records to be identity based.
There was a problem hiding this comment.
@mumrah : equality for the generated messages should mean bytewise equality. So if two FetchResponseData instances contain the same data, they should be equal, even if one is using MemoryRecords and the other is using FileRecords. Same for hashCode, of course.
If it's too much trouble to change the Records class, you can just write a static utility method in MessageUtils and invoke it from the generated classes. I expect that we won't be doing this kind of comparison except in tests, so you don't need to optimize the method too much.
There was a problem hiding this comment.
That would mean loading data from disk to compute equals and hashCode for FileRecords. That's pretty unusual for such methods.
| Struct responseBodyStruct = toStruct(apiVersion); | ||
| // Generate the Sends for the response fields and records | ||
| ArrayDeque<Send> sends = new ArrayDeque<>(); | ||
| RecordsWriter writer = new RecordsWriter(dest, sends::add); |
There was a problem hiding this comment.
Pretty nice if this is all the manual code we need. If we wanted to go a little further, we could push toSend into the generated class as well. That will be necessary if we ever want to get of the current AbstractRequest and AbstractResponse types and replace them with the generated data classes (which was always the plan). However, I think this could be left for follow-up work.
| result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()), | ||
| new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(), | ||
| fetchPartition.partitionMaxBytes(), leaderEpoch)); |
There was a problem hiding this comment.
@mumrah Have we considered dropping the PartitionData class entirely in favour of using FetchRequestData .FetchPartition directly in the broker? The main difference is that FetchPartition does not have an Optional for the leader epoch but returns the default value (-1) instead.
There was a problem hiding this comment.
Yes, I think it's a good idea. However, it would expand the scope of this change quite a bit. I'm working on some micro benchmarks now, and if we don't have any apparent regressions then I'll save this for a follow-on PR.
There was a problem hiding this comment.
As an aside, it would be awesome to add Optional support to the generated classes. We have had so many bugs which were caused by sentinel values sneaking into unexpected places.
There was a problem hiding this comment.
Let's open a jira for getting rid of the toPartitionDataMap if we don't address it in this PR. It's a pretty large part of the cost here and there are only a few places we would have to deal with it. I think we should fix it sooner rather than later too.
There was a problem hiding this comment.
Yeah, Optional support would be awesome. I was actually thinking how to do it. I may give it a shot during the weekend ;)
There was a problem hiding this comment.
@hachikuji @mumrah @cmccabe I have put together a prototype to support java.util.Optional in the auto-generated classes. It a good draft at the moment but it is a good basis for discussions: #9085
|
Added some basic jmh benchmarks. Here are the preliminary results (run on my laptop, so take with a grain of salt). All these tests are using 1000 topics with 20 partitions each. For FetchResponse, I used static MemoryRecords rather than FileRecords to try and better isolate the serialization time. On On this branch: As we expected quite a bit more time is spent during the construction of FetchRequest/FetchResponse due to conversion to existing data structures. We also see a reducing in serialization time since we no longer convert to FetchRequest total construction+serialization time is about the same before and after the change, and FetchResponse total time is slightly less after the change. |
Nice improvement! Could you please rerun them both with |
| } | ||
|
|
||
| this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100); | ||
| this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion()); |
There was a problem hiding this comment.
Can we please have benchmarks for both forConsumer and forReplica fetch requests?
There was a problem hiding this comment.
Can you also try rerunning the benchmark with random topic names, e.g. UUID.randomUUID().toString() and compare it to the existing topic names? I think our hashCode implementation sucks and we are seeing a lot of collisions.
There was a problem hiding this comment.
Changing our hashCode method massively improves the benchmark times so I think the current benchmark results aren't really representative.
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -46,10 +46,7 @@ public final class TopicPartition implements Serializable {
public int hashCode() {
if (hash != 0)
return hash;
- final int prime = 31;
- int result = 1;
- result = prime * result + partition;
- result = prime * result + Objects.hashCode(topic);
+ int result = Objects.hash(topic, partition);
this.hash = result;
return result;
}
Edit: it looks like the main difference here is ordering by topic and then partition which seems to avoid the collisions for this reasonably pathological case. Maybe we can just change the test case.
|
Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included. On this branch: On trunk: |
Nice, so roughly for the replica fetch: 57% reduction in CPU time. Alloc rate normalized comparison: 24.18% reduction in garbage generation. I think the garbage generation will massively improve once we can get rid of toPartitionDataMap later. |
|
|
||
| if (partition.preferredReadReplica.isPresent()) { | ||
| subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica.get(), () -> { | ||
| if (partition.preferredReadReplica().isPresent()) { |
There was a problem hiding this comment.
nit: could probably change this to use ifPresent
| * classes, return `null`. | ||
| * @return | ||
| */ | ||
| default ApiMessage data() { |
There was a problem hiding this comment.
Is there an advantage to pulling this up? Seems like we still need to update a bunch more classes. Until we have all the protocols converted, it might be safer to find another approach.
There was a problem hiding this comment.
I have a PR that does need. I really need to get that over the line.
There was a problem hiding this comment.
Perhaps instead we could add this to a mixin type. Then if we find cases where getting accessing to the ApiMessage generally would be useful, we could just use instanceof checks. These would ultimately go away after the conversions are finished.
| } | ||
|
|
||
| @Override | ||
| public ByteBuffer serialize(RequestHeader header) { |
There was a problem hiding this comment.
Are we overriding this so that we save the conversion to Struct? As far as I can tell, there's nothing specific to FetchRequest below. I wonder if we can move this implementation to AbstractRequest.serialize so that we save the conversion to Struct for all APIs that have been converted?
There was a problem hiding this comment.
Indeed this is generic serialization code for the message classes. If we go with a mixin interface to indicate a class has been converted over to generated messages, we could also push this up to AbstractRequest. However, this might be better saved for a follow-on since we'll probably want to pick up additional changes from @ijuma's PR. Thoughts?
|
|
||
| public static FetchRequest parse(ByteBuffer buffer, short version) { | ||
| return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version); | ||
| ByteBufferAccessor accessor = new ByteBufferAccessor(buffer); |
There was a problem hiding this comment.
In the parsing logic, we still convert to struct first before calling AbstractRequest.parseRequest. I think we could bypass the Struct conversion by changing AbstractRequest.parseRequest to take the ByteBuffer instead of the Struct.
public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, ByteBuffer buffer) {Then in the fetch case, we could just call this method.
There was a problem hiding this comment.
I believe this is also addressed in @ijuma's PR.
| */ | ||
| public void flush() { | ||
| ByteBufferSend send = new ByteBufferSend(dest, | ||
| ByteBuffer.wrap(byteArrayOutputStream.toByteArray())); |
There was a problem hiding this comment.
This creates a copy of the underlying bytes, can we avoid it?
There was a problem hiding this comment.
Yea, it's possible, but rather complicated I think. We would need to manage our own byte array and grow it on-demand (like what happens in ByteArrayOutputStream). Then we could use ByteBuffer#slice to pass views of this array to the ByteBufferSend objects. I don't think this current approach is any worse than before in terms of array allocations, so maybe we could save this for a future optimization?
There was a problem hiding this comment.
Would org.apache.kafka.common.utils.ByteBufferOutputStream be useful here?
There was a problem hiding this comment.
Looks like the expansion factor for ByteArrayOutputStream varies on the JDK version. In JDK 8 and 11 it's 2x, but in JDK 14 it just grows the buffer to the minimum needed size.
Our growth factor of 1.1 in ByteBufferOutputStream seems reasonable . Not to mention avoiding the final copy by using slice would be nice too.
There was a problem hiding this comment.
@mumrah Thanks for checking this. However, the behavior in JDK 14 has not changed in that way. Performance would be atrocious if it did:
private void ensureCapacity(int minCapacity) {
// overflow-conscious code
int oldCapacity = buf.length;
int minGrowth = minCapacity - oldCapacity;
if (minGrowth > 0) {
buf = Arrays.copyOf(buf, ArraysSupport.newLength(oldCapacity,
minGrowth, oldCapacity /* preferred growth */));
}The third parameter passed to newLength is the preferred growth, which is oldCapacity. That is, it doubles if it doesn't cause overflow. We should probably double for ByteBufferOutputStream too if we have no estimate of the expected size. 1.1 growth makes sense if we do have a reasonable estimate (which is the case in current usage, I believe, but perhaps not in this case).
There was a problem hiding this comment.
Looking a bit more, it seems like this will be mostly used by the data that precedes the actual records. Do we have a sense for what's the typical size for that? If we do, we can use that in the initial size and we can keep the 1.1 growth.
There was a problem hiding this comment.
Thanks for the explanation, @ijuma. I missed the semantics of newLength
After the initial few top-level fields, each partition will have something like 38 bytes preceding its records (at a minimum, aborted transactions could increase that). Maybe we could increase initial capacity to 64 bytes?
There was a problem hiding this comment.
I increased the initial buffer size to 64 and also added 2x growth factor for the buffer. It occurred to me the initial size only really helps for the first partition's header fields, but beyond that (since we are reusing/growing the same ByteBufferOutputStream) we don't know what we'll need. The JMH benchmark did confirm that 2x was more performant than 1.1x for FetchResponse.
Existing usages of ByteBufferOutputStream were not modified and still use 1.1x
|
Recent test failures are due to removal of the static |
|
retest this please |
|
Thanks for this, @mumrah. I took a look at the overall approach with the Do we need I do think It seems like Using What you really want is to get rid of the ByteBufferOutputStream and just manage the buffer yourself here. Then, when you need to enlarge, you can just copy the data that's live and not the old, already flushed data. The above could be done in a follow-on if you want. I don't think it should block the merge |
|
Thanks @cmccabe, great feedback. I've updated RecordsWriter to allocate a single ByteBuffer based on a pre-calculated length (total message size - all records size). This avoids the buffer resizing altogether. I like your suggestions for Writable#close and moving readRecords into ByteBufferAccessor. I'll save these for a follow-on |
|
Latest FetchResponse benchmark trunk this branch So a pretty good reduction, overall. |
| { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true, | ||
| "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, | ||
| { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false, | ||
| { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true, |
There was a problem hiding this comment.
I guess the implicit expectation is that if the protocol does not support the read_committed isolation level, then it wouldn't have transactional data anyway, so reverting to read_uncommitted is safe. Can't find a fault with that.
There was a problem hiding this comment.
I changed this to make the JSON schema match what was previously in FetchRequest.java. During serialization, we would simply stick the isolation level in the Struct regardless of the api version:
struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id());So even if we were writing out a v3 FetchRequest, whatever value we put here would be ignored and not sent out. There were also some unit tests that utilized this behavior.
Your assessment sounds correct though, so it probably doesn't matter whether it's ignorable or not.
| } | ||
|
|
||
| @Override | ||
| public ByteBuffer readByteBuffer(int length) { |
There was a problem hiding this comment.
More of a side question, but is this length guaranteed to be less than the buffer size? Wondering if it is worth adding range checking.
There was a problem hiding this comment.
This is copied straight from ByteBufferAccessor and will probably go away in a follow-on PR. But either way, looking at it it seems it should always be in range since this is used by zero-copy byte fields in the message classes, e.g.
int len = _reader.readInt();
if (len > 0) {
this.someZeroCopyField = _reader.readByteBuffer(len);
}So generally it's probably safe. In the case of a corrupt message where the length is wrong, ByteBuffer#limit will throw an error and parsing will fail. It probably would be nice to put a range check in ByteBufferAccessor so we can throw a more useful error.
| return new ProduceRequest(struct, apiVersion); | ||
| case FETCH: | ||
| return new FetchRequest(struct, apiVersion); | ||
| return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion); |
There was a problem hiding this comment.
nit: any reason not to stick with the same constructor convention as the other requests?
There was a problem hiding this comment.
I just wanted to remove the Struct constructor of FetchRequest completely. Eventually, RequestContext#parseRequest(ByteBuffer) will stop using Structs and pass the message data classes to AbstractRequest#parseRequest (or similar).
| * classes, return `null`. | ||
| * @return | ||
| */ | ||
| default ApiMessage data() { |
|
|
||
| RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add); | ||
| data.write(writer, cache, apiVersion); | ||
| writer.flush(); |
There was a problem hiding this comment.
nit: not a big deal, but I feel like calling flush should really be the responsibility of write.
There was a problem hiding this comment.
Yea, I agree. @cmccabe had a suggestion about adding Writable#close which would achieve the same goal. I think this would be nice and clean things up a bit. I'll open a follow up PR for this
| ResponseHeaderData responseHeaderData = responseHeader.data(); | ||
|
|
||
| int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion()); | ||
| int bodySize = (int) sends.stream().mapToLong(Send::size).sum(); |
There was a problem hiding this comment.
Instead of the cast, could we add a validation check?
There was a problem hiding this comment.
Do you mean something like Math.toIntExact?
| { "name": "FirstOffset", "type": "int64", "versions": "4+", | ||
| "about": "The first offset in the aborted transaction." } | ||
| ]}, | ||
| { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": true, |
There was a problem hiding this comment.
I'm wondering if this should be ignorable. When this is set, the leader returns no data, so it relies crucially on the follower redirecting.
There was a problem hiding this comment.
I see what you mean. If we have a bug that causes us to hit the preferred replica code for an older api version, we should fail to serialize the message rather than sending it to a client that doesn't understand follower redirection.
Good catch.
|
I ran the consumer perf test (at @hachikuji's suggestion) and took a profile. Throughput was around 500MB/s on trunk and on this branch Zoomed in a bit on the records part: This was with only a handful of partitions on a single broker (on my laptop), but it confirms that the new FetchResponse serialization is hitting the same sendfile path as the previous code. |
|
What were the throughput numbers? I assume you meant the connsumer perf test, not console consumer. |
|
@ijuma you're right, i meant the consumer perf test. I updated my comment to clarify |
hachikuji
left a comment
There was a problem hiding this comment.
LGTM. Great work on this patch!
|
retest this please |
2 similar comments
|
retest this please |
|
retest this please |
|
These test failures are known flaky tests which already have jira tickets |


This change makes use of the generated protocols for FetchRequest and FetchResponse. The main challenge here was how to allow the transferrable bytes of the record set to be directly sent to the outgoing response without copying into a buffer.
The proposed solution is similar to the existing multi-send object used in FetchResponse. However, a new writer class RecordsWriter was introduced to allow interleaving of ByteBufferSend (for headers and other non-record fields) along with RecordsSend-s which implement the efficient byte transfer.
Another change introduced here is that FetchRequest and FetchResponse do not maintain their own copies of the fields from the message. Instead, they hold a reference to the generated message class (FetchRequestData and FetchResponseData). Read-only copies of different forms of the message data are created once open construction to allow for efficient access using the existing class methods.
For example, in FetchRequest we hold the FetchRequestData, but also compute and hold:
And in FetchResponse, we similarly hold:
If we want, we could deprecate all the accessors on FetchRequest/FetchResponse and force callers to use the
#data()method. This would eliminate the need for these additional data structures.Finally, most of the other changes are fixing up tests that were actually using invalid default values for protocol messages (which are now enforced, thanks to the generated classes) as well as rectifying the JSON schema to match what the actual defined
Schemas were (e.g., FETCH_RESPONSE_V11)