KAFKA-14156: Built-in partitioner may create suboptimal batches #12570
junrao merged 6 commits into apache:trunk
Conversation
junrao
left a comment
@artemlivshits : Thanks for the PR. LGTM. Are the test failures related?
@junrao -- I looked through all the failed tests; they don't seem to use this code path, and I re-ran them locally, all succeeded.
@artemlivshits : Thanks for the reply. Could you share some perf results with this PR? It would be useful to test for both 0 and non-zero linger. |
Here, we are specifically handling the case with lingerMs > 0. I am wondering if the same issue described in the jira could occur with lingerMs equal to 0. With lingerMs = 0, because of back pressure, the effective batch size is typically between 1 and batch.size. With the built-in partitioner, we could still have the issue with a large batch size followed by a small one.
"Fractional" batches in the built-in partitioner could happen with lingerMs=0 as well, and it was probably this consideration that led to the DefaultPartitioner implementation, which switches partitions on batch boundary. The problem with letting the batch fill based on back pressure (which is pretty much the only driver for batching when linger.ms=0) is that we're likely to form larger batches to a slower broker and exhibit the original problem described in KAFKA-10888. This change carefully avoids any signals that could be based on backpressure and lets the batch fill only if it's not ready because it doesn't meet the readiness criteria itself (full or waited for linger.ms). E.g. we don't let the batch fill if there are other ready batches in the queue -- that means the broker wasn't ready to take a ready batch, i.e. a backpressure signal.
That said, we could introduce some heuristics, e.g. switch partition if the batch becomes ready and we are within 10% (configurable) of sticky limit, so that instead of starting a new batch in the partition that we switch away from soon, we would start a new batch in the new partition where it may have better opportunity to fill. This would reintroduce some of the KAFKA-10888 pattern, but hopefully the 10% would put a bound on that. The downside of this approach is that it would make the logic even more complex to model and configure, and it's a little research project in itself. I was hoping to get some data from real-life usage of built-in partitioner and see if we need to build the heuristics (as a new project) or the current logic provides good default behavior.
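One possible shape of such a heuristic -- purely illustrative, with the method name, parameters, and the 10% default all assumed from the discussion above, not taken from the actual code:

```java
public class SwitchHeuristicSketch {
    // Illustrative only: switch early if the batch just became ready and the
    // partition's produced bytes are within a configurable fraction of the
    // sticky byte limit, so the new batch starts in the next partition.
    static boolean shouldSwitchEarly(long producedBytes, long stickyBatchSize,
                                     boolean batchReady, double nearLimitFraction) {
        if (!batchReady)
            return false;
        long threshold = (long) (stickyBatchSize * (1.0 - nearLimitFraction));
        return producedBytes >= threshold;
    }

    public static void main(String[] args) {
        // 15 KB produced against a 16 KB sticky limit is within 10%, so switch early.
        System.out.println(shouldSwitchEarly(15 * 1024, 16 * 1024, true, 0.10)); // true
        // 10 KB produced is not close enough; keep filling the current partition.
        System.out.println(shouldSwitchEarly(10 * 1024, 16 * 1024, true, 0.10)); // false
    }
}
```

As noted above, even a bounded heuristic like this would reintroduce some of the KAFKA-10888 pattern, which is why it would need data from real-life usage before being pursued.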
Now the built-in partitioner defers partition switch (while still accounting produced bytes) if there is no ready batch to send, thus avoiding switching partitions and creating fractional batches.
Don't change partitions when there is a partial batch at the end. This should prevent the partitioner from interfering with batching, but may re-introduce some slow-broker bias with linger.ms=0.
private boolean allBatchesFull(Deque<ProducerBatch> deque) {
    // Only the last batch may be incomplete, so we just check that.
    ProducerBatch last = deque.peekLast();
    return last == null || last.isFull();
}
That's the only functional change in this commit, the rest is renames, comment updates and unit test fixes.
Would it be better to add the deque.size() > 1 condition back? Intuitively, this indicates back pressure and we should be allowed to switch away from this partition.
Did a few experiments and discussed the results offline; it looks like the condition as it is would be more backward compatible.
    callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
assertEquals(partition2, partition.get());
assertEquals(3, mockRandom.get());
assertEquals(2, mockRandom.get());
The partition switching logic is now changed to switch after the batch is full, so just producing the required amount is not enough.
Hmm, in this case, after the append, the last batch is full, which enables switching. Why isn't the partition switched?
What happens here is the following:
1. First record (small) gets a new partition (because there was none).
2. Second record (large) doesn't fit, so the first record forms a batch (but not enough bytes to switch).
3. Second record (large) creates a new batch, but it's not marked as full (disabling the switch).
4. Third record arrives and doesn't fit into the batch, so the batch is marked as full (completing the switch that was disabled in step 3).

So effectively in step 4 the switch happens before the record is added, rather than after.
Thanks, Artem. Sorry, I still don't fully understand.
After step 3, it seems that we switched to partition 2 after the append since mockRandom is 2, right? That part makes sense to me.
In step 4, we append to a new batch in partition 2. After the append(), it seems that enableSwitch should be true since last.isFull() should be true. Then, in topicInfo.builtInPartitioner.updatePartitionInfo, producedBytes >= stickyBatchSize && enableSwitch should be true, which will trigger partition switching. I am wondering what's missing here?
Switching the partition only when the batch is complete makes it effectively switch before we create a new batch. In the test, step 2 seems to be an outlier: the large record plus the small record exceed batch.size * 2, so even though the switch is disabled at the end of step 2, it still happens. I've increased the batch.size in the test; now all tests switch partition before creating a batch.
junrao
left a comment
@artemlivshits : Thanks for the updated PR. A few more comments.
/**
 * Check if all batches in the queue are full.
 */
private boolean allBatchesFull(Deque<ProducerBatch> deque) {
allBatchesFull => lastBatchFull ?
I think lastBatchFull doesn't include the case of last == null. Alternatively, we could invert the function and name it hasIncompleteBatches.
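The inverted form might look like this -- a sketch using a minimal stand-in for ProducerBatch, since the real class lives in the producer internals:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BatchCheckSketch {
    // Minimal stand-in for ProducerBatch; only the isFull() signal matters here.
    static final class Batch {
        final boolean full;
        Batch(boolean full) { this.full = full; }
        boolean isFull() { return full; }
    }

    // Inverted form of allBatchesFull: true only when the queue's last batch
    // exists and is still accepting records. An empty queue has no incomplete
    // batches, which covers the last == null case from the original check.
    static boolean hasIncompleteBatches(Deque<Batch> deque) {
        Batch last = deque.peekLast();
        return last != null && !last.isFull();
    }

    public static void main(String[] args) {
        Deque<Batch> empty = new ArrayDeque<>();
        System.out.println(hasIncompleteBatches(empty));  // false: no batches at all

        Deque<Batch> open = new ArrayDeque<>();
        open.add(new Batch(true));
        open.add(new Batch(false));
        System.out.println(hasIncompleteBatches(open));   // true: last batch not full
    }
}
```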
// unready batch after the batch that disabled partition switch becomes ready).
// As a result, with high latency.ms setting we end up switching partitions after producing
// between stickyBatchSize and stickyBatchSize * 2 bytes, to better align with batch boundary.
if (producedBytes >= stickyBatchSize * 2)
Is this unexpected? If so, should we log as warn?
This could potentially happen if we have a mix of keyed and unkeyed messages.
// As a result, with high latency.ms setting we end up switching partitions after producing
// between stickyBatchSize and stickyBatchSize * 2 bytes, to better align with batch boundary.
if (producedBytes >= stickyBatchSize * 2)
    log.trace("Exceeded {} bytes, produced {} bytes, enable is {}", stickyBatchSize * 2, producedBytes, enableSwitch);
Could we rephrase this to be more readable? Sth like "Produced $producedBytes bytes, exceeding twice the batch size of $stickyBatchSize, with switching set to $enableSwitch".
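Rendered with concrete values, the suggested wording would read like this (a standalone sketch; the real code would pass the same arguments to log.trace with {} placeholders):

```java
public class LogMessageSketch {
    // Hypothetical rendering of the suggested log wording; variable names are
    // taken from the surrounding snippet, the method itself is illustrative.
    static String switchMessage(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        return String.format(
            "Produced %d bytes, exceeding twice the batch size of %d bytes, with switching set to %b",
            producedBytes, stickyBatchSize, enableSwitch);
    }

    public static void main(String[] args) {
        System.out.println(switchMessage(40000, 16384, true));
    }
}
```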
Just stumbled over this.

Thanks for the feedback @Kaiserchen. Could you run your MM test by changing
Address review feedback.
Fix the test to be more intuitive.
junrao
left a comment
@artemlivshits : Thanks for the updated PR. Still one more comment below.
// as we switch after the "sticky" limit is exceeded. The partition is switched after
// we produce.
// as we try to switch after the "sticky" limit is exceeded. The switch is disabled
// because of incomplete batch.
I am trying to understand why the switch is disabled here. It seems that the large record won't fit in the current batch. So, we will put the large record in a new batch. This batch will then be full since the large record has more than batch.size bytes in it. This should allow the switch, right?
Right now we check the isFull condition, which is false if the batch is not closed and there is one record in the batch: when a new batch is allocated, it is sized to accommodate the new record, so it can be larger than batch.size (if the first record is greater than batch.size), making the isFull check false. Here is the relevant code.
Allocation:
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
Check:
public boolean isFull() {
    // note that the write limit is respected only after the first record is added which ensures we can always
    // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
    return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
}
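A toy model of this interaction -- assuming, per the allocation snippet, that the batch's write limit equals its allocated buffer size, and that the size estimate is an upper bound on the bytes actually written:

```java
public class OversizedBatchSketch {
    // Toy model of a producer batch: allocated as max(batchSize, estimatedRecordSize),
    // with isFull() following the same shape as the producer's check above.
    static final class Batch {
        final int writeLimit;   // allocated buffer size
        int bytesWritten;
        int numRecords;

        Batch(int batchSize, int estimatedFirstRecordSize) {
            this.writeLimit = Math.max(batchSize, estimatedFirstRecordSize);
        }

        void append(int recordSize) {
            bytesWritten += recordSize;
            numRecords++;
        }

        boolean isFull() {
            return numRecords > 0 && writeLimit <= bytesWritten;
        }
    }

    public static void main(String[] args) {
        int batchSize = 100;
        // The upper-bound estimate (250) exceeds the record's actual size (200).
        Batch batch = new Batch(batchSize, 250);
        batch.append(200);
        // The single oversized record does not fill its (larger) buffer, so the
        // batch is not considered full and the partition switch stays disabled.
        System.out.println(batch.isFull()); // false
    }
}
```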
Thanks for the explanation, Artem. This makes sense to me now.
junrao
left a comment
@artemlivshits : Thanks for the explanation. Just one more minor comment.
assertEquals(1, mockRandom.get());

// Produce large record, we should switch to next partition.
// Produce large record, we switched to next partition by previous produce, but
To be precise, the previous produce didn't switch to the next partition. The produce of this record forces the closing of the current batch, which causes the switch to the next partition.
Fix comment in the unit test.
junrao
left a comment
@artemlivshits: Thanks for the latest PR. LGTM. Waiting for the tests to pass.
Looked through the failed tests -- they seem unrelated (also ran locally -- pass).
Now the built-in partitioner defers partition switch (while still accounting produced bytes) if there is no ready batch to send, thus avoiding switching partitions and creating fractional batches. Reviewers: Jun Rao <jun@confluent.io>