From a30967ec996e409c4e83d39cb698155caec2f43b Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 11 Feb 2022 09:48:47 +0200 Subject: [PATCH] Fix Netty buffer / direct memory leak in MessageFetchContext - #1049 changed behavior, therefore this change is needed --- .../pulsar/handlers/kop/MessageFetchContext.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index a2363fd7fd..125b6ec062 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -231,9 +231,9 @@ public void complete() { } }); - // Create another reference to this.decodeResults so the lambda expression will capture this local reference - // because this.decodeResults will be reset to null after resultFuture is completed. - final ConcurrentLinkedQueue decodeResults = this.decodeResults; + // Create a copy of this.decodeResults so the lambda expression will capture the current state + // because this.decodeResults will cleared after resultFuture is completed. + final List decodeResults = new ArrayList<>(this.decodeResults); resultFuture.complete( new ResponseCallbackWrapper( new FetchResponse<>(