diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index a2363fd7fd..125b6ec062 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -231,9 +231,9 @@ public void complete() { } }); - // Create another reference to this.decodeResults so the lambda expression will capture this local reference - // because this.decodeResults will be reset to null after resultFuture is completed. - final ConcurrentLinkedQueue decodeResults = this.decodeResults; + // Create a copy of this.decodeResults so the lambda expression will capture the current state + // because this.decodeResults will cleared after resultFuture is completed. + final List decodeResults = new ArrayList<>(this.decodeResults); resultFuture.complete( new ResponseCallbackWrapper( new FetchResponse<>(