
KAFKA-19479: Add test to verify data-loss bug was fixed inside the producer #20254

Merged
mjsax merged 6 commits into apache:trunk from shashankhs11:KAFKA-19479 on Nov 7, 2025

Conversation

@shashankhs11
Contributor

@shashankhs11 shashankhs11 commented Jul 29, 2025

Adds a new integration test to verify that we don't lose any data after
a MESSAGE_TOO_LARGE error on write.

Reviewers: Matthias J. Sax matthias@confluent.io

@github-actions Bot added the triage (PRs from the community), streams, and tests (test fixes, including flaky tests) labels on Jul 29, 2025
@mjsax
Member

mjsax commented Jul 29, 2025

Thanks for the PR. -- I did check it out and run it locally, and I believe it's actually a bug inside KafkaProducer.

Before Kafka Streams commits offsets, it calls producer.flush(), but this call does not block until all requests are sent as it should; it returns early. -- The expected behavior would be that producer.flush() blocks and eventually fails with a TimeoutException when delivery.timeout.ms is hit.

The problem inside the producer seems to be that it initiates a flush by bumping RecordAccumulator.flushesInProgress -- when the "message too large" error returns, it tries to split the batch and marks the original (too large) batch as "done", which decrements the flushesInProgress counter to zero. This leads to the early return of flush(), which believes the flush is completed -- but that should not happen, as we are still in the middle of the flush while the new (smaller) batches get enqueued.

There seems to be some other issue in splitting batches, though, as the log shows repeated entries:

WARN [Producer clientId=app-shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLargewWTlaRhOS_68DNGthXrNsQ-d363a0a7-08d4-4f0d-b5f1-3347745661be-StreamThread-1-producer] Got error produce response in correlation id 47 on topic-partition output-shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLargewWTlaRhOS_68DNGthXrNsQ-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:667)

It seems the producer is actually not able to split the batch (I guess we hit the broker-side message.max.bytes limit, default 1MB, while the producer config sets the batch size to 32MB). However, the counter in "splitting and retrying (2147483647 attempts left)" does not decrease. I am not sure if this is right or wrong -- it seems that when we split a batch and retry, the new batch(es) get a fresh retry count, which might be correct but also seems somehow off. The main issue I see is that I assume we also reset delivery.timeout.ms [needs to be confirmed], and thus the producer send might not time out at all if the producer is not able to split the batch but creates a new batch of the same size over and over again. This inability to split a batch is just a side issue (even if I am wondering why the producer keeps retrying rather than giving up early if it cannot split a batch into smaller ones -- is there some verification logic missing, or is this by design, and if so, why?), as there could also be the case of a single large message that cannot be split.

I am not a producer expert, and thus, I am not 100% sure right now what the right fix should be -- it seems to be a more difficult fix. \cc @lianetm @kirktrue

@shashankhs11
Contributor Author

Thank you so much for the detailed explanation @mjsax. I have not yet started my investigation into this, but I think this really helps and gives me a solid starting point to trace the bug. Since you've confirmed that this is indeed a bug, I would like to start by writing more targeted failing unit tests and commit them within the same PR. I think this would make it easier for everyone to review and propose a fix/approach, since your initial impression was that it seems to be a more difficult fix.

@lianetm
Member

lianetm commented Jul 30, 2025

Hey all, interesting. I don't have too much context, but it does look fishy that tryAppendForSplit always allows a first record (no matter the size, it seems), here:

// We always allow at least one record to be appended (the ByteBufferOutputStream will grow as needed)

Definitely needs more investigation; maybe start by adding a test to check the retry behaviour around splitAndReenqueue for a large batch with a single record? I cannot find any test for it (here it's all about batches that can indeed be split:

public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
)

Hope it helps, I will stay in the loop.

@mjsax
Member

mjsax commented Jul 30, 2025

Since, you've confirmed that this is indeed a bug, I would like to start by writing failing unit tests that are more targeted and commit within the same PR

Given that the issue is in the producer, it might be simpler/better to just write a more tailored unit test in a new PR? Ie, do a couple of send()s which are supposed to create a batch that is too large, and call producer.flush(), which should not return but eventually fail with a delivery.timeout.ms error (this error might be returned in the producer.send(..., new Callback()) callback).

Or you try to go even one level deeper, and try to unit test org.apache.kafka.clients.producer.internals.Sender in combination with the passed in RecordAccumulator.

@shashankhs11
Contributor Author

shashankhs11 commented Jul 31, 2025

@mjsax @lianetm -- I created a new PR as suggested, to add the new unit tests.

Here is the new PR - #20285

Details about the tests are described in the new PR

@junrao
Contributor

junrao commented Aug 4, 2025

@mjsax : Hmm, the splitting code has some logic to chain the response futures. When we append a record to a new batch after a split, we create a new future for the record and chain it to the old future.

    private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
...

            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp,
                                                                   key == null ? -1 : key.remaining(),
                                                                   value == null ? -1 : value.remaining(),
                                                                   Time.SYSTEM);
            // Chain the future to the original thunk.
            thunk.future.chain(future);

FutureRecordMetadata.chain() then adds the new future as a dependency.

    void chain(FutureRecordMetadata futureRecordMetadata) {
        if (nextRecordMetadata == null)
            nextRecordMetadata = futureRecordMetadata;
        else
            nextRecordMetadata.chain(futureRecordMetadata);
    }

FutureRecordMetadata.get() will then wait until all chained response futures complete.

    public RecordMetadata get() throws InterruptedException, ExecutionException {
        this.result.await();
        if (nextRecordMetadata != null)
            return nextRecordMetadata.get();
        return valueOrError();
    }

So, a pending flush() waiting for an original batch is not supposed to complete until all split batches have completed. Did you actually observe that flush() returns early?
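The chaining behavior described above can be illustrated with a stdlib-only toy model (all names here are simplified stand-ins, not the actual Kafka classes): get() on the original record's future delegates to the chained future, so a caller waiting on the record-level future only unblocks once the last link in the chain completes.

```java
import java.util.concurrent.CountDownLatch;

/** Toy stand-in for FutureRecordMetadata's chaining behavior (simplified). */
class ChainedFuture {
    private final CountDownLatch result = new CountDownLatch(1);
    private ChainedFuture next;   // plays the role of nextRecordMetadata
    private String value;         // stands in for RecordMetadata

    void complete(String v) { value = v; result.countDown(); }

    /** Append a future to the end of the chain, like chain() does. */
    void chain(ChainedFuture f) {
        if (next == null) next = f;
        else next.chain(f);
    }

    /** Like get(): wait for this link, then delegate to the chained link. */
    String get() throws InterruptedException {
        result.await();
        if (next != null) return next.get();
        return value;
    }

    boolean isDone() { return result.getCount() == 0 && (next == null || next.isDone()); }
}
```

With this model, completing the original link alone does not make get() return: the link chained for the split batch must complete too, which is why per-record send() futures are not unblocked prematurely by a split.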

@lianetm : Currently, KafkaProducer.doSend() only rejects a record if its size is > max.request.size. It's ok if a record is larger than batch.size; we just create a larger batch to accommodate it. So, during a split, it's reasonable to follow the same approach and allow the first record in a new batch.

@github-actions

github-actions Bot commented Aug 5, 2025

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@shashankhs11
Contributor Author

@lianetm @junrao @mjsax tagging as a reminder for follow-up, since this is confirmed to be a bug (potentially critical) and I believe it may require more discussion among the community experts.

To make it easier for others to reproduce and debug, one option could be that I temporarily disable the failing test and we merge this PR, so that community members can pick it up more easily.

Please let me know if there's any way I can help and contribute to this issue; I would really love to.

@github-actions

github-actions Bot commented Sep 8, 2025

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@mjsax
Member

mjsax commented Sep 28, 2025

Did you actually observe that flush() returns early?

@junrao Yes (as described above: #20254 (comment)). I did set a breakpoint somewhere in the producer to see what it was doing, and while it was retrying the send internally, it unblocked flush() before the send was done, and the producer.flush() call did return.

@junrao
Contributor

junrao commented Sep 29, 2025

@mjsax

Expected behavior would be, that producer.flush() blocks, and eventual fails with a TimeoutException hitting delivery.timeout.ms.

If a record fails to be delivered after delivery.timeout.ms, Sender calls failBatch(), which eventually calls ProduceRequestResult.done(). This unblocks Producer.flush(), but it won't fail with a TimeoutException, since it doesn't check the error in ProduceRequestResult. Does KStreams depend on Producer.flush() failing? It seems that KStreams needs to check the future returned by the send() call to know whether the record was sent successfully.
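The distinction drawn here, that the batch-level result can complete "with an error" without the flush-style wait throwing, while the record-level future surfaces the exception, can be sketched with a stdlib-only model (class and method names are illustrative stand-ins for ProduceRequestResult/valueOrError, not the real Kafka code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/** Toy stand-in for a batch result: done() unblocks waiters; the error is recorded, not thrown. */
class BatchResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    private RuntimeException error;

    void done(RuntimeException e) { error = e; latch.countDown(); }

    /** flush()-style wait: returns normally even if the batch failed. */
    void await() throws InterruptedException { latch.await(); }

    /** send()-future-style access: surfaces the error, like valueOrError(). */
    String valueOrError() throws ExecutionException, InterruptedException {
        latch.await();
        if (error != null) throw new ExecutionException(error);
        return "metadata";
    }
}
```

So an application cannot treat a returning flush() as proof of success; it has to inspect the future or callback from send() to observe the failure.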

Or is this by design, and if yes, why?), as there could also be the case of a single large message that cannot be split.

We have the following code in Sender. It only tries to split the batch if it has more than 1 record, so the splitting shouldn't occur forever. It does have the issue that when a split batch is created, it gets a new createdMs, which inadvertently extends the timeout.

    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now, Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo) {
        Errors error = response.error;

        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
                (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
            ...
            this.accumulator.splitAndReenqueue(batch);

I did set a breakpoint somewhere in the producer to see what it's doing, and while it was retrying the send internally it did unblock flush() before the send was done, and producer.flush() call did return.

Do you have a test that can reproduce this?


@junrao
Contributor

junrao commented Oct 2, 2025

@mjsax : Took a closer look. This is indeed a bug in the producer. ProducerBatch has a produceFuture of type ProduceRequestResult and a list of Thunk, one for each produced record. Each Thunk has a future of type FutureRecordMetadata. When we split a ProducerBatch, we have logic to chain Thunk.future to the newly split batches. This makes sure that we don't unblock producer.send() prematurely. But the problem is that flush() doesn't wait on Thunk.future. Instead, it waits on ProducerBatch.produceFuture. After splitting, we immediately call ProducerBatch.produceFuture.done(). This unblocks the flush() call prematurely, since the split batches haven't been completed.

As for the fix, one way is to change the logic in RecordAccumulator.awaitFlushCompletion(). Instead of doing the following,

            for (ProduceRequestResult result : this.incomplete.requestResults())
                result.await();

we collect all thunks in each incomplete batch and wait on each thunk's FutureRecordMetadata. This way, the chaining logic for FutureRecordMetadata will kick in.
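The effect of that proposed change can be modeled with a stdlib-only sketch (the class and method names below are simplified stand-ins, not the actual RecordAccumulator code): after a split, the original batch's produce-future is done while the per-record chained futures are not, so a flush that waits on batch-level results returns early, whereas one that waits on the record-level futures does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Simplified model of one batch after a splitAndReenqueue-style split. */
class SplitModel {
    final CountDownLatch produceFuture = new CountDownLatch(1);   // batch-level result
    final List<CountDownLatch> recordFutures = new ArrayList<>(); // thunk-level, chained to split batches

    /** Simulate the split: record futures stay pending, but the original batch is marked done. */
    void split(int records) {
        for (int i = 0; i < records; i++) recordFutures.add(new CountDownLatch(1));
        produceFuture.countDown(); // like produceFuture.done() right after the split
    }

    /** Buggy flush check: looks only at batch-level results. */
    boolean flushViaBatchResults() { return produceFuture.getCount() == 0; }

    /** Fixed flush check: looks at every record's chained future. */
    boolean flushViaRecordFutures() {
        return recordFutures.stream().allMatch(l -> l.getCount() == 0);
    }
}
```

In this model the batch-level check reports completion immediately after the split (the bug), while the record-level check only reports completion once the split batches actually finish, which matches the chaining guarantee that send() futures already enjoy.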

As for why the splitting loops forever, this is probably because of a recently fixed bug in https://github.com/apache/kafka/pull/20358/files.

@shashankhs11 : Would you be willing to submit a patch?

@shashankhs11
Contributor Author

shashankhs11 commented Oct 2, 2025

Would you be willing to submit a patch?

@junrao Yes, I would love to. I will look into this and try to get a better understanding of what is required.
Clarifying question: would we need a KIP for this, or can the patch be submitted in the same PR?

@junrao
Contributor

junrao commented Oct 3, 2025

@shashankhs11 : Thanks. Since the fix doesn't change any public API, no KIP is needed.

@shashankhs11
Contributor Author

@junrao the patch is available for review.
This change seems to have fixed the infinite retry behavior, but the offsets were still being committed, and I am not sure what the reason for that could be.

@github-actions

github-actions Bot commented Oct 4, 2025

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

mjsax pushed a commit that referenced this pull request Oct 17, 2025
…ges when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees (#20285)

Bug Fix in Producer where flush() does not wait for a batch to complete after splitting.

Cf - #20254 (comment)
and [KAFKA-19479](https://issues.apache.org/jira/browse/KAFKA-19479) for
more details

Reviewers: Jun Rao <junrao@gmail.com>

Comment on lines +146 to +147
@Test
public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception {
Contributor Author

@shashankhs11 shashankhs11 Oct 19, 2025


I am wondering if we need the 2nd test?

Each test takes about 30-31s to run

Member


I agree. Having the negative test should be sufficient, as it's the interesting case which did not work. The positive case is implicitly covered by many other integration tests, so it seems unnecessary.

Contributor Author


done, removed in bec0341

@shashankhs11
Contributor Author

@mjsax I updated the test. Please take a look.

@github-actions Bot removed the needs-attention and triage (PRs from the community) labels on Oct 19, 2025
@shashankhs11 shashankhs11 requested a review from mjsax October 21, 2025 18:18
Member

@mjsax mjsax left a comment


Thanks for the PR. LGTM.

@mjsax mjsax changed the title KAFKA-19479: at_least_once mode in Kafka Streams silently drops messages when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees KAFKA-19479: Add test to verify data-loss bug was fixed inside the producer Nov 6, 2025
@shashankhs11
Contributor Author

CI check failed due to a failing test. Rebased with latest trunk.

@mjsax mjsax merged commit 57456b3 into apache:trunk Nov 7, 2025
20 checks passed
@mjsax
Member

mjsax commented Nov 7, 2025

Thanks for the test! Merged to trunk.

eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…ges when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees (apache#20285)

eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…oducer (apache#20254)

TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Dec 3, 2025
…ges when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees (apache#20285)

TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Dec 3, 2025
…oducer (apache#20254)

shashankhs11 added a commit to shashankhs11/kafka that referenced this pull request Dec 15, 2025
…ges when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees (apache#20285)

shashankhs11 added a commit to shashankhs11/kafka that referenced this pull request Dec 15, 2025
…oducer (apache#20254)