From ebbe45a06a0c58b694604b313a42b86e44d829e9 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Tue, 27 Jun 2023 20:31:48 +0900 Subject: [PATCH 01/16] Putback memory limit --- .../apache/pulsar/client/cli/PulsarClientTool.java | 6 +++++- .../pulsar/testclient/LoadSimulationClient.java | 5 ++++- .../pulsar/testclient/PerformanceConsumer.java | 10 ++-------- .../pulsar/testclient/PerformanceProducer.java | 11 ++--------- .../apache/pulsar/testclient/PerformanceReader.java | 8 ++------ .../pulsar/testclient/PerformanceTransaction.java | 13 ++----------- 6 files changed, 17 insertions(+), 36 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index c64d80f380b9f..587115c433ca9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -76,6 +76,9 @@ public static class RootParams { @Parameter(names = { "--tlsTrustCertsFilePath" }, description = "File path to client trust certificates") String tlsTrustCertsFilePath; + + @Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit") + long memoryLimit; } protected RootParams rootParams; @@ -161,11 +164,12 @@ protected void initRootParamsFromProperties(Properties properties) { System.exit(1); } } + this.rootParams.memoryLimit = Long.parseLong(properties.getProperty("memoryLimit", "0")); } private void updateConfig() throws UnsupportedAuthenticationException { ClientBuilder clientBuilder = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES); + .memoryLimit(this.rootParams.memoryLimit, SizeUnit.BYTES); Authentication authentication = null; if (isNotBlank(this.rootParams.authPluginClassName)) { authentication = AuthenticationFactory.create(rootParams.authPluginClassName, rootParams.authParams); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java index 64330ae2eeea1..0d28eecd41ca6 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java @@ -180,6 +180,9 @@ private static class MainArguments { @Parameter(names = { "--service-url" }, description = "Pulsar Service URL", required = true) public String serviceURL; + + @Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit") + long memoryLimit = 0L; } // Configuration class for initializing or modifying TradeUnits. @@ -318,7 +321,7 @@ public LoadSimulationClient(final MainArguments arguments) throws Exception { .serviceHttpUrl(arguments.serviceURL) .build(); client = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES) .serviceUrl(arguments.serviceURL) .connectionsPerBroker(4) .ioThreads(Runtime.getRuntime().availableProcessors()) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 59dabc9302622..f883d4d1fcfc6 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -41,14 +41,7 @@ import org.HdrHistogram.Histogram; import org.HdrHistogram.HistogramLogWriter; import org.HdrHistogram.Recorder; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.MessageListener; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -229,6 +222,7 @@ public static void main(String[] args) throws Exception { long testEndTime = startTime + (long) (arguments.testTime * 1e9); ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments) + .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES) .enableTransaction(arguments.isEnableTransaction); PulsarClient pulsarClient = clientBuilder.build(); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 389a6af4aaa58..c7692d4a62337 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -58,15 +58,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.MessageRoutingMode; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerAccessMode; -import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; @@ -504,6 +496,7 @@ private static void runProducer(int producerId, ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments) + .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES) .enableTransaction(arguments.isEnableTransaction); client = clientBuilder.build(); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index ed5cc37644a31..053654c1241a8 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -34,12 +34,7 @@ import java.util.concurrent.atomic.LongAdder; import org.HdrHistogram.Histogram; import org.HdrHistogram.Recorder; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderBuilder; -import org.apache.pulsar.client.api.ReaderListener; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -140,6 +135,7 @@ public static void main(String[] args) throws Exception { }; ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments) + .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES) .enableTls(arguments.useTls); PulsarClient pulsarClient = clientBuilder.build(); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java index 469e6ab1f3fd6..f8ae13c1368c4 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java @@ -49,17 +49,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; @@ -223,6 +213,7 @@ public static void main(String[] args) } ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments) + .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES) .enableTransaction(!arguments.isDisableTransaction); PulsarClient client = clientBuilder.build(); From e8b262d396e637d8d9117126f0e27cd37b6d4e40 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 28 Jun 2023 00:38:38 +0900 Subject: [PATCH 02/16] Fix check style --- .../testclient/PerformanceConsumer.java | 9 +- .../testclient/PerformanceProducer.java | 149 ++++++++++-------- .../pulsar/testclient/PerformanceReader.java | 7 +- .../testclient/PerformanceTransaction.java | 13 +- 4 files changed, 106 insertions(+), 72 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index f883d4d1fcfc6..6e84c1f29cc19 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -41,7 +41,14 @@ import org.HdrHistogram.Histogram; import org.HdrHistogram.HistogramLogWriter; import org.HdrHistogram.Recorder; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index c7692d4a62337..35e05c1bc6041 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -18,18 +18,34 @@ */ package org.apache.pulsar.testclient; -import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES; -import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES; -import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.util.concurrent.RateLimiter; import io.netty.util.concurrent.DefaultThreadFactory; +import org.HdrHistogram.Histogram; +import org.HdrHistogram.HistogramLogWriter; +import org.HdrHistogram.Recorder; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerAccessMode; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SizeUnit; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.FileOutputStream; import java.io.PrintStream; import java.nio.charset.StandardCharsets; @@ -52,18 +68,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; -import org.HdrHistogram.Histogram; -import org.HdrHistogram.HistogramLogWriter; -import org.HdrHistogram.Recorder; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminBuilder; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.api.transaction.Transaction; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; /** * A client program to test pulsar producer performance. @@ -95,17 +106,17 @@ public class PerformanceProducer { @Parameters(commandDescription = "Test pulsar producer performance.") static class Arguments extends PerformanceTopicListArguments { - @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads", + @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads", validateWith = PositiveNumberParameterValidator.class) public int numTestThreads = 1; - @Parameter(names = { "-r", "--rate" }, description = "Publish rate msg/s across topics") + @Parameter(names = {"-r", "--rate"}, description = "Publish rate msg/s across topics") public int msgRate = 100; - @Parameter(names = { "-s", "--size" }, description = "Message size (bytes)") + @Parameter(names = {"-s", "--size"}, description = "Message size (bytes)") public int msgSize = 1024; - @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)", + @Parameter(names = {"-n", "--num-producers"}, description = "Number of producers (per topic)", validateWith = PositiveNumberParameterValidator.class) public int numProducers = 1; @@ -116,98 +127,98 @@ static class Arguments extends PerformanceTopicListArguments { + "compatibility with previous version of pulsar-perf") public int sendTimeout = 0; - @Parameter(names = { "-pn", "--producer-name" }, description = "Producer Name") + @Parameter(names = {"-pn", "--producer-name"}, description = "Producer Name") public String producerName = null; - @Parameter(names = { "-au", "--admin-url" }, description = "Pulsar Admin URL") + @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL") public String adminURL; - @Parameter(names = { "-ch", - "--chunking" }, description = "Should split the message and publish in chunks if message size is " + @Parameter(names = {"-ch", + "--chunking"}, description = "Should split the message and publish in chunks if message size is " + "larger than allowed max size") private boolean chunkingAllowed = false; - @Parameter(names = { "-o", "--max-outstanding" }, description = "Max number of outstanding messages") + @Parameter(names = {"-o", "--max-outstanding"}, description = "Max number of outstanding messages") public int maxOutstanding = DEFAULT_MAX_PENDING_MESSAGES; - @Parameter(names = { "-p", "--max-outstanding-across-partitions" }, description = "Max number of outstanding " + @Parameter(names = {"-p", "--max-outstanding-across-partitions"}, description = "Max number of outstanding " + "messages across partitions") public int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; - @Parameter(names = { "-np", "--partitions" }, description = "Create partitioned topics with the given number " + @Parameter(names = {"-np", "--partitions"}, description = "Create partitioned topics with the given number " + "of partitions, set 0 to not try to create the topic") public Integer partitions = null; - @Parameter(names = { "-m", - "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep " + @Parameter(names = {"-m", + "--num-messages"}, description = "Number of messages to publish in total. If <= 0, it will keep " + "publishing") public long numMessages = 0; - @Parameter(names = { "-z", "--compression" }, description = "Compress messages payload") + @Parameter(names = {"-z", "--compression"}, description = "Compress messages payload") public CompressionType compression = CompressionType.NONE; - @Parameter(names = { "-f", "--payload-file" }, description = "Use payload from an UTF-8 encoded text file and " + @Parameter(names = {"-f", "--payload-file"}, description = "Use payload from an UTF-8 encoded text file and " + "a payload will be randomly selected when publishing messages") public String payloadFilename = null; - @Parameter(names = { "-e", "--payload-delimiter" }, description = "The delimiter used to split lines when " + @Parameter(names = {"-e", "--payload-delimiter"}, description = "The delimiter used to split lines when " + "using payload from a file") // here escaping \n since default value will be printed with the help text public String payloadDelimiter = "\\n"; - @Parameter(names = { "-b", - "--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)") + @Parameter(names = {"-b", + "--batch-time-window"}, description = "Batch messages in 'x' ms window (Default: 1ms)") public double batchTimeMillis = 1.0; - @Parameter(names = { "-db", - "--disable-batching" }, description = "Disable batching if true") + @Parameter(names = {"-db", + "--disable-batching"}, description = "Disable batching if true") public boolean disableBatching; @Parameter(names = { - "-bm", "--batch-max-messages" + "-bm", "--batch-max-messages" }, description = "Maximum number of messages per batch") public int batchMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES; @Parameter(names = { - "-bb", "--batch-max-bytes" + "-bb", "--batch-max-bytes" }, description = "Maximum number of bytes per batch") public int batchMaxBytes = 4 * 1024 * 1024; - @Parameter(names = { "-time", - "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing") + @Parameter(names = {"-time", + "--test-duration"}, description = "Test duration in secs. If <= 0, it will keep publishing") public long testTime = 0; @Parameter(names = "--warmup-time", description = "Warm-up time in seconds (Default: 1 sec)") public double warmupTimeSeconds = 1.0; - @Parameter(names = { "-k", "--encryption-key-name" }, description = "The public key name to encrypt payload") + @Parameter(names = {"-k", "--encryption-key-name"}, description = "The public key name to encrypt payload") public String encKeyName = null; - @Parameter(names = { "-v", - "--encryption-key-value-file" }, + @Parameter(names = {"-v", + "--encryption-key-value-file"}, description = "The file which contains the public key to encrypt payload") public String encKeyFile = null; - @Parameter(names = { "-d", - "--delay" }, description = "Mark messages with a given delay in seconds") + @Parameter(names = {"-d", + "--delay"}, description = "Mark messages with a given delay in seconds") public long delay = 0; - @Parameter(names = { "-set", - "--set-event-time" }, description = "Set the eventTime on messages") + @Parameter(names = {"-set", + "--set-event-time"}, description = "Set the eventTime on messages") public boolean setEventTime = false; - @Parameter(names = { "-ef", - "--exit-on-failure" }, description = "Exit from the process on publish failure (default: disable)") + @Parameter(names = {"-ef", + "--exit-on-failure"}, description = "Exit from the process on publish failure (default: disable)") public boolean exitOnFailure = false; @Parameter(names = {"-mk", "--message-key-generation-mode"}, description = "The generation mode of message key" + ", valid options are: [autoIncrement, random]") public String messageKeyGenerationMode = null; - @Parameter(names = { "-am", "--access-mode" }, description = "Producer access mode") + @Parameter(names = {"-am", "--access-mode"}, description = "Producer access mode") public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared; - @Parameter(names = { "-fp", "--format-payload" }, + @Parameter(names = {"-fp", "--format-payload"}, description = "Format %i as a message index in the stream from producer and/or %t as the timestamp" + " nanoseconds.") public boolean formatPayload = false; @@ -231,7 +242,7 @@ static class Arguments extends PerformanceTopicListArguments { + "setting to true, -abort takes effect)") public boolean isAbortTransaction = false; - @Parameter(names = { "--histogram-file" }, description = "HdrHistogram output file") + @Parameter(names = {"--histogram-file"}, description = "HdrHistogram output file") public String histogramFile = null; @Override @@ -266,7 +277,7 @@ public static void main(String[] args) throws Exception { List payloadByteList = new ArrayList<>(); if (arguments.payloadFilename != null) { Path payloadFilePath = Paths.get(arguments.payloadFilename); - if (Files.notExists(payloadFilePath) || Files.size(payloadFilePath) == 0) { + if (Files.notExists(payloadFilePath) || Files.size(payloadFilePath) == 0) { throw new IllegalArgumentException("Payload file doesn't exist or it is empty."); } // here escaping the default payload delimiter to correct value @@ -296,7 +307,7 @@ public static void main(String[] args) throws Exception { printAggregatedStats(); })); - if (arguments.partitions != null) { + if (arguments.partitions != null) { final PulsarAdminBuilder adminBuilder = PerfClientUtils .createAdminBuilderFromArguments(arguments, arguments.adminURL); @@ -332,13 +343,13 @@ public static void main(String[] args) throws Exception { executor.submit(() -> { log.info("Started performance test thread {}", threadIdx); runProducer( - threadIdx, - arguments, - numMessagesPerThread, - msgRatePerThread, - payloadByteList, - payloadBytes, - doneLatch + threadIdx, + arguments, + numMessagesPerThread, + msgRatePerThread, + payloadByteList, + payloadBytes, + doneLatch ); }); } @@ -603,7 +614,7 @@ private static void runProducer(int producerId, if (arguments.numMessagesPerTransaction > 0) { try { numMsgPerTxnLimit.acquire(); - } catch (InterruptedException exception){ + } catch (InterruptedException exception) { log.error("Get exception: ", exception); } } @@ -695,7 +706,7 @@ private static void runProducer(int producerId, numMsgPerTxnLimit.release(arguments.numMessagesPerTransaction); totalNumTxnOpenTxnSuccess.increment(); break; - } catch (Exception e){ + } catch (Exception e) { totalNumTxnOpenTxnFail.increment(); log.error("Failed to new transaction with exception: ", e); } @@ -745,10 +756,10 @@ private static void printAggregatedThroughput(long start, Arguments arguments) { TOTALFORMAT.format(rateOpenTxn)); } log.info( - "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s ", - totalMessagesSent.sum(), - TOTALFORMAT.format(rate), - TOTALFORMAT.format(throughput)); + "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s ", + totalMessagesSent.sum(), + TOTALFORMAT.format(rate), + TOTALFORMAT.format(throughput)); } private static void printAggregatedStats() { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index 053654c1241a8..14161df450eab 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -34,7 +34,12 @@ import java.util.concurrent.atomic.LongAdder; import org.HdrHistogram.Histogram; import org.HdrHistogram.Recorder; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.ReaderListener; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java index f8ae13c1368c4..3b422452d6401 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java @@ -49,7 +49,18 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SizeUnit; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; From c82e923eab7249e73c51a7dc867dd777d483076d Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 28 Jun 2023 01:13:41 +0900 Subject: [PATCH 03/16] Implement perfUtil and test --- .../pulsar/testclient/PerfClientUtils.java | 2 +- .../testclient/PerformanceBaseArguments.java | 62 ++++++++++++++++ .../testclient/PerformanceConsumer.java | 1 + .../pulsar/testclient/PerformanceReader.java | 1 + .../PerformanceBaseArgumentsTest.java | 73 +++++++++++++++++++ 5 files changed, 138 insertions(+), 1 deletion(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java index f9e5d5ee7e6e1..3b44023ef503e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java @@ -67,7 +67,7 @@ public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArgu throws PulsarClientException.UnsupportedAuthenticationException { ClientBuilder clientBuilder = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES) .serviceUrl(arguments.serviceURL) .connectionsPerBroker(arguments.maxConnections) .ioThreads(arguments.ioThreads) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index 5ae79fb0bf9a4..4920481f3152f 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -23,9 +23,11 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; +import com.google.common.collect.Sets; import java.io.File; import java.io.FileInputStream; import java.util.Properties; +import java.util.Set; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.ProxyProtocol; @@ -36,6 +38,8 @@ */ public abstract class PerformanceBaseArguments { + private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); + @Parameter(names = { "-h", "--help" }, description = "Print help message", help = true) boolean help; @@ -103,6 +107,12 @@ public abstract class PerformanceBaseArguments { @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name", hidden = true) public String deprecatedAuthPluginClassName; + @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " + + "(eg: 32M, 64M)") + String memoryLimitArgument = null; + + protected long memoryLimit = 0L; + public abstract void fillArgumentsFromProperties(Properties prop); @SneakyThrows @@ -171,6 +181,12 @@ public void fillArgumentsFromProperties() { } + if (StringUtils.isNotEmpty(memoryLimitArgument)) { + long memoryLimitArg = validateSizeString(memoryLimitArgument); + positiveCheck("memory-limit", memoryLimitArg); + memoryLimit = memoryLimitArg; + } + fillArgumentsFromProperties(prop); } @@ -228,4 +244,50 @@ public void parseCLI(String cmdName, String[] args) { } } + /** + * @see org.apache.pulsar.admin.cli.CliCommand#validateSizeString(String) + */ + static long validateSizeString(String s) { + char last = s.charAt(s.length() - 1); + String subStr = s.substring(0, s.length() - 1); + long size; + try { + size = sizeUnit.contains(last) + ? Long.parseLong(subStr) + : Long.parseLong(s); + } catch (IllegalArgumentException e) { + throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", + s, "(4096, 100K, 10M, 16G, 2T)")); + } + switch (last) { + case 'k': + case 'K': + return size * 1024; + + case 'm': + case 'M': + return size * 1024 * 1024; + + case 'g': + case 'G': + return size * 1024 * 1024 * 1024; + + case 't': + case 'T': + return size * 1024 * 1024 * 1024 * 1024; + + default: + return size; + } + } + + /** + * @see org.apache.pulsar.admin.cli.CmdNamespaces.SetOffloadPolicies#positiveCheck(String, long) + */ + static boolean positiveCheck(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 6e84c1f29cc19..9bd74be3aa859 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index 14161df450eab..6174caad1f938 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderListener; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java index 42c93be343074..17c42fdd185a5 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java @@ -21,14 +21,19 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; +import com.beust.jcommander.ParameterException; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.apache.pulsar.client.api.ProxyProtocol.SNI; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.fail; @@ -158,4 +163,72 @@ public void fillArgumentsFromProperties(Properties prop) { tempConfigFile.delete(); } } + + @DataProvider(name = "memoryLimitCliArgumentProvider") + public Object[][] memoryLimitCliArgumentProvider() { + return new Object[][] { + { new String[]{"-ml","1"}, 1L}, + { new String[]{"-ml","1K"}, 1024L}, + { new String[]{"--memory-limit", "1G"}, 1_073_741_824L} + }; + } + + @Test(dataProvider = "memoryLimitCliArgumentProvider") + public void testMemoryLimitCliArgument(String[] cliArgs, long expectedMemoryLimit) { + for (String cmd : List.of( + "pulsar-perf read", + "pulsar-perf produce", + "pulsar-perf consume", + "pulsar-perf transaction" + )) { + // Arrange + AtomicBoolean called = new AtomicBoolean(); + final PerformanceBaseArguments baseArgument = new PerformanceBaseArguments() { + @Override + public void fillArgumentsFromProperties(Properties prop) { + called.set(true); + } + }; + baseArgument.confFile = "./src/test/resources/perf_client1.conf"; + + // Act + baseArgument.parseCLI(cmd, cliArgs); + + // Assert + assertEquals(baseArgument.memoryLimit, expectedMemoryLimit); + } + } + + @DataProvider(name = "invalidMemoryLimitCliArgumentProvider") + public Object[][] invalidMemoryLimitCliArgumentProvider() { + return new Object[][] { + { new String[]{"-ml","-1"}}, + { new String[]{"-ml","1C"}}, + { new String[]{"--memory-limit", "1Q"}} + }; + } + + @Test(dataProvider = "invalidMemoryLimitCliArgumentProvider") + public void testInvalidMemoryLimitCliArgument(String[] cliArgs) { + for (String cmd : List.of( + "pulsar-perf read", + "pulsar-perf produce", + "pulsar-perf consume", + "pulsar-perf transaction" + )) { + // Arrange + AtomicBoolean called = new AtomicBoolean(); + final PerformanceBaseArguments baseArgument = new PerformanceBaseArguments() { + @Override + public void fillArgumentsFromProperties(Properties prop) { + called.set(true); + } + }; + baseArgument.confFile = "./src/test/resources/perf_client1.conf"; + + // Act & Assert + assertThrows(ParameterException.class, + () -> baseArgument.parseCLI(cmd, cliArgs)); + } + } } From 40629543defd2523461f278135633b3c7262d017 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 28 Jun 2023 09:01:32 +0900 Subject: [PATCH 04/16] Add JavaDoc to memory limit --- .../apache/pulsar/testclient/PerformanceBaseArguments.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index 4920481f3152f..8fe6b2b87b19f 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -111,6 +111,12 @@ public abstract class PerformanceBaseArguments { + "(eg: 32M, 64M)") String memoryLimitArgument = null; + /** + * Byte reppresentation of memory limit. + * Should be used as actual value for {@link #memoryLimitArgument}. + * + * @see #memoryLimitArgument + */ protected long memoryLimit = 0L; public abstract void fillArgumentsFromProperties(Properties prop); From 2c2275e8b3b59943addb04eb49dda92038e9e62b Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 28 Jun 2023 19:28:28 +0900 Subject: [PATCH 05/16] Minimize PerformanceProducer change --- .../testclient/PerformanceProducer.java | 157 +++++++++--------- 1 file changed, 77 insertions(+), 80 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 35e05c1bc6041..f89b20d5f83e6 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -18,34 +18,18 @@ */ package org.apache.pulsar.testclient; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.util.concurrent.RateLimiter; import io.netty.util.concurrent.DefaultThreadFactory; -import org.HdrHistogram.Histogram; -import org.HdrHistogram.HistogramLogWriter; -import org.HdrHistogram.Recorder; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminBuilder; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.MessageRoutingMode; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerAccessMode; -import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SizeUnit; -import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.client.api.transaction.Transaction; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.FileOutputStream; import java.io.PrintStream; import java.nio.charset.StandardCharsets; @@ -68,13 +52,26 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; - -import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES; -import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES; -import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; +import org.HdrHistogram.Histogram; +import org.HdrHistogram.HistogramLogWriter; +import org.HdrHistogram.Recorder; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerAccessMode; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A client program to test pulsar producer performance. @@ -106,17 +103,17 @@ public class PerformanceProducer { @Parameters(commandDescription = "Test pulsar producer performance.") static class Arguments extends PerformanceTopicListArguments { - @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads", + @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads", validateWith = PositiveNumberParameterValidator.class) public int numTestThreads = 1; - @Parameter(names = {"-r", "--rate"}, description = "Publish rate msg/s across topics") + @Parameter(names = { "-r", "--rate" }, description = "Publish rate msg/s across topics") public int msgRate = 100; - @Parameter(names = {"-s", "--size"}, description = "Message size (bytes)") + @Parameter(names = { "-s", "--size" }, description = "Message size (bytes)") public int msgSize = 1024; - @Parameter(names = {"-n", "--num-producers"}, description = "Number of producers (per topic)", + @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)", validateWith = PositiveNumberParameterValidator.class) public int numProducers = 1; @@ -127,98 +124,98 @@ static class Arguments extends PerformanceTopicListArguments { + "compatibility with previous version of pulsar-perf") public int sendTimeout = 0; - @Parameter(names = {"-pn", "--producer-name"}, description = "Producer Name") + @Parameter(names = { "-pn", "--producer-name" }, description = "Producer Name") public String producerName = null; - @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL") + @Parameter(names = { "-au", "--admin-url" }, description = "Pulsar Admin URL") public String adminURL; - @Parameter(names = {"-ch", - "--chunking"}, description = "Should split the message and publish in chunks if message size is " + @Parameter(names = { "-ch", + "--chunking" }, description = "Should split the message and publish in chunks if message size is " + "larger than allowed max size") private boolean chunkingAllowed = false; - @Parameter(names = {"-o", "--max-outstanding"}, description = "Max number of outstanding messages") + @Parameter(names = { "-o", "--max-outstanding" }, description = "Max number of outstanding messages") public int maxOutstanding = DEFAULT_MAX_PENDING_MESSAGES; - @Parameter(names = {"-p", "--max-outstanding-across-partitions"}, description = "Max number of outstanding " + @Parameter(names = { "-p", "--max-outstanding-across-partitions" }, description = "Max number of outstanding " + "messages across partitions") public int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; - @Parameter(names = {"-np", "--partitions"}, description = "Create partitioned topics with the given number " + @Parameter(names = { "-np", "--partitions" }, description = "Create partitioned topics with the given number " + "of partitions, set 0 to not try to create the topic") public Integer partitions = null; - @Parameter(names = {"-m", - "--num-messages"}, description = "Number of messages to publish in total. If <= 0, it will keep " + @Parameter(names = { "-m", + "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep " + "publishing") public long numMessages = 0; - @Parameter(names = {"-z", "--compression"}, description = "Compress messages payload") + @Parameter(names = { "-z", "--compression" }, description = "Compress messages payload") public CompressionType compression = CompressionType.NONE; - @Parameter(names = {"-f", "--payload-file"}, description = "Use payload from an UTF-8 encoded text file and " + @Parameter(names = { "-f", "--payload-file" }, description = "Use payload from an UTF-8 encoded text file and " + "a payload will be randomly selected when publishing messages") public String payloadFilename = null; - @Parameter(names = {"-e", "--payload-delimiter"}, description = "The delimiter used to split lines when " + @Parameter(names = { "-e", "--payload-delimiter" }, description = "The delimiter used to split lines when " + "using payload from a file") // here escaping \n since default value will be printed with the help text public String payloadDelimiter = "\\n"; - @Parameter(names = {"-b", - "--batch-time-window"}, description = "Batch messages in 'x' ms window (Default: 1ms)") + @Parameter(names = { "-b", + "--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)") public double batchTimeMillis = 1.0; - @Parameter(names = {"-db", - "--disable-batching"}, description = "Disable batching if true") + @Parameter(names = { "-db", + "--disable-batching" }, description = "Disable batching if true") public boolean disableBatching; @Parameter(names = { - "-bm", "--batch-max-messages" + "-bm", "--batch-max-messages" }, description = "Maximum number of messages per batch") public int batchMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES; @Parameter(names = { - "-bb", "--batch-max-bytes" + "-bb", "--batch-max-bytes" }, description = "Maximum number of bytes per batch") public int batchMaxBytes = 4 * 1024 * 1024; - @Parameter(names = {"-time", - "--test-duration"}, description = "Test duration in secs. If <= 0, it will keep publishing") + @Parameter(names = { "-time", + "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing") public long testTime = 0; @Parameter(names = "--warmup-time", description = "Warm-up time in seconds (Default: 1 sec)") public double warmupTimeSeconds = 1.0; - @Parameter(names = {"-k", "--encryption-key-name"}, description = "The public key name to encrypt payload") + @Parameter(names = { "-k", "--encryption-key-name" }, description = "The public key name to encrypt payload") public String encKeyName = null; - @Parameter(names = {"-v", - "--encryption-key-value-file"}, + @Parameter(names = { "-v", + "--encryption-key-value-file" }, description = "The file which contains the public key to encrypt payload") public String encKeyFile = null; - @Parameter(names = {"-d", - "--delay"}, description = "Mark messages with a given delay in seconds") + @Parameter(names = { "-d", + "--delay" }, description = "Mark messages with a given delay in seconds") public long delay = 0; - @Parameter(names = {"-set", - "--set-event-time"}, description = "Set the eventTime on messages") + @Parameter(names = { "-set", + "--set-event-time" }, description = "Set the eventTime on messages") public boolean setEventTime = false; - @Parameter(names = {"-ef", - "--exit-on-failure"}, description = "Exit from the process on publish failure (default: disable)") + @Parameter(names = { "-ef", + "--exit-on-failure" }, description = "Exit from the process on publish failure (default: disable)") public boolean exitOnFailure = false; @Parameter(names = {"-mk", "--message-key-generation-mode"}, description = "The generation mode of message key" + ", valid options are: [autoIncrement, random]") public String messageKeyGenerationMode = null; - @Parameter(names = {"-am", "--access-mode"}, description = "Producer access mode") + @Parameter(names = { "-am", "--access-mode" }, description = "Producer access mode") public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared; - @Parameter(names = {"-fp", "--format-payload"}, + @Parameter(names = { "-fp", "--format-payload" }, description = "Format %i as a message index in the stream from producer and/or %t as the timestamp" + " nanoseconds.") public boolean formatPayload = false; @@ -242,7 +239,7 @@ static class Arguments extends PerformanceTopicListArguments { + "setting to true, -abort takes effect)") public boolean isAbortTransaction = false; - @Parameter(names = {"--histogram-file"}, description = "HdrHistogram output file") + @Parameter(names = { "--histogram-file" }, description = "HdrHistogram output file") public String histogramFile = null; @Override @@ -277,7 +274,7 @@ public static void main(String[] args) throws Exception { List payloadByteList = new ArrayList<>(); if (arguments.payloadFilename != null) { Path payloadFilePath = Paths.get(arguments.payloadFilename); - if (Files.notExists(payloadFilePath) || Files.size(payloadFilePath) == 0) { + if (Files.notExists(payloadFilePath) || Files.size(payloadFilePath) == 0) { throw new IllegalArgumentException("Payload file doesn't exist or it is empty."); } // here escaping the default payload delimiter to correct value @@ -307,7 +304,7 @@ public static void main(String[] args) throws Exception { printAggregatedStats(); })); - if (arguments.partitions != null) { + if (arguments.partitions != null) { final PulsarAdminBuilder adminBuilder = PerfClientUtils .createAdminBuilderFromArguments(arguments, arguments.adminURL); @@ -343,13 +340,13 @@ public static void main(String[] args) throws Exception { executor.submit(() -> { log.info("Started performance test thread {}", threadIdx); runProducer( - threadIdx, - arguments, - numMessagesPerThread, - msgRatePerThread, - payloadByteList, - payloadBytes, - doneLatch + threadIdx, + arguments, + numMessagesPerThread, + msgRatePerThread, + payloadByteList, + payloadBytes, + doneLatch ); }); } @@ -614,7 +611,7 @@ private static void runProducer(int producerId, if (arguments.numMessagesPerTransaction > 0) { try { numMsgPerTxnLimit.acquire(); - } catch (InterruptedException exception) { + } catch (InterruptedException exception){ log.error("Get exception: ", exception); } } @@ -706,7 +703,7 @@ private static void runProducer(int producerId, numMsgPerTxnLimit.release(arguments.numMessagesPerTransaction); totalNumTxnOpenTxnSuccess.increment(); break; - } catch (Exception e) { + } catch (Exception e){ totalNumTxnOpenTxnFail.increment(); log.error("Failed to new transaction with exception: ", e); } @@ -756,10 +753,10 @@ private static void printAggregatedThroughput(long start, Arguments arguments) { TOTALFORMAT.format(rateOpenTxn)); } log.info( - "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s ", - totalMessagesSent.sum(), - TOTALFORMAT.format(rate), - TOTALFORMAT.format(throughput)); + "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s ", + totalMessagesSent.sum(), + TOTALFORMAT.format(rate), + TOTALFORMAT.format(throughput)); } private static void printAggregatedStats() { From daae6613ca2f76b3750a98e2f1fa752040bb2668 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 28 Jun 2023 22:01:01 +0900 Subject: [PATCH 06/16] Implement perf testclients with memory unit --- .../client/cli/PulsarClientToolTest.java | 30 ++++++++ .../client/cli/MemoryUnitToByteConverter.java | 69 ++++++++++++++++++ .../pulsar/client/cli/PulsarClientTool.java | 11 ++- .../testclient/LoadSimulationClient.java | 5 +- .../testclient/MemoryUnitToByteConverter.java | 72 +++++++++++++++++++ .../testclient/PerformanceBaseArguments.java | 70 +----------------- .../testclient/PerformanceProducer.java | 1 + .../PerformanceBaseArgumentsTest.java | 2 +- 8 files changed, 187 insertions(+), 73 deletions(-) create mode 100644 pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java create mode 100644 pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java index c401f3d0bea64..a201b9bb4cca4 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java @@ -64,6 +64,7 @@ public void testInitialization() throws InterruptedException, ExecutionException Properties properties = new Properties(); properties.setProperty("serviceUrl", brokerUrl.toString()); properties.setProperty("useTls", "false"); + properties.setProperty("memoryLimit", "10M"); String tenantName = UUID.randomUUID().toString(); @@ -85,6 +86,7 @@ public void testInitialization() throws InterruptedException, ExecutionException String[] args = { "consume", "-t", "Exclusive", "-s", "sub-name", "-n", Integer.toString(numberOfMessages), "--hex", "-r", "30", topicName }; Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); + Assert.assertEquals(pulsarClientToolConsumer.rootParams.memoryLimit, 10 * 1024 * 1024); future.complete(null); } catch (Throwable t) { future.completeExceptionally(t); @@ -99,6 +101,7 @@ public void testInitialization() throws InterruptedException, ExecutionException String[] args = { "produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName }; Assert.assertEquals(pulsarClientToolProducer.run(args), 0); + Assert.assertEquals(pulsarClientToolProducer.rootParams.memoryLimit, 10 * 1024 * 1024); future.get(); } @@ -333,22 +336,49 @@ public void testArgs() throws Exception { final String message = "test msg"; final int numberOfMessages = 1; final String topicName = getTopicWithRandomSuffix("test-topic"); + final String memoryLimitArg = "10M"; String[] args = {"--url", url, "--auth-plugin", authPlugin, "--auth-params", authParams, "--tlsTrustCertsFilePath", CA_CERT_FILE_PATH, + "--memory-limit", memoryLimitArg, "produce", "-m", message, "-n", Integer.toString(numberOfMessages), topicName}; pulsarClientTool.jcommander.parse(args); assertEquals(pulsarClientTool.rootParams.getTlsTrustCertsFilePath(), CA_CERT_FILE_PATH); assertEquals(pulsarClientTool.rootParams.getAuthParams(), authParams); assertEquals(pulsarClientTool.rootParams.getAuthPluginClassName(), authPlugin); + assertEquals(pulsarClientTool.rootParams.getMemoryLimit(), 10 * 1024 * 1024); assertEquals(pulsarClientTool.rootParams.getServiceURL(), url); assertNull(pulsarClientTool.rootParams.getProxyServiceURL()); assertNull(pulsarClientTool.rootParams.getProxyProtocol()); } + @Test(timeOut = 20000) + public void testMemoryLimitArgShortName() throws Exception { + PulsarClientTool pulsarClientTool = new PulsarClientTool(new Properties()); + final String url = "pulsar+ssl://localhost:6651"; + final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls"; + final String authParams = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTlsFileForClient("admin.cert"), + getTlsFileForClient("admin.key-pk8")); + final String message = "test msg"; + final int numberOfMessages = 1; + final String topicName = getTopicWithRandomSuffix("test-topic"); + final String memoryLimitArg = "10M"; + + String[] args = {"--url", url, + "--auth-plugin", authPlugin, + "--auth-params", authParams, + "--tlsTrustCertsFilePath", CA_CERT_FILE_PATH, + "-ml", memoryLimitArg, + "produce", "-m", message, + "-n", Integer.toString(numberOfMessages), topicName}; + + pulsarClientTool.jcommander.parse(args); + assertEquals(pulsarClientTool.rootParams.getMemoryLimit(), 10 * 1024 * 1024); + } + @Test public void testParsingProxyServiceUrlAndProxyProtocolFromProperties() throws Exception { Properties properties = new Properties(); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java new file mode 100644 index 0000000000000..1b36d53c64088 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java @@ -0,0 +1,69 @@ +package org.apache.pulsar.client.cli; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.ParameterException; +import com.google.common.collect.Sets; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; + +class MemoryUnitToByteConverter implements IStringConverter { + + private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); + private static final long DEFAULT_MEMORY_LIMIT = 0L; + + @Override + public Long convert(String memoryLimitArgument) { + return Math.max(DEFAULT_MEMORY_LIMIT, parseBytes(memoryLimitArgument)); + } + + public static long parseBytes(String memoryLimitArgument) { + if (StringUtils.isNotEmpty(memoryLimitArgument)) { + long memoryLimitArg = validateSizeString(memoryLimitArgument); + if (positiveCheckStatic("memory-limit", memoryLimitArg)) { + return memoryLimitArg; + } + } + return DEFAULT_MEMORY_LIMIT; + } + + static long validateSizeString(String s) { + char last = s.charAt(s.length() - 1); + String subStr = s.substring(0, s.length() - 1); + long size; + try { + size = sizeUnit.contains(last) + ? Long.parseLong(subStr) + : Long.parseLong(s); + } catch (IllegalArgumentException e) { + throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", + s, "(4096, 100K, 10M, 16G, 2T)")); + } + switch (last) { + case 'k': + case 'K': + return size * 1024; + + case 'm': + case 'M': + return size * 1024 * 1024; + + case 'g': + case 'G': + return size * 1024 * 1024 * 1024; + + case 't': + case 'T': + return size * 1024 * 1024 * 1024 * 1024; + + default: + return size; + } + } + + static boolean positiveCheckStatic(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index 587115c433ca9..d8e30490ab8ed 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -77,7 +77,8 @@ public static class RootParams { @Parameter(names = { "--tlsTrustCertsFilePath" }, description = "File path to client trust certificates") String tlsTrustCertsFilePath; - @Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit") + @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " + + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) long memoryLimit; } @@ -154,6 +155,10 @@ protected void initRootParamsFromProperties(Properties properties) { this.rootParams.authParams = properties.getProperty("authParams"); this.rootParams.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath"); this.rootParams.proxyServiceURL = StringUtils.trimToNull(properties.getProperty("proxyServiceUrl")); + this.rootParams.listenerName = StringUtils.trimToNull(properties.getProperty("listenerName")); + // setting memory limit + this.rootParams.memoryLimit = MemoryUnitToByteConverter.parseBytes(properties.getProperty("memoryLimit")); + String proxyProtocolString = StringUtils.trimToNull(properties.getProperty("proxyProtocol")); if (proxyProtocolString != null) { try { @@ -164,12 +169,11 @@ protected void initRootParamsFromProperties(Properties properties) { System.exit(1); } } - this.rootParams.memoryLimit = Long.parseLong(properties.getProperty("memoryLimit", "0")); } private void updateConfig() throws UnsupportedAuthenticationException { ClientBuilder clientBuilder = PulsarClient.builder() - .memoryLimit(this.rootParams.memoryLimit, SizeUnit.BYTES); + .memoryLimit(rootParams.memoryLimit, SizeUnit.BYTES); Authentication authentication = null; if (isNotBlank(this.rootParams.authPluginClassName)) { authentication = AuthenticationFactory.create(rootParams.authPluginClassName, rootParams.authParams); @@ -201,6 +205,7 @@ private void updateConfig() throws UnsupportedAuthenticationException { } clientBuilder.proxyServiceUrl(rootParams.proxyServiceURL, rootParams.proxyProtocol); } + this.produceCommand.updateConfig(clientBuilder, authentication, this.rootParams.serviceURL); this.consumeCommand.updateConfig(clientBuilder, authentication, this.rootParams.serviceURL); this.readCommand.updateConfig(clientBuilder, authentication, this.rootParams.serviceURL); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java index 0d28eecd41ca6..e1a0169daa1ea 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java @@ -181,8 +181,9 @@ private static class MainArguments { @Parameter(names = { "--service-url" }, description = "Pulsar Service URL", required = true) public String serviceURL; - @Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit") - long memoryLimit = 0L; + @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " + + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) + public long memoryLimit = 0L; } // Configuration class for initializing or modifying TradeUnits. diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java new file mode 100644 index 0000000000000..56787851200d2 --- /dev/null +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java @@ -0,0 +1,72 @@ +package org.apache.pulsar.testclient; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.ParameterException; +import com.google.common.collect.Sets; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; + +/** + * @see org.apache.pulsar.client.cli.MemoryUnitToByteConverter + */ +class MemoryUnitToByteConverter implements IStringConverter { + + private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); + private static final long DEFAULT_MEMORY_LIMIT = 0L; + + @Override + public Long convert(String memoryLimitArgument) { + return Math.max(DEFAULT_MEMORY_LIMIT, parseBytes(memoryLimitArgument)); + } + + public static long parseBytes(String memoryLimitArgument) { + if (StringUtils.isNotEmpty(memoryLimitArgument)) { + long memoryLimitArg = validateSizeString(memoryLimitArgument); + if (positiveCheckStatic("memory-limit", memoryLimitArg)) { + return memoryLimitArg; + } + } + return DEFAULT_MEMORY_LIMIT; + } + + static long validateSizeString(String s) { + char last = s.charAt(s.length() - 1); + String subStr = s.substring(0, s.length() - 1); + long size; + try { + size = sizeUnit.contains(last) + ? Long.parseLong(subStr) + : Long.parseLong(s); + } catch (IllegalArgumentException e) { + throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", + s, "(4096, 100K, 10M, 16G, 2T)")); + } + switch (last) { + case 'k': + case 'K': + return size * 1024; + + case 'm': + case 'M': + return size * 1024 * 1024; + + case 'g': + case 'G': + return size * 1024 * 1024 * 1024; + + case 't': + case 'T': + return size * 1024 * 1024 * 1024 * 1024; + + default: + return size; + } + } + + static boolean positiveCheckStatic(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } +} diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index 8fe6b2b87b19f..e8472fe1ee073 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -38,8 +38,6 @@ */ public abstract class PerformanceBaseArguments { - private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); - @Parameter(names = { "-h", "--help" }, description = "Print help message", help = true) boolean help; @@ -108,16 +106,8 @@ public abstract class PerformanceBaseArguments { public String deprecatedAuthPluginClassName; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)") - String memoryLimitArgument = null; - - /** - * Byte reppresentation of memory limit. - * Should be used as actual value for {@link #memoryLimitArgument}. - * - * @see #memoryLimitArgument - */ - protected long memoryLimit = 0L; + + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) + public long memoryLimit = 0L; public abstract void fillArgumentsFromProperties(Properties prop); @@ -184,15 +174,8 @@ public void fillArgumentsFromProperties() { e.printStackTrace(); exit(1); } - - } - - if (StringUtils.isNotEmpty(memoryLimitArgument)) { - long memoryLimitArg = validateSizeString(memoryLimitArgument); - positiveCheck("memory-limit", memoryLimitArg); - memoryLimit = memoryLimitArg; } - + fillArgumentsFromProperties(prop); } @@ -249,51 +232,4 @@ public void parseCLI(String cmdName, String[] args) { PerfClientUtils.exit(1); } } - - /** - * @see org.apache.pulsar.admin.cli.CliCommand#validateSizeString(String) - */ - static long validateSizeString(String s) { - char last = s.charAt(s.length() - 1); - String subStr = s.substring(0, s.length() - 1); - long size; - try { - size = sizeUnit.contains(last) - ? Long.parseLong(subStr) - : Long.parseLong(s); - } catch (IllegalArgumentException e) { - throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", - s, "(4096, 100K, 10M, 16G, 2T)")); - } - switch (last) { - case 'k': - case 'K': - return size * 1024; - - case 'm': - case 'M': - return size * 1024 * 1024; - - case 'g': - case 'G': - return size * 1024 * 1024 * 1024; - - case 't': - case 'T': - return size * 1024 * 1024 * 1024 * 1024; - - default: - return size; - } - } - - /** - * @see org.apache.pulsar.admin.cli.CmdNamespaces.SetOffloadPolicies#positiveCheck(String, long) - */ - static boolean positiveCheck(String paramName, long value) { - if (value <= 0) { - throw new ParameterException(paramName + " is not be negative or 0!"); - } - return true; - } } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index f89b20d5f83e6..2b3872d555d16 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java index 17c42fdd185a5..bb12ae54ea29c 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java @@ -169,7 +169,7 @@ public Object[][] memoryLimitCliArgumentProvider() { return new Object[][] { { new String[]{"-ml","1"}, 1L}, { new String[]{"-ml","1K"}, 1024L}, - { new String[]{"--memory-limit", "1G"}, 1_073_741_824L} + { new String[]{"--memory-limit", "1G"}, 1024 * 1024 * 1024} }; } From 979eaec141dce27726fce8bbbbef982148299c1a Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 28 Jun 2023 22:04:54 +0900 Subject: [PATCH 07/16] Fix check style --- .../apache/pulsar/client/cli/MemoryUnitToByteConverter.java | 2 +- .../apache/pulsar/testclient/PerformanceBaseArguments.java | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java index 1b36d53c64088..9cc11479469ee 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java @@ -59,7 +59,7 @@ static long validateSizeString(String s) { return size; } } - + static boolean positiveCheckStatic(String paramName, long value) { if (value <= 0) { throw new ParameterException(paramName + " is not be negative or 0!"); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index e8472fe1ee073..c77e646315ef7 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -23,11 +23,9 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; -import com.google.common.collect.Sets; import java.io.File; import java.io.FileInputStream; import java.util.Properties; -import java.util.Set; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.ProxyProtocol; @@ -175,7 +173,7 @@ public void fillArgumentsFromProperties() { exit(1); } } - + fillArgumentsFromProperties(prop); } From aa7fe74576ca4884f50fa95a2dc59456957e31b2 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 28 Jun 2023 23:01:39 +0900 Subject: [PATCH 08/16] Minimize changes --- .../java/org/apache/pulsar/client/cli/PulsarClientTool.java | 1 - .../org/apache/pulsar/testclient/PerformanceBaseArguments.java | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index d8e30490ab8ed..b1e2db26c619f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -205,7 +205,6 @@ private void updateConfig() throws UnsupportedAuthenticationException { } clientBuilder.proxyServiceUrl(rootParams.proxyServiceURL, rootParams.proxyProtocol); } - this.produceCommand.updateConfig(clientBuilder, authentication, this.rootParams.serviceURL); this.consumeCommand.updateConfig(clientBuilder, authentication, this.rootParams.serviceURL); this.readCommand.updateConfig(clientBuilder, authentication, this.rootParams.serviceURL); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index c77e646315ef7..e28bb3ce1a221 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -172,6 +172,7 @@ public void fillArgumentsFromProperties() { e.printStackTrace(); exit(1); } + } fillArgumentsFromProperties(prop); @@ -230,4 +231,5 @@ public void parseCLI(String cmdName, String[] args) { PerfClientUtils.exit(1); } } + } From f3891afe43a5e46e4d1b9f6bf70f2ee364b1d85d Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 28 Jun 2023 23:11:20 +0900 Subject: [PATCH 09/16] Add license header --- .../client/cli/MemoryUnitToByteConverter.java | 18 ++++++++++++++++++ .../testclient/MemoryUnitToByteConverter.java | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java index 9cc11479469ee..422f15767ef14 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pulsar.client.cli; import com.beust.jcommander.IStringConverter; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java index 56787851200d2..48c2307dd0d26 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pulsar.testclient; import com.beust.jcommander.IStringConverter; From 5c1eec35eb0c9a4135527d816b9c33ab047de27a Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Fri, 30 Jun 2023 15:00:24 +0900 Subject: [PATCH 10/16] Refactor to have default value inline --- .../client/cli/MemoryUnitToByteConverter.java | 15 ++++++++---- .../pulsar/client/cli/PulsarClientTool.java | 5 ++-- .../testclient/MemoryUnitToByteConverter.java | 18 ++++++++------- .../testclient/PerformanceBaseArguments.java | 2 +- .../PerformanceBaseArgumentsTest.java | 23 ++++++++----------- 5 files changed, 34 insertions(+), 29 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java index 422f15767ef14..ef456644f4cc5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java @@ -27,24 +27,29 @@ class MemoryUnitToByteConverter implements IStringConverter { private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); - private static final long DEFAULT_MEMORY_LIMIT = 0L; + + private final long defaultValue; + + public MemoryUnitToByteConverter(long defaultValue) { + this.defaultValue = defaultValue; + } @Override public Long convert(String memoryLimitArgument) { - return Math.max(DEFAULT_MEMORY_LIMIT, parseBytes(memoryLimitArgument)); + return Math.max(defaultValue, parseBytes(memoryLimitArgument)); } - public static long parseBytes(String memoryLimitArgument) { + long parseBytes(String memoryLimitArgument) { if (StringUtils.isNotEmpty(memoryLimitArgument)) { long memoryLimitArg = validateSizeString(memoryLimitArgument); if (positiveCheckStatic("memory-limit", memoryLimitArg)) { return memoryLimitArg; } } - return DEFAULT_MEMORY_LIMIT; + return defaultValue; } - static long validateSizeString(String s) { + long validateSizeString(String s) { char last = s.charAt(s.length() - 1); String subStr = s.substring(0, s.length() - 1); long size; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index b1e2db26c619f..4c1ea8d5598c6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -79,7 +79,7 @@ public static class RootParams { @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) - long memoryLimit; + long memoryLimit = 0L; } protected RootParams rootParams; @@ -157,7 +157,8 @@ protected void initRootParamsFromProperties(Properties properties) { this.rootParams.proxyServiceURL = StringUtils.trimToNull(properties.getProperty("proxyServiceUrl")); this.rootParams.listenerName = StringUtils.trimToNull(properties.getProperty("listenerName")); // setting memory limit - this.rootParams.memoryLimit = MemoryUnitToByteConverter.parseBytes(properties.getProperty("memoryLimit")); + this.rootParams.memoryLimit = new MemoryUnitToByteConverter(this.rootParams.memoryLimit) + .parseBytes(properties.getProperty("memoryLimit")); String proxyProtocolString = StringUtils.trimToNull(properties.getProperty("proxyProtocol")); if (proxyProtocolString != null) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java index 48c2307dd0d26..1f68ba67eff49 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java @@ -24,30 +24,32 @@ import java.util.Set; import org.apache.commons.lang3.StringUtils; -/** - * @see org.apache.pulsar.client.cli.MemoryUnitToByteConverter - */ class MemoryUnitToByteConverter implements IStringConverter { private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); - private static final long DEFAULT_MEMORY_LIMIT = 0L; + + private final long defaultValue; + + public MemoryUnitToByteConverter(long defaultValue) { + this.defaultValue = defaultValue; + } @Override public Long convert(String memoryLimitArgument) { - return Math.max(DEFAULT_MEMORY_LIMIT, parseBytes(memoryLimitArgument)); + return Math.max(defaultValue, parseBytes(memoryLimitArgument)); } - public static long parseBytes(String memoryLimitArgument) { + long parseBytes(String memoryLimitArgument) { if (StringUtils.isNotEmpty(memoryLimitArgument)) { long memoryLimitArg = validateSizeString(memoryLimitArgument); if (positiveCheckStatic("memory-limit", memoryLimitArg)) { return memoryLimitArg; } } - return DEFAULT_MEMORY_LIMIT; + return defaultValue; } - static long validateSizeString(String s) { + long validateSizeString(String s) { char last = s.charAt(s.length() - 1); String subStr = s.substring(0, s.length() - 1); long size; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index e28bb3ce1a221..e1ed9e51c587d 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -105,7 +105,7 @@ public abstract class PerformanceBaseArguments { @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) - public long memoryLimit = 0L; + public long memoryLimit; public abstract void fillArgumentsFromProperties(Properties prop); diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java index bb12ae54ea29c..699f138bfdaa8 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.testclient; +import static org.apache.pulsar.client.api.ProxyProtocol.SNI; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -25,18 +28,10 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; - -import com.beust.jcommander.ParameterException; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import static org.apache.pulsar.client.api.ProxyProtocol.SNI; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertThrows; -import static org.testng.Assert.fail; - - public class PerformanceBaseArgumentsTest { @Test @@ -208,8 +203,8 @@ public Object[][] invalidMemoryLimitCliArgumentProvider() { }; } - @Test(dataProvider = "invalidMemoryLimitCliArgumentProvider") - public void testInvalidMemoryLimitCliArgument(String[] cliArgs) { + @Test + public void testMemoryLimitCliArgumentDefault() { for (String cmd : List.of( "pulsar-perf read", "pulsar-perf produce", @@ -226,9 +221,11 @@ public void fillArgumentsFromProperties(Properties prop) { }; baseArgument.confFile = "./src/test/resources/perf_client1.conf"; - // Act & Assert - assertThrows(ParameterException.class, - () -> baseArgument.parseCLI(cmd, cliArgs)); + // Act + baseArgument.parseCLI(cmd, new String[]{}); + + // Assert + assertEquals(baseArgument.memoryLimit, 0L); } } } From b24f4e094732816adec6952e223f310f2baec032 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 23 Aug 2023 21:43:31 +0900 Subject: [PATCH 11/16] Import CLI utils --- pulsar-client-tools/pom.xml | 5 +++++ pulsar-testclient/pom.xml | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml index 16a30f8d63040..def91c83394fa 100644 --- a/pulsar-client-tools/pom.xml +++ b/pulsar-client-tools/pom.xml @@ -67,6 +67,11 @@ pulsar-client-messagecrypto-bc ${project.version} + + ${project.groupId} + pulsar-cli-utils + ${project.version} + org.asynchttpclient async-http-client diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml index ce03dd444e101..9b720096009af 100644 --- a/pulsar-testclient/pom.xml +++ b/pulsar-testclient/pom.xml @@ -85,6 +85,12 @@ ${project.version} + + ${project.groupId} + pulsar-cli-utils + ${project.version} + + commons-configuration commons-configuration From d95a36f9ad523e77cb80e6bfee9351e2b2d15611 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 23 Aug 2023 21:43:45 +0900 Subject: [PATCH 12/16] Replace temporary converter --- .../client/cli/MemoryUnitToByteConverter.java | 92 ------------------- .../pulsar/client/cli/PulsarClientTool.java | 10 +- .../testclient/LoadSimulationClient.java | 3 +- .../testclient/MemoryUnitToByteConverter.java | 92 ------------------- .../testclient/PerformanceBaseArguments.java | 3 +- 5 files changed, 9 insertions(+), 191 deletions(-) delete mode 100644 pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java delete mode 100644 pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java deleted file mode 100644 index ef456644f4cc5..0000000000000 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.cli; - -import com.beust.jcommander.IStringConverter; -import com.beust.jcommander.ParameterException; -import com.google.common.collect.Sets; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; - -class MemoryUnitToByteConverter implements IStringConverter { - - private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); - - private final long defaultValue; - - public MemoryUnitToByteConverter(long defaultValue) { - this.defaultValue = defaultValue; - } - - @Override - public Long convert(String memoryLimitArgument) { - return Math.max(defaultValue, parseBytes(memoryLimitArgument)); - } - - long parseBytes(String memoryLimitArgument) { - if (StringUtils.isNotEmpty(memoryLimitArgument)) { - long memoryLimitArg = validateSizeString(memoryLimitArgument); - if (positiveCheckStatic("memory-limit", memoryLimitArg)) { - return memoryLimitArg; - } - } - return defaultValue; - } - - long validateSizeString(String s) { - char last = s.charAt(s.length() - 1); - String subStr = s.substring(0, s.length() - 1); - long size; - try { - size = sizeUnit.contains(last) - ? Long.parseLong(subStr) - : Long.parseLong(s); - } catch (IllegalArgumentException e) { - throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", - s, "(4096, 100K, 10M, 16G, 2T)")); - } - switch (last) { - case 'k': - case 'K': - return size * 1024; - - case 'm': - case 'M': - return size * 1024 * 1024; - - case 'g': - case 'G': - return size * 1024 * 1024 * 1024; - - case 't': - case 'T': - return size * 1024 * 1024 * 1024 * 1024; - - default: - return size; - } - } - - static boolean positiveCheckStatic(String paramName, long value) { - if (value <= 0) { - throw new ParameterException(paramName + " is not be negative or 0!"); - } - return true; - } -} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index 4c1ea8d5598c6..4057bbe9fdfd8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -32,6 +32,7 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; @@ -40,7 +41,6 @@ import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.api.SizeUnit; - public class PulsarClientTool { @Getter @@ -78,7 +78,7 @@ public static class RootParams { String tlsTrustCertsFilePath; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) + + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) long memoryLimit = 0L; } @@ -155,10 +155,10 @@ protected void initRootParamsFromProperties(Properties properties) { this.rootParams.authParams = properties.getProperty("authParams"); this.rootParams.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath"); this.rootParams.proxyServiceURL = StringUtils.trimToNull(properties.getProperty("proxyServiceUrl")); - this.rootParams.listenerName = StringUtils.trimToNull(properties.getProperty("listenerName")); // setting memory limit - this.rootParams.memoryLimit = new MemoryUnitToByteConverter(this.rootParams.memoryLimit) - .parseBytes(properties.getProperty("memoryLimit")); + this.rootParams.memoryLimit = StringUtils.isNotEmpty(properties.getProperty("memoryLimit")) + ? new ByteUnitToLongConverter("memoryLimit").convert(properties.getProperty("memoryLimit")) + : this.rootParams.memoryLimit; String proxyProtocolString = StringUtils.trimToNull(properties.getProperty("proxyProtocol")); if (proxyProtocolString != null) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java index e1a0169daa1ea..982c71ce6a5f4 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -182,7 +183,7 @@ private static class MainArguments { public String serviceURL; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) + + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) public long memoryLimit = 0L; } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java deleted file mode 100644 index 1f68ba67eff49..0000000000000 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.testclient; - -import com.beust.jcommander.IStringConverter; -import com.beust.jcommander.ParameterException; -import com.google.common.collect.Sets; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; - -class MemoryUnitToByteConverter implements IStringConverter { - - private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); - - private final long defaultValue; - - public MemoryUnitToByteConverter(long defaultValue) { - this.defaultValue = defaultValue; - } - - @Override - public Long convert(String memoryLimitArgument) { - return Math.max(defaultValue, parseBytes(memoryLimitArgument)); - } - - long parseBytes(String memoryLimitArgument) { - if (StringUtils.isNotEmpty(memoryLimitArgument)) { - long memoryLimitArg = validateSizeString(memoryLimitArgument); - if (positiveCheckStatic("memory-limit", memoryLimitArg)) { - return memoryLimitArg; - } - } - return defaultValue; - } - - long validateSizeString(String s) { - char last = s.charAt(s.length() - 1); - String subStr = s.substring(0, s.length() - 1); - long size; - try { - size = sizeUnit.contains(last) - ? Long.parseLong(subStr) - : Long.parseLong(s); - } catch (IllegalArgumentException e) { - throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", - s, "(4096, 100K, 10M, 16G, 2T)")); - } - switch (last) { - case 'k': - case 'K': - return size * 1024; - - case 'm': - case 'M': - return size * 1024 * 1024; - - case 'g': - case 'G': - return size * 1024 * 1024 * 1024; - - case 't': - case 'T': - return size * 1024 * 1024 * 1024 * 1024; - - default: - return size; - } - } - - static boolean positiveCheckStatic(String paramName, long value) { - if (value <= 0) { - throw new ParameterException(paramName + " is not be negative or 0!"); - } - return true; - } -} diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index e1ed9e51c587d..bc4ab003c4670 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -28,6 +28,7 @@ import java.util.Properties; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.api.ProxyProtocol; /** @@ -104,7 +105,7 @@ public abstract class PerformanceBaseArguments { public String deprecatedAuthPluginClassName; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) + + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) public long memoryLimit; public abstract void fillArgumentsFromProperties(Properties prop); From 9f18bc7398998f79411044ff7f648f1650c320b9 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 23 Aug 2023 21:56:04 +0900 Subject: [PATCH 13/16] Minimize changes by using existing module --- pulsar-client-tools/pom.xml | 5 - .../client/cli/MemoryUnitToByteConverter.java | 92 +++++++++++++++++++ .../pulsar/client/cli/PulsarClientTool.java | 10 +- pulsar-testclient/pom.xml | 6 -- .../testclient/LoadSimulationClient.java | 3 +- .../testclient/MemoryUnitToByteConverter.java | 92 +++++++++++++++++++ .../testclient/PerformanceBaseArguments.java | 3 +- 7 files changed, 191 insertions(+), 20 deletions(-) create mode 100644 pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java create mode 100644 pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml index def91c83394fa..16a30f8d63040 100644 --- a/pulsar-client-tools/pom.xml +++ b/pulsar-client-tools/pom.xml @@ -67,11 +67,6 @@ pulsar-client-messagecrypto-bc ${project.version} - - ${project.groupId} - pulsar-cli-utils - ${project.version} - org.asynchttpclient async-http-client diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java new file mode 100644 index 0000000000000..ef456644f4cc5 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.cli; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.ParameterException; +import com.google.common.collect.Sets; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; + +class MemoryUnitToByteConverter implements IStringConverter { + + private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); + + private final long defaultValue; + + public MemoryUnitToByteConverter(long defaultValue) { + this.defaultValue = defaultValue; + } + + @Override + public Long convert(String memoryLimitArgument) { + return Math.max(defaultValue, parseBytes(memoryLimitArgument)); + } + + long parseBytes(String memoryLimitArgument) { + if (StringUtils.isNotEmpty(memoryLimitArgument)) { + long memoryLimitArg = validateSizeString(memoryLimitArgument); + if (positiveCheckStatic("memory-limit", memoryLimitArg)) { + return memoryLimitArg; + } + } + return defaultValue; + } + + long validateSizeString(String s) { + char last = s.charAt(s.length() - 1); + String subStr = s.substring(0, s.length() - 1); + long size; + try { + size = sizeUnit.contains(last) + ? Long.parseLong(subStr) + : Long.parseLong(s); + } catch (IllegalArgumentException e) { + throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", + s, "(4096, 100K, 10M, 16G, 2T)")); + } + switch (last) { + case 'k': + case 'K': + return size * 1024; + + case 'm': + case 'M': + return size * 1024 * 1024; + + case 'g': + case 'G': + return size * 1024 * 1024 * 1024; + + case 't': + case 'T': + return size * 1024 * 1024 * 1024 * 1024; + + default: + return size; + } + } + + static boolean positiveCheckStatic(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index 4057bbe9fdfd8..4c1ea8d5598c6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -32,7 +32,6 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.PulsarVersion; -import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; @@ -41,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.api.SizeUnit; + public class PulsarClientTool { @Getter @@ -78,7 +78,7 @@ public static class RootParams { String tlsTrustCertsFilePath; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) + + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) long memoryLimit = 0L; } @@ -155,10 +155,10 @@ protected void initRootParamsFromProperties(Properties properties) { this.rootParams.authParams = properties.getProperty("authParams"); this.rootParams.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath"); this.rootParams.proxyServiceURL = StringUtils.trimToNull(properties.getProperty("proxyServiceUrl")); + this.rootParams.listenerName = StringUtils.trimToNull(properties.getProperty("listenerName")); // setting memory limit - this.rootParams.memoryLimit = StringUtils.isNotEmpty(properties.getProperty("memoryLimit")) - ? new ByteUnitToLongConverter("memoryLimit").convert(properties.getProperty("memoryLimit")) - : this.rootParams.memoryLimit; + this.rootParams.memoryLimit = new MemoryUnitToByteConverter(this.rootParams.memoryLimit) + .parseBytes(properties.getProperty("memoryLimit")); String proxyProtocolString = StringUtils.trimToNull(properties.getProperty("proxyProtocol")); if (proxyProtocolString != null) { diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml index 9b720096009af..ce03dd444e101 100644 --- a/pulsar-testclient/pom.xml +++ b/pulsar-testclient/pom.xml @@ -85,12 +85,6 @@ ${project.version} - - ${project.groupId} - pulsar-cli-utils - ${project.version} - - commons-configuration commons-configuration diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java index 982c71ce6a5f4..e1a0169daa1ea 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -183,7 +182,7 @@ private static class MainArguments { public String serviceURL; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) + + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) public long memoryLimit = 0L; } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java new file mode 100644 index 0000000000000..1f68ba67eff49 --- /dev/null +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.testclient; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.ParameterException; +import com.google.common.collect.Sets; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; + +class MemoryUnitToByteConverter implements IStringConverter { + + private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); + + private final long defaultValue; + + public MemoryUnitToByteConverter(long defaultValue) { + this.defaultValue = defaultValue; + } + + @Override + public Long convert(String memoryLimitArgument) { + return Math.max(defaultValue, parseBytes(memoryLimitArgument)); + } + + long parseBytes(String memoryLimitArgument) { + if (StringUtils.isNotEmpty(memoryLimitArgument)) { + long memoryLimitArg = validateSizeString(memoryLimitArgument); + if (positiveCheckStatic("memory-limit", memoryLimitArg)) { + return memoryLimitArg; + } + } + return defaultValue; + } + + long validateSizeString(String s) { + char last = s.charAt(s.length() - 1); + String subStr = s.substring(0, s.length() - 1); + long size; + try { + size = sizeUnit.contains(last) + ? Long.parseLong(subStr) + : Long.parseLong(s); + } catch (IllegalArgumentException e) { + throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", + s, "(4096, 100K, 10M, 16G, 2T)")); + } + switch (last) { + case 'k': + case 'K': + return size * 1024; + + case 'm': + case 'M': + return size * 1024 * 1024; + + case 'g': + case 'G': + return size * 1024 * 1024 * 1024; + + case 't': + case 'T': + return size * 1024 * 1024 * 1024 * 1024; + + default: + return size; + } + } + + static boolean positiveCheckStatic(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } +} diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index bc4ab003c4670..e1ed9e51c587d 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -28,7 +28,6 @@ import java.util.Properties; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.api.ProxyProtocol; /** @@ -105,7 +104,7 @@ public abstract class PerformanceBaseArguments { public String deprecatedAuthPluginClassName; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) + + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) public long memoryLimit; public abstract void fillArgumentsFromProperties(Properties prop); From 9525de1a128ea6a9c46d901c97d2c755aa894784 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Wed, 23 Aug 2023 21:59:27 +0900 Subject: [PATCH 14/16] . --- pulsar-client-tools/pom.xml | 5 + .../client/cli/MemoryUnitToByteConverter.java | 92 ------------------- .../pulsar/client/cli/PulsarClientTool.java | 10 +- pulsar-testclient/pom.xml | 6 ++ .../testclient/LoadSimulationClient.java | 3 +- .../testclient/MemoryUnitToByteConverter.java | 92 ------------------- .../testclient/PerformanceBaseArguments.java | 3 +- 7 files changed, 20 insertions(+), 191 deletions(-) delete mode 100644 pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java delete mode 100644 pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml index 16a30f8d63040..def91c83394fa 100644 --- a/pulsar-client-tools/pom.xml +++ b/pulsar-client-tools/pom.xml @@ -67,6 +67,11 @@ pulsar-client-messagecrypto-bc ${project.version} + + ${project.groupId} + pulsar-cli-utils + ${project.version} + org.asynchttpclient async-http-client diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java deleted file mode 100644 index ef456644f4cc5..0000000000000 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/MemoryUnitToByteConverter.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.cli; - -import com.beust.jcommander.IStringConverter; -import com.beust.jcommander.ParameterException; -import com.google.common.collect.Sets; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; - -class MemoryUnitToByteConverter implements IStringConverter { - - private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); - - private final long defaultValue; - - public MemoryUnitToByteConverter(long defaultValue) { - this.defaultValue = defaultValue; - } - - @Override - public Long convert(String memoryLimitArgument) { - return Math.max(defaultValue, parseBytes(memoryLimitArgument)); - } - - long parseBytes(String memoryLimitArgument) { - if (StringUtils.isNotEmpty(memoryLimitArgument)) { - long memoryLimitArg = validateSizeString(memoryLimitArgument); - if (positiveCheckStatic("memory-limit", memoryLimitArg)) { - return memoryLimitArg; - } - } - return defaultValue; - } - - long validateSizeString(String s) { - char last = s.charAt(s.length() - 1); - String subStr = s.substring(0, s.length() - 1); - long size; - try { - size = sizeUnit.contains(last) - ? Long.parseLong(subStr) - : Long.parseLong(s); - } catch (IllegalArgumentException e) { - throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", - s, "(4096, 100K, 10M, 16G, 2T)")); - } - switch (last) { - case 'k': - case 'K': - return size * 1024; - - case 'm': - case 'M': - return size * 1024 * 1024; - - case 'g': - case 'G': - return size * 1024 * 1024 * 1024; - - case 't': - case 'T': - return size * 1024 * 1024 * 1024 * 1024; - - default: - return size; - } - } - - static boolean positiveCheckStatic(String paramName, long value) { - if (value <= 0) { - throw new ParameterException(paramName + " is not be negative or 0!"); - } - return true; - } -} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index 4c1ea8d5598c6..4057bbe9fdfd8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -32,6 +32,7 @@ import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; @@ -40,7 +41,6 @@ import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.api.SizeUnit; - public class PulsarClientTool { @Getter @@ -78,7 +78,7 @@ public static class RootParams { String tlsTrustCertsFilePath; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) + + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) long memoryLimit = 0L; } @@ -155,10 +155,10 @@ protected void initRootParamsFromProperties(Properties properties) { this.rootParams.authParams = properties.getProperty("authParams"); this.rootParams.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath"); this.rootParams.proxyServiceURL = StringUtils.trimToNull(properties.getProperty("proxyServiceUrl")); - this.rootParams.listenerName = StringUtils.trimToNull(properties.getProperty("listenerName")); // setting memory limit - this.rootParams.memoryLimit = new MemoryUnitToByteConverter(this.rootParams.memoryLimit) - .parseBytes(properties.getProperty("memoryLimit")); + this.rootParams.memoryLimit = StringUtils.isNotEmpty(properties.getProperty("memoryLimit")) + ? new ByteUnitToLongConverter("memoryLimit").convert(properties.getProperty("memoryLimit")) + : this.rootParams.memoryLimit; String proxyProtocolString = StringUtils.trimToNull(properties.getProperty("proxyProtocol")); if (proxyProtocolString != null) { diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml index ce03dd444e101..9b720096009af 100644 --- a/pulsar-testclient/pom.xml +++ b/pulsar-testclient/pom.xml @@ -85,6 +85,12 @@ ${project.version} + + ${project.groupId} + pulsar-cli-utils + ${project.version} + + commons-configuration commons-configuration diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java index e1a0169daa1ea..982c71ce6a5f4 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -182,7 +183,7 @@ private static class MainArguments { public String serviceURL; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) + + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) public long memoryLimit = 0L; } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java deleted file mode 100644 index 1f68ba67eff49..0000000000000 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/MemoryUnitToByteConverter.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.testclient; - -import com.beust.jcommander.IStringConverter; -import com.beust.jcommander.ParameterException; -import com.google.common.collect.Sets; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; - -class MemoryUnitToByteConverter implements IStringConverter { - - private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); - - private final long defaultValue; - - public MemoryUnitToByteConverter(long defaultValue) { - this.defaultValue = defaultValue; - } - - @Override - public Long convert(String memoryLimitArgument) { - return Math.max(defaultValue, parseBytes(memoryLimitArgument)); - } - - long parseBytes(String memoryLimitArgument) { - if (StringUtils.isNotEmpty(memoryLimitArgument)) { - long memoryLimitArg = validateSizeString(memoryLimitArgument); - if (positiveCheckStatic("memory-limit", memoryLimitArg)) { - return memoryLimitArg; - } - } - return defaultValue; - } - - long validateSizeString(String s) { - char last = s.charAt(s.length() - 1); - String subStr = s.substring(0, s.length() - 1); - long size; - try { - size = sizeUnit.contains(last) - ? Long.parseLong(subStr) - : Long.parseLong(s); - } catch (IllegalArgumentException e) { - throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", - s, "(4096, 100K, 10M, 16G, 2T)")); - } - switch (last) { - case 'k': - case 'K': - return size * 1024; - - case 'm': - case 'M': - return size * 1024 * 1024; - - case 'g': - case 'G': - return size * 1024 * 1024 * 1024; - - case 't': - case 'T': - return size * 1024 * 1024 * 1024 * 1024; - - default: - return size; - } - } - - static boolean positiveCheckStatic(String paramName, long value) { - if (value <= 0) { - throw new ParameterException(paramName + " is not be negative or 0!"); - } - return true; - } -} diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index e1ed9e51c587d..bc4ab003c4670 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -28,6 +28,7 @@ import java.util.Properties; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.cli.converters.ByteUnitToLongConverter; import org.apache.pulsar.client.api.ProxyProtocol; /** @@ -104,7 +105,7 @@ public abstract class PerformanceBaseArguments { public String deprecatedAuthPluginClassName; @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit " - + "(eg: 32M, 64M)", converter = MemoryUnitToByteConverter.class) + + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class) public long memoryLimit; public abstract void fillArgumentsFromProperties(Properties prop); From 7c0e29e29288d0b164245e486befd91a8db74257 Mon Sep 17 00:00:00 2001 From: joohyukkim Date: Mon, 4 Sep 2023 20:35:03 +0900 Subject: [PATCH 15/16] Update PerfClientUtils.java --- .../main/java/org/apache/pulsar/testclient/PerfClientUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java index 38fb6a70a081f..3b44023ef503e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.common.util.DirectMemoryUtils; import org.slf4j.Logger; From 2502f041c0296ae6d6aeac4998cdf6f04a18f462 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 10 Oct 2023 16:01:53 +0800 Subject: [PATCH 16/16] fix license check Signed-off-by: tison --- src/check-binary-license.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/check-binary-license.sh b/src/check-binary-license.sh index 3a6d266345f30..4b48da2061c3a 100755 --- a/src/check-binary-license.sh +++ b/src/check-binary-license.sh @@ -41,7 +41,7 @@ if [ -z $TARBALL ]; then exit 1 fi -JARS=$(tar -tf $TARBALL | grep '\.jar' | grep -v 'trino/' | grep -v '/examples/' | grep -v '/instances/' | grep -v pulsar-client | grep -v pulsar-common | grep -v pulsar-package | grep -v pulsar-websocket | grep -v bouncy-castle-bc | sed 's!.*/!!' | sort) +JARS=$(tar -tf $TARBALL | grep '\.jar' | grep -v 'trino/' | grep -v '/examples/' | grep -v '/instances/' | grep -v pulsar-client | grep -v pulsar-cli-utils | grep -v pulsar-common | grep -v pulsar-package | grep -v pulsar-websocket | grep -v bouncy-castle-bc | sed 's!.*/!!' | sort) LICENSEPATH=$(tar -tf $TARBALL | awk '/^[^\/]*\/LICENSE/') LICENSE=$(tar -O -xf $TARBALL "$LICENSEPATH")