diff --git a/pom.xml b/pom.xml
index a5e1985752ef4..e7d2eab442869 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2252,6 +2252,7 @@ flexible messaging model and an intuitive client API.
pulsar-common
pulsar-broker-common
pulsar-broker
+ pulsar-cli-utils
pulsar-client-api
pulsar-client
pulsar-client-shaded
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 57081863a145e..4483a199b79e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2077,7 +2077,8 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
(PositionImpl) markDeletePosition,
partitionIndex,
requestId,
- consumer.getSubscription().getName());
+ consumer.getSubscription().getName(),
+ consumer.readCompacted());
}).exceptionally(e -> {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.UnknownError, "Failed to recover Transaction Buffer."));
@@ -2095,16 +2096,33 @@ private void getLargestBatchIndexWhenPossible(
PositionImpl markDeletePosition,
int partitionIndex,
long requestId,
- String subscriptionName) {
-
+ String subscriptionName,
+ boolean readCompacted) {
PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
// If it's not pointing to a valid entry, respond messageId of the current position.
// If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
- Optional compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
- if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
- && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
+ Optional compactionHorizon = readCompacted ?
+ persistentTopic.getCompactedTopic().getCompactionHorizon() : Optional.empty();
+ if (lastPosition.getEntryId() == -1 || !ml.ledgerExists(lastPosition.getLedgerId())) {
+ // there is no entry in the original topic
+ if (compactionHorizon != null && compactionHorizon.isPresent()) {
+ // if readCompacted is true, we need to read the last entry from compacted topic
+ handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
+ markDeletePosition);
+ return;
+ } else {
+ // if readCompacted is false, we need to return MessageId.earliest
+ writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
+ markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+ markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+ }
+ return;
+ }
+
+ if (compactionHorizon != null && compactionHorizon.isPresent()
+ && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0) {
handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
markDeletePosition);
return;
@@ -2133,7 +2151,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
- if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
+ if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException
+ && readCompacted) {
handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
markDeletePosition);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 317b1a227e585..6c2d848bb7c2d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -20,6 +20,8 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -32,6 +34,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -415,4 +418,28 @@ public void testGetLastMessageIdAfterCompactionAllNullMsg(boolean enabledBatch)
producer.close();
admin.topics().delete(topicName, false);
}
+
+ @Test(dataProvider = "enabledBatch")
+ public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception {
+ String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+ String subName = "sub";
+ Producer producer = createProducer(enabledBatch, topicName);
+ producer.newMessage().key("k0").value("v0").sendAsync();
+ producer.newMessage().key("k0").value("v1").sendAsync();
+ producer.flush();
+
+ triggerCompactionAndWait(topicName);
+ triggerLedgerSwitch(topicName);
+ clearAllTheLedgersOutdated(topicName);
+
+ var reader = pulsarClient.newReader(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .startMessageId(MessageId.earliest)
+ .create();
+ while (reader.hasMessageAvailable()) {
+ Message message = reader.readNext(5, TimeUnit.SECONDS);
+ assertNotEquals(message, null);
+ }
+ }
}
diff --git a/pulsar-cli-utils/pom.xml b/pulsar-cli-utils/pom.xml
new file mode 100644
index 0000000000000..f16e17447727d
--- /dev/null
+++ b/pulsar-cli-utils/pom.xml
@@ -0,0 +1,149 @@
+
+
+
+ 4.0.0
+
+ org.apache.pulsar
+ pulsar
+ 3.1.0-SNAPSHOT
+ ..
+
+
+ pulsar-cli-utils
+ Pulsar CLI Utils
+ Isolated CLI utility module
+
+
+
+ com.beust
+ jcommander
+ compile
+
+
+
+ org.apache.commons
+ commons-lang3
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ ${pulsar.client.compiler.release}
+
+
+
+
+ org.gaul
+ modernizer-maven-plugin
+
+ true
+ 8
+
+
+
+ modernizer
+ verify
+
+ modernizer
+
+
+
+
+
+
+ pl.project13.maven
+ git-commit-id-plugin
+
+
+ git-info
+
+ revision
+
+
+
+
+ false
+ true
+ git
+ false
+ false
+ false
+ properties
+
+ true
+
+
+
+
+
+ org.codehaus.mojo
+ templating-maven-plugin
+ 1.0.0
+
+
+ filtering-java-templates
+
+ filter-sources
+
+
+
+
+
+
+ com.github.spotbugs
+ spotbugs-maven-plugin
+ ${spotbugs-maven-plugin.version}
+
+
+ spotbugs
+ verify
+
+ check
+
+
+
+
+ ${basedir}/src/main/resources/findbugsExclude.xml
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ checkstyle
+ verify
+
+ check
+
+
+
+
+
+
+
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/ValueValidationUtil.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/ValueValidationUtil.java
new file mode 100644
index 0000000000000..c2000e1c7bc5a
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/ValueValidationUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli;
+
+import com.beust.jcommander.ParameterException;
+import lombok.experimental.UtilityClass;
+import org.apache.commons.lang3.StringUtils;
+
+@UtilityClass
+public class ValueValidationUtil {
+
+ public static void maxValueCheck(String paramName, long value, long maxValue) {
+ if (value > maxValue) {
+ throw new ParameterException(paramName + " cannot be bigger than <" + maxValue + ">!");
+ }
+ }
+
+ public static void positiveCheck(String paramName, long value) {
+ if (value <= 0) {
+ throw new ParameterException(paramName + " cannot be less than or equal to <0>!");
+ }
+ }
+
+ public static void positiveCheck(String paramName, int value) {
+ if (value <= 0) {
+ throw new ParameterException(paramName + " cannot be less than or equal to <0>!");
+ }
+ }
+
+ public static void emptyCheck(String paramName, String value) {
+ if (StringUtils.isEmpty(value)) {
+ throw new ParameterException("The value of " + paramName + " can't be empty");
+ }
+ }
+
+ public static void minValueCheck(String name, Long value, long min) {
+ if (value < min) {
+ throw new ParameterException(name + " cannot be less than <" + min + ">!");
+ }
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitIntegerConverter.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitIntegerConverter.java
new file mode 100644
index 0000000000000..b148d238b149d
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitIntegerConverter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
+
+import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck;
+import static org.apache.pulsar.cli.converters.ByteUnitUtil.validateSizeString;
+import com.beust.jcommander.converters.BaseConverter;
+
+public class ByteUnitIntegerConverter extends BaseConverter {
+
+ public ByteUnitIntegerConverter(String optionName) {
+ super(optionName);
+ }
+
+ @Override
+ public Integer convert(String argStr) {
+ return parseBytes(argStr).intValue();
+ }
+
+ Long parseBytes(String argStr) {
+ emptyCheck(getOptionName(), argStr);
+ long valueInBytes = validateSizeString(argStr);
+ return valueInBytes;
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java
new file mode 100644
index 0000000000000..6170fb489d4de
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
+
+import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck;
+import com.beust.jcommander.converters.BaseConverter;
+
+public class ByteUnitToLongConverter extends BaseConverter {
+
+ public ByteUnitToLongConverter(String optionName) {
+ super(optionName);
+ }
+
+ @Override
+ public Long convert(String argStr) {
+ return parseBytes(argStr);
+ }
+
+ Long parseBytes(String argStr) {
+ emptyCheck(getOptionName(), argStr);
+ return ByteUnitUtil.validateSizeString(argStr);
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitUtil.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitUtil.java
new file mode 100644
index 0000000000000..cc6140dfced46
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
+
+import com.beust.jcommander.ParameterException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+class ByteUnitUtil {
+
+ private static Set sizeUnit = Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList('k', 'K', 'm', 'M', 'g', 'G', 't', 'T')));
+
+ static long validateSizeString(String byteStr) {
+ if (byteStr.isEmpty()) {
+ throw new IllegalArgumentException("byte string cannot be empty");
+ }
+
+ char last = byteStr.charAt(byteStr.length() - 1);
+ String subStr = byteStr.substring(0, byteStr.length() - 1);
+ long size;
+ try {
+ size = sizeUnit.contains(last)
+ ? Long.parseLong(subStr)
+ : Long.parseLong(byteStr);
+ } catch (IllegalArgumentException e) {
+ throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s",
+ byteStr, "(4096, 100K, 10M, 16G, 2T)"));
+ }
+ switch (last) {
+ case 'k':
+ case 'K':
+ return size * 1024;
+
+ case 'm':
+ case 'M':
+ return size * 1024 * 1024;
+
+ case 'g':
+ case 'G':
+ return size * 1024 * 1024 * 1024;
+
+ case 't':
+ case 'T':
+ return size * 1024 * 1024 * 1024 * 1024;
+
+ default:
+ return size;
+ }
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/RelativeTimeUtil.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/RelativeTimeUtil.java
new file mode 100644
index 0000000000000..412a6415e3c31
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/RelativeTimeUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.concurrent.TimeUnit;
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class RelativeTimeUtil {
+ public static long parseRelativeTimeInSeconds(String relativeTime) {
+ if (relativeTime.isEmpty()) {
+ throw new IllegalArgumentException("time cannot be empty");
+ }
+
+ int lastIndex = relativeTime.length() - 1;
+ char lastChar = relativeTime.charAt(lastIndex);
+ final char timeUnit;
+
+ if (!Character.isAlphabetic(lastChar)) {
+ // No unit specified, assume seconds
+ timeUnit = 's';
+ lastIndex = relativeTime.length();
+ } else {
+ timeUnit = Character.toLowerCase(lastChar);
+ }
+
+ long duration = Long.parseLong(relativeTime.substring(0, lastIndex));
+
+ switch (timeUnit) {
+ case 's':
+ return duration;
+ case 'm':
+ return TimeUnit.MINUTES.toSeconds(duration);
+ case 'h':
+ return TimeUnit.HOURS.toSeconds(duration);
+ case 'd':
+ return TimeUnit.DAYS.toSeconds(duration);
+ case 'w':
+ return 7 * TimeUnit.DAYS.toSeconds(duration);
+ // No unit for months
+ case 'y':
+ return 365 * TimeUnit.DAYS.toSeconds(duration);
+ default:
+ throw new IllegalArgumentException("Invalid time unit '" + lastChar + "'");
+ }
+ }
+
+ /**
+ * Convert nanoseconds to seconds and keep three decimal places.
+ * @param ns
+ * @return seconds
+ */
+ public static double nsToSeconds(long ns) {
+ double seconds = (double) ns / 1_000_000_000;
+ BigDecimal bd = new BigDecimal(seconds);
+ return bd.setScale(3, RoundingMode.HALF_UP).doubleValue();
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToMillisConverter.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToMillisConverter.java
new file mode 100644
index 0000000000000..38ff4f501a67a
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToMillisConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
+
+import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.converters.BaseConverter;
+import java.util.concurrent.TimeUnit;
+
+public class TimeUnitToMillisConverter extends BaseConverter {
+
+ public TimeUnitToMillisConverter(String optionName) {
+ super(optionName);
+ }
+
+ @Override
+ public Long convert(String str) {
+ emptyCheck(getOptionName(), str);
+ try {
+ return TimeUnit.SECONDS.toMillis(
+ RelativeTimeUtil.parseRelativeTimeInSeconds(str.trim()));
+ } catch (IllegalArgumentException exception) {
+ throw new ParameterException("For input " + getOptionName() + ": " + exception.getMessage());
+ }
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java
new file mode 100644
index 0000000000000..3aca2e95d2526
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
+
+import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.converters.BaseConverter;
+import java.util.concurrent.TimeUnit;
+
+public class TimeUnitToSecondsConverter extends BaseConverter {
+
+ public TimeUnitToSecondsConverter(String optionName) {
+ super(optionName);
+ }
+
+ @Override
+ public Long convert(String str) {
+ emptyCheck(getOptionName(), str);
+ try {
+ return TimeUnit.SECONDS.toSeconds(
+ RelativeTimeUtil.parseRelativeTimeInSeconds(str.trim()));
+ } catch (IllegalArgumentException exception) {
+ throw new ParameterException("For input " + getOptionName() + ": " + exception.getMessage());
+ }
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/package-info.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/package-info.java
new file mode 100644
index 0000000000000..4204abdef3b31
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/package-info.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/package-info.java
new file mode 100644
index 0000000000000..2b2198c265c64
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli;
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/IntegerMaxValueLongValidator.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/IntegerMaxValueLongValidator.java
new file mode 100644
index 0000000000000..63115b1418793
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/IntegerMaxValueLongValidator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.validators;
+
+import com.beust.jcommander.IValueValidator;
+import com.beust.jcommander.ParameterException;
+import org.apache.pulsar.cli.ValueValidationUtil;
+
+public class IntegerMaxValueLongValidator implements IValueValidator {
+ @Override
+ public void validate(String name, Long value) throws ParameterException {
+ ValueValidationUtil.maxValueCheck(name, value, Integer.MAX_VALUE);
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/MinNegativeOneValidator.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/MinNegativeOneValidator.java
new file mode 100644
index 0000000000000..320e36812bfc2
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/MinNegativeOneValidator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.validators;
+
+import com.beust.jcommander.IValueValidator;
+import com.beust.jcommander.ParameterException;
+import org.apache.pulsar.cli.ValueValidationUtil;
+
+public class MinNegativeOneValidator implements IValueValidator {
+ @Override
+ public void validate(String name, Long value) throws ParameterException {
+ ValueValidationUtil.minValueCheck(name, value, -1L);
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/NonNegativeValueValidator.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/NonNegativeValueValidator.java
new file mode 100644
index 0000000000000..473961be06d83
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/NonNegativeValueValidator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.validators;
+
+import com.beust.jcommander.IValueValidator;
+import com.beust.jcommander.ParameterException;
+import org.apache.pulsar.cli.ValueValidationUtil;
+
+public class NonNegativeValueValidator implements IValueValidator {
+ @Override
+ public void validate(String name, Long value) throws ParameterException {
+ ValueValidationUtil.minValueCheck(name, value, 0L);
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/PositiveIntegerValueValidator.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/PositiveIntegerValueValidator.java
new file mode 100644
index 0000000000000..c6b4cc43d6825
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/PositiveIntegerValueValidator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.validators;
+
+import com.beust.jcommander.IValueValidator;
+import com.beust.jcommander.ParameterException;
+import org.apache.pulsar.cli.ValueValidationUtil;
+
+public class PositiveIntegerValueValidator implements IValueValidator {
+
+ @Override
+ public void validate(String name, Integer value) throws ParameterException {
+ ValueValidationUtil.positiveCheck(name, value);
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/PositiveLongValueValidator.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/PositiveLongValueValidator.java
new file mode 100644
index 0000000000000..849a55241c665
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/PositiveLongValueValidator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.validators;
+
+import com.beust.jcommander.IValueValidator;
+import com.beust.jcommander.ParameterException;
+import org.apache.pulsar.cli.ValueValidationUtil;
+
+public class PositiveLongValueValidator implements IValueValidator {
+
+ @Override
+ public void validate(String name, Long value) throws ParameterException {
+ ValueValidationUtil.positiveCheck(name, value);
+ }
+}
diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/package-info.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/package-info.java
new file mode 100644
index 0000000000000..4d132b984c244
--- /dev/null
+++ b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/validators/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.validators;
diff --git a/pulsar-cli-utils/src/main/resources/findbugsExclude.xml b/pulsar-cli-utils/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000000000..ddde8120ba518
--- /dev/null
+++ b/pulsar-cli-utils/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,22 @@
+
+
+
\ No newline at end of file
diff --git a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/ValueValidationUtilTest.java b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/ValueValidationUtilTest.java
new file mode 100644
index 0000000000000..9d44ee41a2e25
--- /dev/null
+++ b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/ValueValidationUtilTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli;
+
+import static org.testng.Assert.assertThrows;
+import com.beust.jcommander.ParameterException;
+import org.testng.annotations.Test;
+
+public class ValueValidationUtilTest {
+
+ @Test
+ public void testMaxValueCheck() {
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.maxValueCheck("param1", 11L, 10L));
+ ValueValidationUtil.maxValueCheck("param2", 10L, 10L);
+ ValueValidationUtil.maxValueCheck("param3", 9L, 10L);
+ }
+
+ @Test
+ public void testPositiveCheck() {
+ // Long
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.positiveCheck("param1", 0L));
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.positiveCheck("param2", -1L));
+ ValueValidationUtil.positiveCheck("param3", 1L);
+
+ // Integer
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.positiveCheck("param4", 0));
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.positiveCheck("param5", -1));
+ ValueValidationUtil.positiveCheck("param6", 1);
+ }
+
+ @Test
+ public void testEmptyCheck() {
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.emptyCheck("param1", ""));
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.emptyCheck("param2", null));
+ ValueValidationUtil.emptyCheck("param3", "nonEmpty");
+ }
+
+ @Test
+ public void testMinValueCheck() {
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.minValueCheck("param1", 9L, 10L));
+ ValueValidationUtil.minValueCheck("param2", 10L, 10L);
+ ValueValidationUtil.minValueCheck("param3", 11L, 10L);
+ }
+
+ @Test
+ public void testPositiveCheckInt() {
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.positiveCheck("param1", 0));
+ assertThrows(ParameterException.class, () -> ValueValidationUtil.positiveCheck("param2", -1));
+ ValueValidationUtil.positiveCheck("param3", 1);
+ }
+}
diff --git a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
new file mode 100644
index 0000000000000..d669d455df1eb
--- /dev/null
+++ b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
+
+import com.beust.jcommander.ParameterException;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+public class ByteConversionTest {
+
+ @DataProvider
+ public static Object[][] successfulByteUnitUtilTestCases() {
+ return new Object[][] {
+ {"4096", 4096L},
+ {"1000", 1000L},
+ {"100K", 102400L},
+ {"100k", 102400L},
+ {"100M", 104857600L},
+ {"100m", 104857600L},
+ {"100G", 107374182400L},
+ {"100g", 107374182400L},
+ {"100T", 109951162777600L},
+ {"100t", 109951162777600L},
+ };
+ }
+
+ @DataProvider
+ public static Object[][] failingByteUnitUtilTestCases() {
+ return new Object[][] {
+ {""}, // Empty string
+ {"1Z"}, // Invalid size unit
+ {"1.5K"}, // Non-integer value
+ {"K"} // Missing size value
+ };
+ }
+
+ @Test(dataProvider = "successfulByteUnitUtilTestCases")
+ public void testSuccessfulByteUnitUtilConversion(String input, long expected) {
+ assertEquals(ByteUnitUtil.validateSizeString(input), expected);
+ }
+
+ @Test(dataProvider = "successfulByteUnitUtilTestCases")
+ public void testSuccessfulByteUnitToLongConverter(String input, long expected) {
+ ByteUnitToLongConverter converter = new ByteUnitToLongConverter("optionName");
+ assertEquals(converter.convert(input), Long.valueOf(expected));
+ }
+
+ @Test(dataProvider = "successfulByteUnitUtilTestCases")
+ public void testSuccessfulByteUnitIntegerConverter(String input, long expected) {
+ ByteUnitIntegerConverter converter = new ByteUnitIntegerConverter("optionName");
+ // Since the converter returns an Integer, we need to cast expected to int
+ assertEquals(converter.convert(input), Integer.valueOf((int) expected));
+ }
+
+ @Test(dataProvider = "failingByteUnitUtilTestCases")
+ public void testFailedByteUnitUtilConversion(String input) {
+ if (input.isEmpty()) {
+ assertThrows(IllegalArgumentException.class, () -> ByteUnitUtil.validateSizeString(input));
+ } else {
+ assertThrows(ParameterException.class, () -> ByteUnitUtil.validateSizeString(input));
+ }
+ }
+
+ @Test(dataProvider = "failingByteUnitUtilTestCases")
+ public void testFailedByteUnitToLongConverter(String input) {
+ ByteUnitToLongConverter converter = new ByteUnitToLongConverter("optionName");
+ assertThrows(ParameterException.class, () -> converter.convert(input));
+ }
+
+ @Test(dataProvider = "failingByteUnitUtilTestCases")
+ public void testFailedByteUnitIntegerConverter(String input) {
+ ByteUnitIntegerConverter converter = new ByteUnitIntegerConverter("optionName");
+ assertThrows(ParameterException.class, () -> converter.convert(input));
+ }
+}
+
diff --git a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/TimeConversionTest.java b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/TimeConversionTest.java
new file mode 100644
index 0000000000000..f7adeee0423ae
--- /dev/null
+++ b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/TimeConversionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.converters;
+
+import java.util.concurrent.TimeUnit;
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+public class TimeConversionTest {
+
+ @DataProvider
+ public static Object[][] successfulRelativeTimeUtilTestCases() {
+ return new Object[][] {
+ {"-1", -1L},
+ {"7", 7L},
+ {"100", 100L}, // No time unit, assuming seconds
+ {"3s", 3L},
+ {"3S", 3L},
+ {"10s", 10L},
+ {"1m", 60L},
+ {"5m", TimeUnit.MINUTES.toSeconds(5L)},
+ {"5M", TimeUnit.MINUTES.toSeconds(5L)},
+ {"7h", TimeUnit.HOURS.toSeconds(7L)},
+ {"7H", TimeUnit.HOURS.toSeconds(7L)},
+ {"9d", TimeUnit.DAYS.toSeconds(9L)},
+ {"9D", TimeUnit.DAYS.toSeconds(9L)},
+ {"1w", 604800L},
+ {"3W", 7 * TimeUnit.DAYS.toSeconds(3L)},
+ {"11y", 365 * TimeUnit.DAYS.toSeconds(11L)},
+ {"11Y", 365 * TimeUnit.DAYS.toSeconds(11L)},
+ {"-5m", -TimeUnit.MINUTES.toSeconds(5L)}
+ };
+ }
+
+ @Test(dataProvider = "successfulRelativeTimeUtilTestCases")
+ public void testSuccessfulRelativeTimeUtilParsing(String input, long expected) {
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds(input), expected);
+ }
+
+ @Test(dataProvider = "successfulRelativeTimeUtilTestCases")
+ public void testSuccessfulTimeUnitToSecondsConverter(String input, long expected) {
+ TimeUnitToSecondsConverter secondsConverter = new TimeUnitToSecondsConverter("optionName");
+ assertEquals(secondsConverter.convert(input), Long.valueOf(expected));
+ }
+
+ @Test(dataProvider = "successfulRelativeTimeUtilTestCases")
+ public void testSuccessfulTimeUnitToMillisConverter(String input, long expected) {
+ TimeUnitToMillisConverter millisConverter = new TimeUnitToMillisConverter("optionName");
+ // We multiply the expected by 1000 to convert the seconds into milliseconds
+ assertEquals(millisConverter.convert(input), Long.valueOf(expected * 1000));
+ }
+
+ @Test
+ public void testFailingParsing() {
+ assertThrows(IllegalArgumentException.class, () -> RelativeTimeUtil.parseRelativeTimeInSeconds("")); // Empty string
+ assertThrows(IllegalArgumentException.class, () -> RelativeTimeUtil.parseRelativeTimeInSeconds("s")); // Non-numeric character
+ assertThrows(IllegalArgumentException.class, () -> RelativeTimeUtil.parseRelativeTimeInSeconds("1z")); // Invalid time unit
+ assertThrows(IllegalArgumentException.class, () -> RelativeTimeUtil.parseRelativeTimeInSeconds("1.5")); // Floating point number
+ }
+
+ @Test
+ public void testNsToSeconds() {
+ assertEquals(RelativeTimeUtil.nsToSeconds(1_000_000_000), 1.000);
+ assertEquals(RelativeTimeUtil.nsToSeconds(1_500_000_000), 1.500);
+ assertEquals(RelativeTimeUtil.nsToSeconds(1_555_555_555), 1.556);
+ }
+}
diff --git a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/validators/CliUtilValidatorsTest.java b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/validators/CliUtilValidatorsTest.java
new file mode 100644
index 0000000000000..da1f6ec66bd9c
--- /dev/null
+++ b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/validators/CliUtilValidatorsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.cli.validators;
+
+import static org.testng.Assert.assertThrows;
+import com.beust.jcommander.ParameterException;
+import org.testng.annotations.Test;
+
+public class CliUtilValidatorsTest {
+
+ @Test
+ public void testPositiveLongValueValidator() {
+ PositiveLongValueValidator validator = new PositiveLongValueValidator();
+ assertThrows(ParameterException.class, () -> validator.validate("param", -1L));
+ assertThrows(ParameterException.class, () -> validator.validate("param", 0L));
+ validator.validate("param", 1L);
+ }
+
+ @Test
+ public void testPositiveIntegerValueValidator() {
+ PositiveIntegerValueValidator validator = new PositiveIntegerValueValidator();
+ assertThrows(ParameterException.class, () -> validator.validate("param", -1));
+ assertThrows(ParameterException.class, () -> validator.validate("param", 0));
+ validator.validate("param", 1);
+ }
+
+ @Test
+ public void testNonNegativeValueValidator() {
+ NonNegativeValueValidator validator = new NonNegativeValueValidator();
+ assertThrows(ParameterException.class, () -> validator.validate("param", -1L));
+ validator.validate("param", 0L);
+ validator.validate("param", 1L);
+ }
+
+ @Test
+ public void testMinNegativeOneValidator() {
+ MinNegativeOneValidator validator = new MinNegativeOneValidator();
+ assertThrows(ParameterException.class, () -> validator.validate("param", -2L));
+ validator.validate("param", -1L);
+ validator.validate("param", 0L);
+ }
+
+ @Test
+ public void testIntegerMaxValueLongValidator() {
+ IntegerMaxValueLongValidator validator = new IntegerMaxValueLongValidator();
+ assertThrows(ParameterException.class, () -> validator.validate("param", Integer.MAX_VALUE + 1L));
+ validator.validate("param", (long) Integer.MAX_VALUE);
+ }
+}