From 4956e4bf7b03ba66cc4d160851858de4b8f26ff9 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 29 Dec 2016 11:56:44 -0800 Subject: [PATCH] recycle OpSendMsg after releasing payload --- .../com/yahoo/pulsar/client/impl/ProducerImpl.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java index 37ccd68d5d950..e5da39985aef6 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java @@ -390,7 +390,10 @@ public CompletableFuture closeAsync() { synchronized (this) { state.set(State.Closed); client.cleanupProducer(this); - pendingMessages.forEach(msg -> msg.cmd.release()); + pendingMessages.forEach(msg -> { + msg.cmd.release(); + msg.recycle(); + }); pendingMessages.clear(); } @@ -421,7 +424,10 @@ public CompletableFuture closeAsync() { synchronized (ProducerImpl.this) { log.info("[{}] [{}] Closed Producer", topic, producerName); state.set(State.Closed); - pendingMessages.forEach(msg -> msg.cmd.release()); + pendingMessages.forEach(msg -> { + msg.cmd.release(); + msg.recycle(); + }); pendingMessages.clear(); } @@ -935,6 +941,7 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { op.sequenceId, t); } ReferenceCountUtil.safeRelease(op.cmd); + op.recycle(); }); semaphore.release(releaseCount.get()); pendingMessages.clear();