
KAFKA-9703: Free up resources when splitting huge batches #8286

Closed

jiameixie wants to merge 1 commit into apache:trunk from jiameixie:outOfMemory

Conversation

@jiameixie
Contributor

Method split takes up too many resources and might
cause an OutOfMemoryError when the bigBatch is huge.
Call closeForRecordAppends() to free up resources
like compression buffers.

Change-Id: Iac6519fcc2e432330b8af2d9f68a8d4d4a07646b
Signed-off-by: Jiamei Xie <jiamei.xie@arm.com>


Member


batch.closeForRecordAppends does NOT release the byte array hosted by MemoryRecordsBuilder:

    public void closeForRecordAppends() {
        if (appendStream != CLOSED_STREAM) {
            try {
                appendStream.close();
            } catch (IOException e) {
                throw new KafkaException(e);
            } finally {
                appendStream = CLOSED_STREAM;
            }
        }
    }

appendStream is a wrapper around bufferStream. I'm not sure whether calling batch.closeForRecordAppends() can resolve the OOM.

this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
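To make the distinction concrete, here is a minimal, self-contained sketch of the same layering. It is not the Kafka code: java.util.zip.GZIPOutputStream stands in for the zstd compression wrapper, and ByteArrayOutputStream stands in for the ByteBuffer-backed bufferStream. Closing the wrapper releases the compressor's working state, but the underlying buffer still holds the batch bytes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

public class CloseForAppendsSketch {
    // Close the compression wrapper, then report how many bytes the
    // underlying buffer still holds.
    static int retainedAfterClose() {
        try {
            // ~ MemoryRecordsBuilder's bufferStream: holds the batch bytes.
            ByteArrayOutputStream bufferStream = new ByteArrayOutputStream();
            // ~ appendStream: the compression wrapper around bufferStream.
            GZIPOutputStream appendStream = new GZIPOutputStream(bufferStream);
            appendStream.write(new byte[]{1, 2, 3});
            // Closing the wrapper flushes and frees the compressor's working
            // state (for zstd, native off-heap buffers)...
            appendStream.close();
            // ...but the compressed batch payload is still referenced here.
            return bufferStream.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes retained after close: " + retainedAfterClose());
    }
}
```

This mirrors the question above: closeForRecordAppends() releases the wrapper (appendStream) but not the buffer it wrote into.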

Contributor


@chia7712 Thanks for reporting the issue. But I'd like to understand this problem a bit more. Supposedly the split batches will be closed when they are drained out of the RecordAccumulator.
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L608

and ProducerBatch#close() calls ProducerBatch#closeForRecordAppends().

Do you have a heap dump when the OOM happens so we can take a further look?

Contributor Author


@becketqin In my case, the OOM is caused by negative estimatedCompressionRatio. I have committed #8285 to fix it.

@jiameixie
Contributor Author

RecordAccumulator#tryAppend calls closeForRecordAppends() too, and its comment says that it frees up resources like compression buffers. Maybe my commit message should be modified?

/**
 * Try to append to a ProducerBatch.
 *
 * If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
 * resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
 * and memory records built) in one of the following cases (whichever comes first): right before send,
 * if it is expired, or when the producer is closed.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}

@chia7712
Member

@jiameixie Pardon me, my comment was not clear :(

The closeForRecordAppends() method does release the compression output stream. However, the main buffer is still hosted by bufferStream. My question was: is the OOM caused by the unreleased compression stream?

At any rate, calling closeForRecordAppends() here seems necessary to avoid a potential resource leak.

@jiameixie
Contributor Author

Yes, the main buffer is still hosted by bufferStream. closeForRecordAppends() will release off-heap memory if compression is used.

@jiameixie
Contributor Author

I got the following three suspects from Memory Analyzer.
The thread org.apache.kafka.common.utils.KafkaThread @ 0x65d3c7a40 kafka-producer-network-thread | producer-1 keeps local variables with total size 2,210,962,968 (34.41%) bytes.
The memory is accumulated in one instance of "java.lang.Object[]" loaded by "".
The stacktrace of this Thread is available. See stacktrace.

Keywords
java.lang.Object[]

20,119 instances of "com.github.luben.zstd.ZstdOutputStream", loaded by "sun.misc.Launcher$AppClassLoader @ 0x65d3c44b0" occupy 2,648,721,080 (41.22%) bytes. These instances are referenced from one instance of "java.lang.Object[]", loaded by ""

Keywords
java.lang.Object[]
sun.misc.Launcher$AppClassLoader @ 0x65d3c44b0
com.github.luben.zstd.ZstdOutputStream

20,118 instances of "org.apache.kafka.common.utils.ByteBufferOutputStream", loaded by "sun.misc.Launcher$AppClassLoader @ 0x65d3c44b0" occupy 1,320,223,632 (20.55%) bytes. These instances are referenced from one instance of "java.lang.Object[]", loaded by ""

Keywords
java.lang.Object[]
sun.misc.Launcher$AppClassLoader @ 0x65d3c44b0
org.apache.kafka.common.utils.ByteBufferOutputStream

@jiameixie
Contributor Author

After adding closeForRecordAppends(), the number of suspects dropped to one. It can't avoid OOM completely, but it decreases the total memory used. Before adding it, about 22 GB could be consumed; after adding it, about 7177 MB is used. The memory used without running Kafka is 622 MB, and the heap size is 6 GB.

The thread org.apache.kafka.common.utils.KafkaThread @ 0x64095d878 kafka-producer-network-thread | producer-1 keeps local variables with total size 6,160,047,864 (96.16%) bytes.
The memory is accumulated in one instance of "java.lang.Object[]" loaded by "".
The stacktrace of this Thread is available. See stacktrace.

Keywords
java.lang.Object[]


@ijuma
Member

ijuma commented Mar 17, 2020

ok to test

@ijuma ijuma requested a review from becketqin March 17, 2020 16:25
@chia7712
Member

The flaky StateDirectoryTest.shouldReturnEmptyArrayIfListFilesReturnsNull is fixed by #8310.

@chia7712
Member

retest this please

@becketqin
Contributor

@jiameixie Can you check my previous comment on the closure of the ProducerBatch and help me understand the problem better?

Supposedly the split batches will be closed when they are drained out of the RecordAccumulator.
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L608

and ProducerBatch#close() calls ProducerBatch#closeForRecordAppends().

@chia7712
Member

@jiameixie The purpose of this PR is to release the memory early, so as to avoid too much memory being allocated while splitting huge batches, right? As @becketqin mentioned, the buffers in the batches are released eventually, but the OOM you observed happens while splitting huge batches.
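For illustration only, the "close early while splitting" idea can be sketched as a toy, self-contained example. This is not the real ProducerBatch/MemoryRecordsBuilder API: ToyBatch, its fields, and GZIPOutputStream (standing in for the compression codec) are all made up for the sketch. Each split batch's compression stream is closed as soon as the batch is filled, rather than waiting for the batch to be drained:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;

public class EarlyCloseSketch {
    // A toy "batch": a compression stream wrapping an in-memory buffer.
    static final class ToyBatch {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        final GZIPOutputStream appendStream;
        boolean closedForAppends = false;

        ToyBatch() throws IOException {
            appendStream = new GZIPOutputStream(buffer);
        }

        void append(byte[] payload) throws IOException {
            appendStream.write(payload);
        }

        // Analogue of ProducerBatch#closeForRecordAppends: releases the
        // compressor's working state; the batch payload stays in `buffer`.
        void closeForAppends() throws IOException {
            appendStream.close();
            closedForAppends = true;
        }
    }

    // Split a big payload into fixed-size batches, closing each one for
    // appends as soon as it is filled, instead of leaving every
    // compression stream open until the batches are drained.
    static List<ToyBatch> split(byte[] big, int chunk) {
        try {
            List<ToyBatch> batches = new ArrayList<>();
            for (int off = 0; off < big.length; off += chunk) {
                ToyBatch b = new ToyBatch();
                b.append(Arrays.copyOfRange(big, off, Math.min(off + chunk, big.length)));
                b.closeForAppends(); // early close: compressor buffers freed here
                batches.add(b);
            }
            return batches;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<ToyBatch> out = split(new byte[10_000], 4_000);
        System.out.println(out.size() + " split batches, all closed for appends");
    }
}
```

The design point matches the heap dump above: with many open compression streams alive at once, their working buffers dominate; closing each stream right after the split bounds that to one open compressor at a time.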

@jiameixie
Contributor Author

@chia7712 Yes, that is exactly this PR's objective. I also wonder whether the split batches can be added directly to ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches, so they can be sent to the broker and the resources released earlier.

@becketqin
Contributor

@jiameixie It might be better to close the batches for append right after the split. But I am not sure whether this would solve the OOM you saw. The byte arrays in the ProducerBatch will remain there until they are successfully sent or fail. It sounds like the OOM you saw was caused by the negative compression rate estimation. Did I misunderstand something?

@jiameixie
Contributor Author

jiameixie commented Mar 19, 2020

@becketqin You didn't misunderstand. When there is a negative compression rate estimation, a lot of both heap and off-heap memory is consumed, and adding closeForRecordAppends() decreases the off-heap memory occupied. With the negative compression rate estimation fixed, the OOM does not occur.

@becketqin
Contributor

I am not quite sure about the following statement. The off-heap allocation will only be released after the split ProducerBatch is completed with either success or failure. So it seems not to matter whether we close the split batches right after the splitting or when they are drained out of the record accumulator, right?

And adding loseForRecordAppends() will decrease the out of heap occupied by it.

@jiameixie
Contributor Author

closeForRecordAppends() will release compression buffers. Sorry for my spelling mistake.

@becketqin
Contributor

@jiameixie Got it. The input and output buffers for compression are usually small compared with the split batches, but this is still an improvement.

Contributor

@becketqin becketqin left a comment


@jiameixie Thanks for the patch. Just one minor comment then I think it is good to be merged.

Contributor


We probably also want to close the last batch for append in line 279.

Contributor Author


@becketqin My negligence! The last batch should also be closed. I have modified it.

Method split will take up a lot of memory when the bigBatch is huge and
compression is used. Call closeForRecordAppends() to free up resources
like compression buffers.

Change-Id: Iac6519fcc2e432330b8af2d9f68a8d4d4a07646b
Signed-off-by: Jiamei Xie <jiamei.xie@arm.com>
@jiameixie
Contributor Author

@becketqin @chia7712 Could you please review it? Thanks.

@becketqin
Contributor

@jiameixie Sorry for the late reply. The patch LGTM. I'll merge it today.

@becketqin becketqin closed this in c85fd07 Apr 16, 2020
@jiameixie jiameixie deleted the outOfMemory branch June 11, 2020 01:52