Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import java.util.concurrent.CountDownLatch;

import lombok.Cleanup;

import org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class MemoryLimitTest extends ProducerConsumerBase {

@DataProvider(name = "batching")
public Object[][] provider() {
return new Object[][] {
// "Batching"
{ false },
{ true },
};
}

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(dataProvider = "batching")
public void testRejectMessages(boolean enableBatch)
throws Exception {
String topic = newTopicName();

@Cleanup
PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.memoryLimit(100, SizeUnit.KILO_BYTES)
.build();

@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.blockIfQueueFull(false)
.create();

final int n = 101;
CountDownLatch latch = new CountDownLatch(n);

for (int i = 0; i < n; i++) {
producer.sendAsync(new byte[1024]).thenRun(() -> {
latch.countDown();
});
}

assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024);

try {
producer.send(new byte[1024]);
fail("should have failed");
} catch (MemoryBufferIsFullError e) {
// Expected
}

latch.await();

assertEquals(client.getMemoryLimitController().currentUsage(), 0);

// We should now be able to send again
producer.send(new byte[1024]);
}

@Test(dataProvider = "batching")
public void testRejectMessagesOnMultipleTopics(boolean enableBatch) throws Exception {
String t1 = newTopicName();
String t2 = newTopicName();

@Cleanup
PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.memoryLimit(100, SizeUnit.KILO_BYTES)
.build();

@Cleanup
Producer<byte[]> p1 = client.newProducer()
.topic(t1)
.blockIfQueueFull(false)
.create();

@Cleanup
Producer<byte[]> p2 = client.newProducer()
.topic(t2)
.blockIfQueueFull(false)
.create();

final int n = 101;
CountDownLatch latch = new CountDownLatch(n);

for (int i = 0; i < n / 2; i++) {
p1.sendAsync(new byte[1024]).thenRun(() -> {
latch.countDown();
});
p2.sendAsync(new byte[1024]).thenRun(() -> {
latch.countDown();
});
}

// Last message in order to reach the limit
p1.sendAsync(new byte[1024]).thenRun(() -> {
latch.countDown();
});

assertEquals(client.getMemoryLimitController().currentUsage(), n * 1024);

try {
p1.send(new byte[1024]);
fail("should have failed");
} catch (MemoryBufferIsFullError e) {
// Expected
}

try {
p2.send(new byte[1024]);
fail("should have failed");
} catch (MemoryBufferIsFullError e) {
// Expected
}

latch.await();

assertEquals(client.getMemoryLimitController().currentUsage(), 0);

// We should now be able to send again
p1.send(new byte[1024]);
p2.send(new byte[1024]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Sets;

import java.lang.reflect.Method;
import java.util.Random;
import java.util.Set;

import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -61,4 +62,10 @@ protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T rece
Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
}

private static final Random random = new Random();

protected String newTopicName() {
return "my-property/my-ns/topic-" + Long.toHexString(random.nextLong());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,21 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
*/
ClientBuilder tlsProtocols(Set<String> tlsProtocols);

/**
* Configure a limit on the amount of direct memory that will be allocated by this client instance.
* <p>
* <b>Note: at this moment this is only limiting the memory for producers.</b>
* <p>
* Setting this to 0 will disable the limit.
*
* @param memoryLimit
* the limit
* @param unit
* the memory limit size unit
* @return the client builder instance
*/
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

/**
* Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
* statsInterval It should be set to at least 1 second.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,35 @@ public ProducerQueueIsFullError(String msg, long sequenceId) {
}
}

/**
* Memory buffer full error thrown by Pulsar client.
*/
public static class MemoryBufferIsFullError extends PulsarClientException {
/**
* Constructs an {@code MemoryBufferIsFullError} with the specified detail message.
*
* @param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public MemoryBufferIsFullError(String msg) {
super(msg);
}

/**
* Constructs an {@code MemoryBufferIsFullError} with the specified detail message.
*
* @param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
* @param sequenceId
* The sequenceId of the message
*/
public MemoryBufferIsFullError(String msg, long sequenceId) {
super(msg, sequenceId);
}
}

/**
* Producer blocked quota exceeded error thrown by Pulsar client.
*/
Expand Down Expand Up @@ -990,6 +1019,8 @@ public static PulsarClientException unwrap(Throwable t) {
return new TopicDoesNotExistException(msg);
} else if (cause instanceof ProducerFencedException) {
return new ProducerFencedException(msg);
} else if (cause instanceof MemoryBufferIsFullError) {
return new MemoryBufferIsFullError(msg);
} else {
return new PulsarClientException(t);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Size unit converter.
*/
public enum SizeUnit {
BYTES(1L), KILO_BYTES(1024L), MEGA_BYTES(1024L * 1024L), GIGA_BYTES(1024L * 1024L * 1024L);

private final long bytes;

private SizeUnit(long bytes) {
this.bytes = bytes;
}

public long toBytes(long value) {
return value * bytes;
}

public long toKiloBytes(long value) {
return toBytes(value) / KILO_BYTES.bytes;
}

public long toMegaBytes(long value) {
return toBytes(value) / MEGA_BYTES.bytes;
}

public long toGigaBytes(long value) {
return toBytes(value) / GIGA_BYTES.bytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;

Expand Down Expand Up @@ -290,6 +291,12 @@ public ClientConfigurationData getClientConfigurationData() {
return conf;
}

@Override
public ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit) {
conf.setMemoryLimitBytes(unit.toBytes(memoryLimit));
return this;
}

@Override
public ClientBuilder clock(Clock clock) {
conf.setClock(clock);
Expand Down
Loading