diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java index 37ccd68d5d950..e5da39985aef6 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java @@ -390,7 +390,10 @@ public CompletableFuture closeAsync() { synchronized (this) { state.set(State.Closed); client.cleanupProducer(this); - pendingMessages.forEach(msg -> msg.cmd.release()); + pendingMessages.forEach(msg -> { + msg.cmd.release(); + msg.recycle(); + }); pendingMessages.clear(); } @@ -421,7 +424,10 @@ public CompletableFuture closeAsync() { synchronized (ProducerImpl.this) { log.info("[{}] [{}] Closed Producer", topic, producerName); state.set(State.Closed); - pendingMessages.forEach(msg -> msg.cmd.release()); + pendingMessages.forEach(msg -> { + msg.cmd.release(); + msg.recycle(); + }); pendingMessages.clear(); } @@ -935,6 +941,7 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { op.sequenceId, t); } ReferenceCountUtil.safeRelease(op.cmd); + op.recycle(); }); semaphore.release(releaseCount.get()); pendingMessages.clear();