From b62d1f569789305f29ad13845474e98dd7780eab Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 15 Dec 2020 16:03:39 -0800 Subject: [PATCH] PIP-74: Pulsar client memory limit in producer --- .../pulsar/client/api/MemoryLimitTest.java | 166 ++++++++++++++++++ .../client/api/ProducerConsumerBase.java | 7 + .../pulsar/client/api/ClientBuilder.java | 15 ++ .../client/api/PulsarClientException.java | 31 ++++ .../apache/pulsar/client/api/SizeUnit.java | 48 +++++ .../pulsar/client/impl/ClientBuilderImpl.java | 7 + .../client/impl/MemoryLimitController.java | 81 +++++++++ .../pulsar/client/impl/MessageImpl.java | 10 +- .../pulsar/client/impl/ProducerImpl.java | 50 ++++-- .../pulsar/client/impl/PulsarClientImpl.java | 6 + .../impl/conf/ClientConfigurationData.java | 2 + .../impl/MemoryLimitControllerTest.java | 165 +++++++++++++++++ 12 files changed, 572 insertions(+), 16 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SizeUnit.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java new file mode 100644 index 0000000000000..6b108dc1dd4f3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import java.util.concurrent.CountDownLatch; + +import lombok.Cleanup; + +import org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class MemoryLimitTest extends ProducerConsumerBase { + + @DataProvider(name = "batching") + public Object[][] provider() { + return new Object[][] { + // "Batching" + { false }, + { true }, + }; + } + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(dataProvider = "batching") + public void testRejectMessages(boolean enableBatch) + throws Exception { + String topic = newTopicName(); + + @Cleanup + PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .memoryLimit(100, SizeUnit.KILO_BYTES) + .build(); + + @Cleanup + Producer producer = client.newProducer() + .topic(topic) + .blockIfQueueFull(false) + .create(); + + final int n = 101; + CountDownLatch latch = new CountDownLatch(n); + + for (int i = 0; i < n; i++) { + producer.sendAsync(new byte[1024]).thenRun(() -> { + latch.countDown(); + }); + } + + assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024); + + try { + producer.send(new byte[1024]); + fail("should have failed"); + } catch (MemoryBufferIsFullError e) { + // Expected + } + + latch.await(); + + assertEquals(client.getMemoryLimitController().currentUsage(), 0); + + // We should now be able to send again + producer.send(new byte[1024]); + } + + @Test(dataProvider = "batching") + public void testRejectMessagesOnMultipleTopics(boolean enableBatch) throws Exception { + String t1 = newTopicName(); + String t2 = newTopicName(); + + @Cleanup + PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .memoryLimit(100, SizeUnit.KILO_BYTES) + .build(); + + @Cleanup + Producer p1 = client.newProducer() + .topic(t1) + .blockIfQueueFull(false) + .create(); + + @Cleanup + Producer p2 = client.newProducer() + .topic(t2) + .blockIfQueueFull(false) + .create(); + + final int n = 101; + CountDownLatch latch = new CountDownLatch(n); + + for (int i = 0; i < n / 2; i++) { + p1.sendAsync(new byte[1024]).thenRun(() -> { + latch.countDown(); + }); + p2.sendAsync(new byte[1024]).thenRun(() -> { + latch.countDown(); + }); + } + + // Last message in order to reach the limit + p1.sendAsync(new byte[1024]).thenRun(() -> { + latch.countDown(); + }); + + assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024); + + try { + p1.send(new byte[1024]); + fail("should have failed"); + } catch (MemoryBufferIsFullError e) { + // Expected + } + + try { + p2.send(new byte[1024]); + fail("should have failed"); + } catch (MemoryBufferIsFullError e) { + // Expected + } + + latch.await(); + + assertEquals(client.getMemoryLimitController().currentUsage(), 0); + + // We should now be able to send again + p1.send(new byte[1024]); + p2.send(new byte[1024]); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java index 6e85b08c48d9f..8c819e92e34e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java @@ -21,6 +21,7 @@ import com.google.common.collect.Sets; import java.lang.reflect.Method; +import java.util.Random; import java.util.Set; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -61,4 +62,10 @@ protected void testMessageOrderAndDuplicates(Set messagesReceived, T rece Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); } + private static final Random random = new Random(); + + protected String newTopicName() { + return "my-property/my-ns/topic-" + Long.toHexString(random.nextLong()); + } + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 290cbf0f5a01a..6f50678ead776 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -365,6 +365,21 @@ ClientBuilder authentication(String authPluginClassName, Map aut */ ClientBuilder tlsProtocols(Set tlsProtocols); + /** + * Configure a limit on the amount of direct memory that will be allocated by this client instance. + *

+ * Note: at this moment this is only limiting the memory for producers. + *

+ * Setting this to 0 will disable the limit. + * + * @param memoryLimit + * the limit + * @param unit + * the memory limit size unit + * @return the client builder instance + */ + ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit); + /** * Set the interval between each stat info (default: 60 seconds) Stats will be activated with positive * statsInterval It should be set to at least 1 second. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index a11966874cc32..1c71e01a8168d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -705,6 +705,35 @@ public ProducerQueueIsFullError(String msg, long sequenceId) { } } + /** + * Memory buffer full error thrown by Pulsar client. + */ + public static class MemoryBufferIsFullError extends PulsarClientException { + /** + * Constructs an {@code MemoryBufferIsFullError} with the specified detail message. + * + * @param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public MemoryBufferIsFullError(String msg) { + super(msg); + } + + /** + * Constructs an {@code MemoryBufferIsFullError} with the specified detail message. + * + * @param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + * @param sequenceId + * The sequenceId of the message + */ + public MemoryBufferIsFullError(String msg, long sequenceId) { + super(msg, sequenceId); + } + } + /** * Producer blocked quota exceeded error thrown by Pulsar client. */ @@ -990,6 +1019,8 @@ public static PulsarClientException unwrap(Throwable t) { return new TopicDoesNotExistException(msg); } else if (cause instanceof ProducerFencedException) { return new ProducerFencedException(msg); + } else if (cause instanceof MemoryBufferIsFullError) { + return new MemoryBufferIsFullError(msg); } else { return new PulsarClientException(t); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SizeUnit.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SizeUnit.java new file mode 100644 index 0000000000000..c49e0d45b724f --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SizeUnit.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * Size unit converter. + */ +public enum SizeUnit { + BYTES(1L), KILO_BYTES(1024L), MEGA_BYTES(1024L * 1024L), GIGA_BYTES(1024L * 1024L * 1024L); + + private final long bytes; + + private SizeUnit(long bytes) { + this.bytes = bytes; + } + + public long toBytes(long value) { + return value * bytes; + } + + public long toKiloBytes(long value) { + return toBytes(value) / KILO_BYTES.bytes; + } + + public long toMegaBytes(long value) { + return toBytes(value) / MEGA_BYTES.bytes; + } + + public long toGigaBytes(long value) { + return toBytes(value) / GIGA_BYTES.bytes; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index c6d57e23ae5c8..8f4ea66ea288c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; @@ -290,6 +291,12 @@ public ClientConfigurationData getClientConfigurationData() { return conf; } + @Override + public ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit) { + conf.setMemoryLimitBytes(unit.toBytes(memoryLimit)); + return this; + } + @Override public ClientBuilder clock(Clock clock) { conf.setClock(clock); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java new file mode 100644 index 0000000000000..8aa541744e316 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class MemoryLimitController { + + private final long memoryLimit; + private final AtomicLong currentUsage = new AtomicLong(); + private final ReentrantLock mutex = new ReentrantLock(false); + private final Condition condition = mutex.newCondition(); + + public MemoryLimitController(long memoryLimitBytes) { + this.memoryLimit = memoryLimitBytes; + } + + public boolean tryReserveMemory(long size) { + while (true) { + long current = currentUsage.get(); + long newUsage = current + size; + + // We allow one request to go over the limit, to make the notification + // path simpler and more efficient + if (current > memoryLimit && memoryLimit > 0) { + return false; + } + + if (currentUsage.compareAndSet(current, newUsage)) { + return true; + } + } + } + + public void reserveMemory(long size) throws InterruptedException { + while (!tryReserveMemory(size)) { + mutex.lock(); + try { + condition.await(); + } finally { + mutex.unlock(); + } + } + } + + public void releaseMemory(long size) { + long newUsage = currentUsage.addAndGet(-size); + if (newUsage + size > memoryLimit && + newUsage <= memoryLimit) { + // We just crossed the limit. Now we have more space + mutex.lock(); + try { + condition.signalAll(); + } finally { + mutex.unlock(); + } + } + } + + public long currentUsage() { + return currentUsage.get(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 9743ce5238635..5f4533277fdcd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -64,6 +64,7 @@ public class MessageImpl implements Message { private String topic; // only set for incoming messages transient private Map properties; private final int redeliveryCount; + private int uncompressedSize; private PulsarApi.BrokerEntryMetadata brokerEntryMetadata; @@ -78,6 +79,7 @@ public static MessageImpl create(MessageMetadata.Builder msgMetadataBuild msg.payload = Unpooled.wrappedBuffer(payload); msg.properties = null; msg.schema = schema; + msg.uncompressedSize = payload.remaining(); return msg; } @@ -424,7 +426,7 @@ public synchronized Map getProperties() { this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream() .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (oldValue,newValue) -> newValue))); - + } else { this.properties = Collections.emptyMap(); } @@ -556,6 +558,10 @@ public int getRedeliveryCount() { return redeliveryCount; } + int getUncompressedSize() { + return uncompressedSize; + } + SchemaState getSchemaState() { return schemaState; } @@ -564,6 +570,8 @@ void setSchemaState(SchemaState schemaState) { this.schemaState = schemaState; } + + enum SchemaState { None, Ready, Broken } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9ae41681c2d4e..8147d177f74c2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -365,16 +365,16 @@ public void sendAsync(Message message, SendCallback callback) { return; } - if (!canEnqueueRequest(callback, message.getSequenceId())) { - return; - } - - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = (MessageImpl) message; MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder(); ByteBuf payload = msg.getDataBuffer(); + int uncompressedSize = payload.readableBytes(); + + if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) { + return; + } // If compression is enabled, we are compressing, otherwise it will simply use the same buffer - int uncompressedSize = payload.readableBytes(); ByteBuf compressedPayload = payload; // Batch will be compressed when closed // If a message has a delayed delivery time, we'll always send it individually @@ -392,7 +392,7 @@ public void sendAsync(Message message, SendCallback callback) { PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds %d bytes", producerName, topic, compressedStr, compressedSize, ClientCnx.getMaxMessageSize())); - completeCallbackAndReleaseSemaphore(callback, invalidMessageException); + completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); return; } } @@ -401,7 +401,7 @@ public void sendAsync(Message message, SendCallback callback) { PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s can not reuse the same message", producerName, topic), msg.getSequenceId()); - completeCallbackAndReleaseSemaphore(callback, invalidMessageException); + completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); compressedPayload.release(); return; } @@ -417,7 +417,7 @@ public void sendAsync(Message message, SendCallback callback) { + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1); // chunked message also sent individually so, try to acquire send-permits for (int i = 0; i < (totalChunks - 1); i++) { - if (!canEnqueueRequest(callback, message.getSequenceId())) { + if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) { return; } } @@ -442,9 +442,9 @@ public void sendAsync(Message message, SendCallback callback) { } } catch (PulsarClientException e) { e.setSequenceId(msg.getSequenceId()); - completeCallbackAndReleaseSemaphore(callback, e); + completeCallbackAndReleaseSemaphore(uncompressedSize, callback, e); } catch (Throwable t) { - completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t, msg.getSequenceId())); + completeCallbackAndReleaseSemaphore(uncompressedSize, callback, new PulsarClientException(t, msg.getSequenceId())); } } @@ -564,7 +564,7 @@ private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) { PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic) , msg.getSequenceId()); - completeCallbackAndReleaseSemaphore(callback, e); + completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e); return false; } SchemaHash schemaHash = SchemaHash.of(msg.getSchema()); @@ -687,7 +687,7 @@ private boolean canAddToCurrentBatch(MessageImpl msg) { private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Closing out batch to accommodate large message with size {}", topic, producerName, - msg.getDataBuffer().readableBytes()); + msg.getUncompressedSize()); } try { batchMessageAndSend(); @@ -725,15 +725,22 @@ private boolean isValidProducerState(SendCallback callback, long sequenceId) { } } - private boolean canEnqueueRequest(SendCallback callback, long sequenceId) { + private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int payloadSize) { try { if (conf.isBlockIfQueueFull()) { semaphore.acquire(); + client.getMemoryLimitController().reserveMemory(payloadSize); } else { if (!semaphore.tryAcquire()) { callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId)); return false; } + + if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) { + semaphore.release(); + callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId)); + return false; + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -982,10 +989,12 @@ private long getHighestSequenceId(OpSendMsg op) { private void releaseSemaphoreForSendOp(OpSendMsg op) { semaphore.release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1); + client.getMemoryLimitController().releaseMemory(op.uncompressedSize); } - private void completeCallbackAndReleaseSemaphore(SendCallback callback, Exception exception) { + private void completeCallbackAndReleaseSemaphore(long payloadSize, SendCallback callback, Exception exception) { semaphore.release(); + client.getMemoryLimitController().releaseMemory(payloadSize); callback.sendComplete(exception); } @@ -1111,6 +1120,7 @@ protected static final class OpSendMsg { ByteBufPair cmd; SendCallback callback; Runnable rePopulate; + long uncompressedSize; long sequenceId; long createdAt; long batchSizeByte = 0; @@ -1126,6 +1136,7 @@ static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, Se op.callback = callback; op.sequenceId = sequenceId; op.createdAt = System.nanoTime(); + op.uncompressedSize = msg.getUncompressedSize(); return op; } @@ -1136,6 +1147,10 @@ static OpSendMsg create(List> msgs, ByteBufPair cmd, long sequenc op.callback = callback; op.sequenceId = sequenceId; op.createdAt = System.nanoTime(); + op.uncompressedSize = 0; + for (int i = 0; i < msgs.size(); i++) { + op.uncompressedSize += msgs.get(i).getUncompressedSize(); + } return op; } @@ -1148,6 +1163,10 @@ static OpSendMsg create(List> msgs, ByteBufPair cmd, long lowestS op.sequenceId = lowestSequenceId; op.highestSequenceId = highestSequenceId; op.createdAt = System.nanoTime(); + op.uncompressedSize = 0; + for (int i = 0; i < msgs.size(); i++) { + op.uncompressedSize += msgs.get(i).getUncompressedSize(); + } return op; } @@ -1162,6 +1181,7 @@ void recycle() { highestSequenceId = -1L; totalChunks = 0; chunkId = -1; + uncompressedSize = 0; recyclerHandle.recycle(this); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 149c617d3b8c5..9b87bfd25b2a9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -110,6 +110,7 @@ public enum State { private final AtomicLong requestIdGenerator = new AtomicLong(); private final EventLoopGroup eventLoopGroup; + private final MemoryLimitController memoryLimitController; private final LoadingCache schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000) .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { @@ -165,6 +166,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr } } + memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes()); state.set(State.Open); } @@ -798,6 +800,10 @@ private LoadingCache getSchemaProviderLoadingCache() return schemaProviderLoadingCache; } + public MemoryLimitController getMemoryLimitController() { + return memoryLimitController; + } + @SuppressWarnings("unchecked") protected CompletableFuture> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema schema, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index b60e3b46378e6..35f15e8fa16c8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -92,6 +92,8 @@ public class ClientConfigurationData implements Serializable, Cloneable { private Set tlsCiphers = Sets.newTreeSet(); private Set tlsProtocols = Sets.newTreeSet(); + private long memoryLimitBytes = 0; + /** proxyServiceUrl and proxyProtocol must be mutually inclusive **/ private String proxyServiceUrl; private ProxyProtocol proxyProtocol; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java new file mode 100644 index 0000000000000..c300b3ceb5654 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class MemoryLimitControllerTest { + + private ExecutorService executor; + + @BeforeClass + void setup() { + executor = Executors.newCachedThreadPool(); + } + + @AfterClass + void teardown() { + executor.shutdownNow(); + } + + @Test + public void testLimit() throws Exception { + MemoryLimitController mlc = new MemoryLimitController(100); + + for (int i = 0; i < 101; i++) { + mlc.reserveMemory(1); + } + + assertEquals(mlc.currentUsage(), 101); + assertFalse(mlc.tryReserveMemory(1)); + mlc.releaseMemory(1); + assertEquals(mlc.currentUsage(), 100); + + assertTrue(mlc.tryReserveMemory(1)); + assertEquals(mlc.currentUsage(), 101); + } + + @Test + public void testBlocking() throws Exception { + MemoryLimitController mlc = new MemoryLimitController(100); + + for (int i = 0; i < 101; i++) { + mlc.reserveMemory(1); + } + + CountDownLatch l1 = new CountDownLatch(1); + executor.submit(() -> { + try { + mlc.reserveMemory(1); + l1.countDown(); + } catch (InterruptedException e) { + } + }); + + CountDownLatch l2 = new CountDownLatch(1); + executor.submit(() -> { + try { + mlc.reserveMemory(1); + l2.countDown(); + } catch (InterruptedException e) { + } + }); + + CountDownLatch l3 = new CountDownLatch(1); + executor.submit(() -> { + try { + mlc.reserveMemory(1); + l3.countDown(); + } catch (InterruptedException e) { + } + }); + + // The threads are blocked since the quota is full + assertFalse(l1.await(100, TimeUnit.MILLISECONDS)); + assertFalse(l2.await(100, TimeUnit.MILLISECONDS)); + assertFalse(l3.await(100, TimeUnit.MILLISECONDS)); + + assertEquals(mlc.currentUsage(), 101); + mlc.releaseMemory(3); + + assertTrue(l1.await(1, TimeUnit.SECONDS)); + assertTrue(l2.await(1, TimeUnit.SECONDS)); + assertTrue(l3.await(1, TimeUnit.SECONDS)); + assertEquals(mlc.currentUsage(), 101); + } + + @Test + public void testStepRelease() throws Exception { + MemoryLimitController mlc = new MemoryLimitController(100); + + for (int i = 0; i < 101; i++) { + mlc.reserveMemory(1); + } + + CountDownLatch l1 = new CountDownLatch(1); + executor.submit(() -> { + try { + mlc.reserveMemory(1); + l1.countDown(); + } catch (InterruptedException e) { + } + }); + + CountDownLatch l2 = new CountDownLatch(1); + executor.submit(() -> { + try { + mlc.reserveMemory(1); + l2.countDown(); + } catch (InterruptedException e) { + } + }); + + CountDownLatch l3 = new CountDownLatch(1); + executor.submit(() -> { + try { + mlc.reserveMemory(1); + l3.countDown(); + } catch (InterruptedException e) { + } + }); + + // The threads are blocked since the quota is full + assertFalse(l1.await(100, TimeUnit.MILLISECONDS)); + assertFalse(l2.await(100, TimeUnit.MILLISECONDS)); + assertFalse(l3.await(100, TimeUnit.MILLISECONDS)); + + assertEquals(mlc.currentUsage(), 101); + + mlc.releaseMemory(1); + mlc.releaseMemory(1); + mlc.releaseMemory(1); + + assertTrue(l1.await(1, TimeUnit.SECONDS)); + assertTrue(l2.await(1, TimeUnit.SECONDS)); + assertTrue(l3.await(1, TimeUnit.SECONDS)); + assertEquals(mlc.currentUsage(), 101); + } +}