diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index 7522f33962241..eee1a82651df6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -95,6 +95,35 @@ public void testProducerTimeoutMemoryRelease() throws Exception { } + @Test(timeOut = 10_000) + public void testProducerBatchSendTimeoutMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(2, TimeUnit.SECONDS) + .maxPendingMessages(0) + .enableBatching(true) + .batchingMaxPublishDelay(3000, TimeUnit.MILLISECONDS) + .batchingMaxBytes(12) + .create(); + this.stopBroker(); + try { + producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync(); + try { + producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.TimeoutException ex) { + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + } + @Test(timeOut = 10_000) public void testProducerCloseMemoryRelease() throws Exception { initClientWithMemoryLimit(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 4d3c36de9081f..c48ad3ab331ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.client.impl; +import static org.mockito.ArgumentMatchers.any; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -25,21 +29,18 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.util.FutureUtil; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.lang.reflect.Field; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Test(groups = "broker-impl") public class ProducerSemaphoreTest extends ProducerConsumerBase { @@ -241,4 +242,44 @@ public void testEnsureNotBlockOnThePendingQueue() throws Exception { Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); Assert.assertFalse(producer.isErrorStat()); } + + @Test(timeOut = 10_000) + public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception { + final int pendingQueueSize = 10; + @Cleanup + ProducerImpl producer = + (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerSemaphoreRelease") + .sendTimeout(2, TimeUnit.SECONDS) + .maxPendingMessages(pendingQueueSize) + .enableBatching(true) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .batchingMaxBytes(15) + .create(); + this.stopBroker(); + try { + ProducerImpl spyProducer = Mockito.spy(producer); + // Make the pendingMessages not empty + spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync(); + spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync(); + + Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer"); + batchMessageContainerField.setAccessible(true); + BatchMessageContainerImpl batchMessageContainer = + (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer); + batchMessageContainer.setProducer(spyProducer); + Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer) + .encryptMessage(any(), any()); + + try { + spyProducer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.TimeoutException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9dd2e01c375fa..bf8fb97cb21db 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1787,8 +1787,10 @@ private void failPendingBatchMessages(PulsarClientException ex) { return; } final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch(); + final long currentBatchSize = batchMessageContainer.getCurrentBatchSize(); batchMessageContainer.discard(ex); semaphoreRelease(numMessagesInBatch); + client.getMemoryLimitController().releaseMemory(currentBatchSize); } @Override @@ -1830,10 +1832,7 @@ private void batchMessageAndSend() { for (OpSendMsg opSendMsg : opSendMsgs) { processOpSendMsg(opSendMsg); } - } catch (PulsarClientException e) { - semaphoreRelease(batchMessageContainer.getNumMessagesInBatch()); } catch (Throwable t) { - semaphoreRelease(batchMessageContainer.getNumMessagesInBatch()); log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t); } }