diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolConverterTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolConverterTest.java new file mode 100644 index 0000000000000..fc56c12029fe8 --- /dev/null +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolConverterTest.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.google.common.collect.Sets; +import java.util.EnumSet; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions; +import org.apache.pulsar.client.admin.Lookup; +import org.apache.pulsar.client.admin.Namespaces; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.policies.data.OffloadedReadPriority; +import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.policies.data.TopicType; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +public class PulsarAdminToolConverterTest { + + + @Test + public void namespaces() throws Exception { + PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); + Namespaces mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + Lookup mockLookup = mock(Lookup.class); + when(admin.lookups()).thenReturn(mockLookup); + + CmdNamespaces namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("list myprop")); + verify(mockNamespaces).getNamespaces("myprop"); + + namespaces.run(split("list-cluster myprop/clust")); + verify(mockNamespaces).getNamespaces("myprop", "clust"); + + namespaces.run(split("topics myprop/clust/ns1")); + verify(mockNamespaces).getTopics("myprop/clust/ns1", ListNamespaceTopicsOptions.builder().build()); + + namespaces.run(split("policies myprop/clust/ns1")); + verify(mockNamespaces).getPolicies("myprop/clust/ns1"); + + namespaces.run(split("create myprop/clust/ns1")); + verify(mockNamespaces).createNamespace("myprop/clust/ns1"); + + namespaces.run(split("delete myprop/clust/ns1")); + verify(mockNamespaces).deleteNamespace("myprop/clust/ns1", false); + + namespaces.run(split("permissions myprop/clust/ns1")); + verify(mockNamespaces).getPermissions("myprop/clust/ns1"); + + namespaces.run(split("grant-permission myprop/clust/ns1 --role role1 --actions produce,consume")); + verify(mockNamespaces).grantPermissionOnNamespace("myprop/clust/ns1", "role1", + EnumSet.of(AuthAction.produce, AuthAction.consume)); + + namespaces.run(split("revoke-permission myprop/clust/ns1 --role role1")); + verify(mockNamespaces).revokePermissionsOnNamespace("myprop/clust/ns1", "role1"); + + namespaces.run(split("set-clusters myprop/clust/ns1 -c use,usw,usc")); + verify(mockNamespaces).setNamespaceReplicationClusters("myprop/clust/ns1", + Sets.newHashSet("use", "usw", "usc")); + + namespaces.run(split("get-clusters myprop/clust/ns1")); + verify(mockNamespaces).getNamespaceReplicationClusters("myprop/clust/ns1"); + + namespaces.run(split("set-subscription-types-enabled myprop/clust/ns1 -t Shared,Failover")); + verify(mockNamespaces).setSubscriptionTypesEnabled("myprop/clust/ns1", + Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover)); + + namespaces.run(split("get-subscription-types-enabled myprop/clust/ns1")); + verify(mockNamespaces).getSubscriptionTypesEnabled("myprop/clust/ns1"); + + namespaces.run(split("remove-subscription-types-enabled myprop/clust/ns1")); + verify(mockNamespaces).removeSubscriptionTypesEnabled("myprop/clust/ns1"); + + namespaces.run(split("get-schema-validation-enforce myprop/clust/ns1 -ap")); + verify(mockNamespaces).getSchemaValidationEnforced("myprop/clust/ns1", true); + + namespaces + .run(split("set-bookie-affinity-group myprop/clust/ns1 --primary-group test1 --secondary-group test2")); + verify(mockNamespaces).setBookieAffinityGroup("myprop/clust/ns1", + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary("test1") + .bookkeeperAffinityGroupSecondary("test2") + .build()); + + namespaces.run(split("get-bookie-affinity-group myprop/clust/ns1")); + verify(mockNamespaces).getBookieAffinityGroup("myprop/clust/ns1"); + + namespaces.run(split("delete-bookie-affinity-group myprop/clust/ns1")); + verify(mockNamespaces).deleteBookieAffinityGroup("myprop/clust/ns1"); + + namespaces.run(split("set-replicator-dispatch-rate myprop/clust/ns1 -md 10 -bd 11 -dt 12")); + verify(mockNamespaces).setReplicatorDispatchRate("myprop/clust/ns1", DispatchRate.builder() + .dispatchThrottlingRateInMsg(10) + .dispatchThrottlingRateInByte(11) + .ratePeriodInSecond(12) + .build()); + + namespaces.run(split("get-replicator-dispatch-rate myprop/clust/ns1")); + verify(mockNamespaces).getReplicatorDispatchRate("myprop/clust/ns1"); + + namespaces.run(split("remove-replicator-dispatch-rate myprop/clust/ns1")); + verify(mockNamespaces).removeReplicatorDispatchRate("myprop/clust/ns1"); + + namespaces.run(split("unload myprop/clust/ns1")); + verify(mockNamespaces).unload("myprop/clust/ns1"); + + // message_age must have time limit, destination_storage must have size limit + Assert.assertFalse(namespaces.run( + split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -t message_age"))); + Assert.assertFalse(namespaces.run( + split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10h -t destination_storage"))); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("unload myprop/clust/ns1 -b 0x80000000_0xffffffff")); + verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", null); + + namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff")); + verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null); + + namespaces.run(split("get-backlog-quotas myprop/clust/ns1")); + verify(mockNamespaces).getBacklogQuotaMap("myprop/clust/ns1"); + + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_request_hold -l 10")); + verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", + BacklogQuota.builder() + .limitSize(10) + .retentionPolicy(RetentionPolicy.producer_request_hold) + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10K")); + verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", + BacklogQuota.builder() + .limitSize(10 * 1024) + .retentionPolicy(RetentionPolicy.producer_exception) + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10M")); + verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", + BacklogQuota.builder() + .limitSize(10 * 1024 * 1024) + .retentionPolicy(RetentionPolicy.producer_exception) + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G")); + verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", + BacklogQuota.builder() + .limitSize(10L * 1024 * 1024 * 1024) + .retentionPolicy(RetentionPolicy.producer_exception) + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -lt 10m -t message_age")); + verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", + BacklogQuota.builder() + .limitTime(10 * 60) + .retentionPolicy(RetentionPolicy.consumer_backlog_eviction) + .build(), + BacklogQuota.BacklogQuotaType.message_age); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10000 -t message_age")); + verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", + BacklogQuota.builder() + .limitTime(10000) + .retentionPolicy(RetentionPolicy.producer_exception) + .build(), + BacklogQuota.BacklogQuotaType.message_age); + + namespaces.run(split("set-persistence myprop/clust/ns1 -e 2 -w 1 -a 1 -r 100.0")); + verify(mockNamespaces).setPersistence("myprop/clust/ns1", + new PersistencePolicies(2, 1, 1, 100.0d)); + + namespaces.run(split("get-persistence myprop/clust/ns1")); + verify(mockNamespaces).getPersistence("myprop/clust/ns1"); + + namespaces.run(split("remove-persistence myprop/clust/ns1")); + verify(mockNamespaces).removePersistence("myprop/clust/ns1"); + + namespaces.run(split("get-max-subscriptions-per-topic myprop/clust/ns1")); + verify(mockNamespaces).getMaxSubscriptionsPerTopic("myprop/clust/ns1"); + namespaces.run(split("set-max-subscriptions-per-topic myprop/clust/ns1 -m 300")); + verify(mockNamespaces).setMaxSubscriptionsPerTopic("myprop/clust/ns1", 300); + namespaces.run(split("remove-max-subscriptions-per-topic myprop/clust/ns1")); + verify(mockNamespaces).removeMaxSubscriptionsPerTopic("myprop/clust/ns1"); + + namespaces.run(split("set-message-ttl myprop/clust/ns1 -ttl 300")); + verify(mockNamespaces).setNamespaceMessageTTL("myprop/clust/ns1", 300); + + namespaces.run(split("set-subscription-expiration-time myprop/clust/ns1 -t 60")); + verify(mockNamespaces).setSubscriptionExpirationTime("myprop/clust/ns1", 60); + + namespaces.run(split("get-deduplication myprop/clust/ns1")); + verify(mockNamespaces).getDeduplicationStatus("myprop/clust/ns1"); + namespaces.run(split("set-deduplication myprop/clust/ns1 --enable")); + verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1", true); + namespaces.run(split("remove-deduplication myprop/clust/ns1")); + verify(mockNamespaces).removeDeduplicationStatus("myprop/clust/ns1"); + + namespaces.run(split("set-auto-topic-creation myprop/clust/ns1 -e -t non-partitioned")); + verify(mockNamespaces).setAutoTopicCreation("myprop/clust/ns1", + AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.NON_PARTITIONED.toString()) + .build()); + + namespaces.run(split("get-auto-topic-creation myprop/clust/ns1")); + verify(mockNamespaces).getAutoTopicCreation("myprop/clust/ns1"); + + namespaces.run(split("remove-auto-topic-creation myprop/clust/ns1")); + verify(mockNamespaces).removeAutoTopicCreation("myprop/clust/ns1"); + + namespaces.run(split("set-auto-subscription-creation myprop/clust/ns1 -e")); + verify(mockNamespaces).setAutoSubscriptionCreation("myprop/clust/ns1", + AutoSubscriptionCreationOverride.builder().allowAutoSubscriptionCreation(true).build()); + + namespaces.run(split("get-auto-subscription-creation myprop/clust/ns1")); + verify(mockNamespaces).getAutoSubscriptionCreation("myprop/clust/ns1"); + + namespaces.run(split("remove-auto-subscription-creation myprop/clust/ns1")); + verify(mockNamespaces).removeAutoSubscriptionCreation("myprop/clust/ns1"); + + namespaces.run(split("get-message-ttl myprop/clust/ns1")); + verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1"); + + namespaces.run(split("get-subscription-expiration-time myprop/clust/ns1")); + verify(mockNamespaces).getSubscriptionExpirationTime("myprop/clust/ns1"); + + namespaces.run(split("remove-subscription-expiration-time myprop/clust/ns1")); + verify(mockNamespaces).removeSubscriptionExpirationTime("myprop/clust/ns1"); + + namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group")); + verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group"); + + namespaces.run(split("get-anti-affinity-group myprop/clust/ns1")); + verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1"); + + namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster -g group")); + verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", "group"); + + namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 ")); + verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1"); + + + namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M")); + verify(mockNamespaces).setRetention("myprop/clust/ns1", + new RetentionPolicies(60, 1)); + + // Test with default time unit (seconds) + namespaces = new CmdNamespaces(() -> admin); + reset(mockNamespaces); + namespaces.run(split("set-retention myprop/clust/ns1 -t 120 -s 20M")); + verify(mockNamespaces).setRetention("myprop/clust/ns1", + new RetentionPolicies(2, 20)); + + // Test with explicit time unit (seconds) + namespaces = new CmdNamespaces(() -> admin); + reset(mockNamespaces); + namespaces.run(split("set-retention myprop/clust/ns1 -t 120s -s 20M")); + verify(mockNamespaces).setRetention("myprop/clust/ns1", + new RetentionPolicies(2, 20)); + + // Test size with default size less than 1 mb + namespaces = new CmdNamespaces(() -> admin); + reset(mockNamespaces); + namespaces.run(split("set-retention myprop/clust/ns1 -t 120s -s 4096")); + verify(mockNamespaces).setRetention("myprop/clust/ns1", + new RetentionPolicies(2, 0)); + + // Test size with default size greater than 1mb + namespaces = new CmdNamespaces(() -> admin); + reset(mockNamespaces); + namespaces.run(split("set-retention myprop/clust/ns1 -t 180 -s " + (2 * 1024 * 1024))); + verify(mockNamespaces).setRetention("myprop/clust/ns1", + new RetentionPolicies(3, 2)); + + namespaces.run(split("get-retention myprop/clust/ns1")); + verify(mockNamespaces).getRetention("myprop/clust/ns1"); + + namespaces.run(split("remove-retention myprop/clust/ns1")); + verify(mockNamespaces).removeRetention("myprop/clust/ns1"); + + namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s")); + verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1", + DelayedDeliveryPolicies.builder().tickTime(1000).active(true).build()); + + namespaces.run(split("get-delayed-delivery myprop/clust/ns1")); + verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1"); + + namespaces.run(split("remove-delayed-delivery myprop/clust/ns1")); + verify(mockNamespaces).removeDelayedDeliveryMessages("myprop/clust/ns1"); + + namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions")); + verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1", + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, + true)); + + namespaces.run(split("get-inactive-topic-policies myprop/clust/ns1")); + verify(mockNamespaces).getInactiveTopicPolicies("myprop/clust/ns1"); + + namespaces.run(split("remove-inactive-topic-policies myprop/clust/ns1")); + verify(mockNamespaces).removeInactiveTopicPolicies("myprop/clust/ns1"); + + namespaces.run(split("clear-backlog myprop/clust/ns1 -force")); + verify(mockNamespaces).clearNamespaceBacklog("myprop/clust/ns1"); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("set-message-ttl myprop/clust/ns1 -ttl 6m")); + verify(mockNamespaces).setNamespaceMessageTTL("myprop/clust/ns1", 6 * 60); + + namespaces.run(split("clear-backlog -b 0x80000000_0xffffffff myprop/clust/ns1 -force")); + verify(mockNamespaces).clearNamespaceBundleBacklog("myprop/clust/ns1", "0x80000000_0xffffffff"); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("clear-backlog -s my-sub myprop/clust/ns1 -force")); + verify(mockNamespaces).clearNamespaceBacklogForSubscription("myprop/clust/ns1", "my-sub"); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("clear-backlog -b 0x80000000_0xffffffff -s my-sub myprop/clust/ns1 -force")); + verify(mockNamespaces).clearNamespaceBundleBacklogForSubscription("myprop/clust/ns1", "0x80000000_0xffffffff", + "my-sub"); + + namespaces.run(split("unsubscribe -s my-sub myprop/clust/ns1")); + verify(mockNamespaces).unsubscribeNamespace("myprop/clust/ns1", "my-sub"); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("unsubscribe -b 0x80000000_0xffffffff -s my-sub myprop/clust/ns1")); + verify(mockNamespaces).unsubscribeNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", "my-sub"); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("get-max-producers-per-topic myprop/clust/ns1")); + verify(mockNamespaces).getMaxProducersPerTopic("myprop/clust/ns1"); + + namespaces.run(split("set-max-producers-per-topic myprop/clust/ns1 -p 1")); + verify(mockNamespaces).setMaxProducersPerTopic("myprop/clust/ns1", 1); + + namespaces.run(split("remove-max-producers-per-topic myprop/clust/ns1")); + verify(mockNamespaces).removeMaxProducersPerTopic("myprop/clust/ns1"); + + namespaces.run(split("get-max-consumers-per-topic myprop/clust/ns1")); + verify(mockNamespaces).getMaxConsumersPerTopic("myprop/clust/ns1"); + + namespaces.run(split("set-max-consumers-per-topic myprop/clust/ns1 -c 2")); + verify(mockNamespaces).setMaxConsumersPerTopic("myprop/clust/ns1", 2); + + namespaces.run(split("remove-max-consumers-per-topic myprop/clust/ns1")); + verify(mockNamespaces).removeMaxConsumersPerTopic("myprop/clust/ns1"); + + namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1")); + verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1"); + + namespaces.run(split("remove-max-consumers-per-subscription myprop/clust/ns1")); + verify(mockNamespaces).removeMaxConsumersPerSubscription("myprop/clust/ns1"); + + namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3")); + verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3); + + namespaces.run(split("get-max-unacked-messages-per-subscription myprop/clust/ns1")); + verify(mockNamespaces).getMaxUnackedMessagesPerSubscription("myprop/clust/ns1"); + + namespaces.run(split("set-max-unacked-messages-per-subscription myprop/clust/ns1 -c 3")); + verify(mockNamespaces).setMaxUnackedMessagesPerSubscription("myprop/clust/ns1", 3); + + namespaces.run(split("remove-max-unacked-messages-per-subscription myprop/clust/ns1")); + verify(mockNamespaces).removeMaxUnackedMessagesPerSubscription("myprop/clust/ns1"); + + namespaces.run(split("get-max-unacked-messages-per-consumer myprop/clust/ns1")); + verify(mockNamespaces).getMaxUnackedMessagesPerConsumer("myprop/clust/ns1"); + + namespaces.run(split("set-max-unacked-messages-per-consumer myprop/clust/ns1 -c 3")); + verify(mockNamespaces).setMaxUnackedMessagesPerConsumer("myprop/clust/ns1", 3); + + namespaces.run(split("remove-max-unacked-messages-per-consumer myprop/clust/ns1")); + verify(mockNamespaces).removeMaxUnackedMessagesPerConsumer("myprop/clust/ns1"); + + mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + namespaces = new CmdNamespaces(() -> admin); + + namespaces.run(split("set-dispatch-rate myprop/clust/ns1 -md -1 -bd -1 -dt 2")); + verify(mockNamespaces).setDispatchRate("myprop/clust/ns1", DispatchRate.builder() + .dispatchThrottlingRateInMsg(-1) + .dispatchThrottlingRateInByte(-1) + .ratePeriodInSecond(2) + .build()); + + namespaces.run(split("get-dispatch-rate myprop/clust/ns1")); + verify(mockNamespaces).getDispatchRate("myprop/clust/ns1"); + + namespaces.run(split("remove-dispatch-rate myprop/clust/ns1")); + verify(mockNamespaces).removeDispatchRate("myprop/clust/ns1"); + + namespaces.run(split("set-publish-rate myprop/clust/ns1 -m 10 -b 20")); + verify(mockNamespaces).setPublishRate("myprop/clust/ns1", new PublishRate(10, 20)); + + namespaces.run(split("get-publish-rate myprop/clust/ns1")); + verify(mockNamespaces).getPublishRate("myprop/clust/ns1"); + + namespaces.run(split("remove-publish-rate myprop/clust/ns1")); + verify(mockNamespaces).removePublishRate("myprop/clust/ns1"); + + namespaces.run(split("set-subscribe-rate myprop/clust/ns1 -sr 2 -st 60")); + verify(mockNamespaces).setSubscribeRate("myprop/clust/ns1", new SubscribeRate(2, 60)); + + namespaces.run(split("get-subscribe-rate myprop/clust/ns1")); + verify(mockNamespaces).getSubscribeRate("myprop/clust/ns1"); + + namespaces.run(split("remove-subscribe-rate myprop/clust/ns1")); + verify(mockNamespaces).removeSubscribeRate("myprop/clust/ns1"); + + namespaces.run(split("set-subscription-dispatch-rate myprop/clust/ns1 -md -1 -bd -1 -dt 2")); + verify(mockNamespaces).setSubscriptionDispatchRate("myprop/clust/ns1", DispatchRate.builder() + .dispatchThrottlingRateInMsg(-1) + .dispatchThrottlingRateInByte(-1) + .ratePeriodInSecond(2) + .build()); + + namespaces.run(split("get-subscription-dispatch-rate myprop/clust/ns1")); + verify(mockNamespaces).getSubscriptionDispatchRate("myprop/clust/ns1"); + + namespaces.run(split("remove-subscription-dispatch-rate myprop/clust/ns1")); + verify(mockNamespaces).removeSubscriptionDispatchRate("myprop/clust/ns1"); + + namespaces.run(split("get-compaction-threshold myprop/clust/ns1")); + verify(mockNamespaces).getCompactionThreshold("myprop/clust/ns1"); + + namespaces.run(split("remove-compaction-threshold myprop/clust/ns1")); + verify(mockNamespaces).removeCompactionThreshold("myprop/clust/ns1"); + + namespaces.run(split("set-compaction-threshold myprop/clust/ns1 -t 1G")); + verify(mockNamespaces).setCompactionThreshold("myprop/clust/ns1", 1024 * 1024 * 1024); + + namespaces.run(split("get-offload-threshold myprop/clust/ns1")); + verify(mockNamespaces).getOffloadThreshold("myprop/clust/ns1"); + + namespaces.run(split("set-offload-threshold myprop/clust/ns1 -s 1G")); + verify(mockNamespaces).setOffloadThreshold("myprop/clust/ns1", 1024 * 1024 * 1024); + + namespaces.run(split("get-offload-deletion-lag myprop/clust/ns1")); + verify(mockNamespaces).getOffloadDeleteLagMs("myprop/clust/ns1"); + + namespaces.run(split("set-offload-deletion-lag myprop/clust/ns1 -l 1d")); + verify(mockNamespaces).setOffloadDeleteLag("myprop/clust/ns1", 24 * 60 * 60, TimeUnit.SECONDS); + + namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1")); + verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1"); + + namespaces.run(split( + "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oats 100 -oae 10s -orp tiered-storage-first")); + verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1", + OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket", + "http://test.endpoint", null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024, + 10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + + namespaces.run(split("remove-offload-policies myprop/clust/ns1")); + verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1"); + + namespaces.run(split("get-offload-policies myprop/clust/ns1")); + verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1"); + + namespaces.run(split("remove-message-ttl myprop/clust/ns1")); + verify(mockNamespaces).removeNamespaceMessageTTL("myprop/clust/ns1"); + + namespaces.run(split("set-deduplication-snapshot-interval myprop/clust/ns1 -i 1000")); + verify(mockNamespaces).setDeduplicationSnapshotInterval("myprop/clust/ns1", 1000); + namespaces.run(split("get-deduplication-snapshot-interval myprop/clust/ns1")); + verify(mockNamespaces).getDeduplicationSnapshotInterval("myprop/clust/ns1"); + namespaces.run(split("remove-deduplication-snapshot-interval myprop/clust/ns1")); + verify(mockNamespaces).removeDeduplicationSnapshotInterval("myprop/clust/ns1"); + + } + + String[] split(String s) { + return s.split(" "); + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 5901d7c177ea7..d93433c786444 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -36,6 +36,14 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.admin.cli.utils.IOUtils; +import org.apache.pulsar.admin.cli.utils.converters.ByteUnitIntegerConverter; +import org.apache.pulsar.admin.cli.utils.converters.ByteUnitToLongConverter; +import org.apache.pulsar.admin.cli.utils.converters.TimeUnitToMillisConverter; +import org.apache.pulsar.admin.cli.utils.converters.TimeUnitToSecondsConverter; +import org.apache.pulsar.admin.cli.utils.validators.MinNegativeOneValidator; +import org.apache.pulsar.admin.cli.utils.validators.NonNegativeValueValidator; +import org.apache.pulsar.admin.cli.utils.validators.PositiveIntegerValueValidator; +import org.apache.pulsar.admin.cli.utils.validators.PositiveLongValueValidator; import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions; import org.apache.pulsar.client.admin.Mode; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -65,7 +73,6 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TopicType; -import org.apache.pulsar.common.util.RelativeTimeUtil; @Parameters(commandDescription = "Operations about namespaces") public class CmdNamespaces extends CmdBase { @@ -402,25 +409,15 @@ private class SetMessageTTL extends CliCommand { @Parameter(names = { "--messageTTL", "-ttl" }, description = "Message TTL in seconds (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w). " - + "When the value is set to `0`, TTL is disabled.", required = true) - private String messageTTLStr; + + "When the value is set to `0`, TTL is disabled.", required = true, + converter = TimeUnitToSecondsConverter.class, + validateValueWith = {NonNegativeValueValidator.class}) + private Long messageTTLInSecond; @Override void run() throws PulsarAdminException { - long messageTTLInSecond; - try { - messageTTLInSecond = RelativeTimeUtil.parseRelativeTimeInSeconds(messageTTLStr); - } catch (IllegalArgumentException e) { - throw new ParameterException(e.getMessage()); - } - - if (messageTTLInSecond < 0 || messageTTLInSecond > Integer.MAX_VALUE) { - throw new ParameterException( - String.format("Message TTL cannot be negative or greater than %d seconds", Integer.MAX_VALUE)); - } - String namespace = validateNamespace(params); - getAdmin().namespaces().setNamespaceMessageTTL(namespace, (int) messageTTLInSecond); + getAdmin().namespaces().setNamespaceMessageTTL(namespace, messageTTLInSecond.intValue()); } } @@ -745,39 +742,27 @@ private class SetRetention extends CliCommand { + "For example, 100m, 3h, 2d, 5w. " + "If the time unit is not specified, the default unit is seconds. For example, " + "-t 120 sets retention to 2 minutes. " - + "0 means no retention and -1 means infinite time retention.", required = true) - private String retentionTimeStr; + + "0 means no retention and -1 means infinite time retention.", required = true, + converter = TimeUnitToSecondsConverter.class, + validateValueWith = MinNegativeOneValidator.class) + private Long retentionTimeInSec; @Parameter(names = { "--size", "-s" }, description = "Retention size limit with optional size unit suffix. " + "For example, 4096, 10M, 16G, 3T. The size unit suffix character can be k/K, m/M, g/G, or t/T. " + "If the size unit suffix is not specified, the default unit is bytes. " - + "0 or less than 1MB means no retention and -1 means infinite size retention", required = true) - private String limitStr; + + "0 or less than 1MB means no retention and -1 means infinite size retention", required = true, + converter = ByteUnitIntegerConverter.class) + private Integer sizeLimit; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - long sizeLimit = validateSizeString(limitStr); - long retentionTimeInSec; - try { - retentionTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr); - } catch (IllegalArgumentException exception) { - throw new ParameterException(exception.getMessage()); - } - - final int retentionTimeInMin; - if (retentionTimeInSec != -1) { - retentionTimeInMin = (int) TimeUnit.SECONDS.toMinutes(retentionTimeInSec); - } else { - retentionTimeInMin = -1; - } - - final int retentionSizeInMB; - if (sizeLimit != -1) { - retentionSizeInMB = (int) (sizeLimit / (1024 * 1024)); - } else { - retentionSizeInMB = -1; - } + final int retentionTimeInMin = retentionTimeInSec != -1 + ? (int) TimeUnit.SECONDS.toMinutes(retentionTimeInSec) + : retentionTimeInSec.intValue(); + final int retentionSizeInMB = sizeLimit != -1 + ? (int) (sizeLimit / (1024 * 1024)) + : sizeLimit; getAdmin().namespaces() .setRetention(namespace, new RetentionPolicies(retentionTimeInMin, retentionSizeInMB)); } @@ -1265,13 +1250,15 @@ private class SetBacklogQuota extends CliCommand { @Parameter(description = "tenant/namespace", required = true) private java.util.List params; - @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)") - private String limitStr; + @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", + converter = ByteUnitToLongConverter.class) + private Long limit = 0L; @Parameter(names = { "-lt", "--limitTime" }, description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), " - + "non-positive number for disabling time limit.") - private String limitTimeStr = null; + + "non-positive number for disabling time limit.", + converter = TimeUnitToSecondsConverter.class) + private Long limitTimeInSec; @Parameter(names = { "-p", "--policy" }, description = "Retention policy to enforce when the limit is reached. " + "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", @@ -1309,23 +1296,16 @@ void run() throws PulsarAdminException { BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy); if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) { // set quota by storage size - if (limitStr == null) { + if (limit == null) { throw new ParameterException("Quota type of 'destination_storage' needs a size limit"); } - long limit = validateSizeString(limitStr); builder.limitSize(limit); } else { // set quota by time - if (limitTimeStr == null) { + if (limitTimeInSec == null) { throw new ParameterException("Quota type of 'message_age' needs a time limit"); } - long limitTimeInSec; - try { - limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr); - } catch (IllegalArgumentException e) { - throw new ParameterException(e.getMessage()); - } - builder.limitTime((int) limitTimeInSec); + builder.limitTime(limitTimeInSec.intValue()); } getAdmin().namespaces().setBacklogQuota(namespace, builder.build(), backlogQuotaType); } @@ -1570,8 +1550,9 @@ private class SetInactiveTopicPolicies extends CliCommand { @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in " + "seconds, topics that are inactive for longer than this value will be deleted " - + "(eg: 1s, 10s, 1m, 5h, 3d)", required = true) - private String deleteInactiveTopicsMaxInactiveDuration; + + "(eg: 1s, 10s, 1m, 5h, 3d)", required = true, + converter = TimeUnitToSecondsConverter.class) + private Long maxInactiveDurationInSeconds; @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic, Valid options are: " + "[delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true) @@ -1580,14 +1561,6 @@ private class SetInactiveTopicPolicies extends CliCommand { @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - long maxInactiveDurationInSeconds; - try { - maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds( - RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration)); - } catch (IllegalArgumentException exception) { - throw new ParameterException(exception.getMessage()); - } - if (enableDeleteWhileInactive == disableDeleteWhileInactive) { throw new ParameterException("Need to specify either enable-delete-while-inactive or " + "disable-delete-while-inactive"); @@ -1600,7 +1573,7 @@ void run() throws PulsarAdminException { + "delete_when_subscriptions_caught_up"); } getAdmin().namespaces().setInactiveTopicPolicies(namespace, new InactiveTopicPolicies(deleteMode, - (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive)); + maxInactiveDurationInSeconds.intValue(), enableDeleteWhileInactive)); } } @@ -1617,20 +1590,13 @@ private class SetDelayedDelivery extends CliCommand { @Parameter(names = { "--time", "-t" }, description = "The tick time for when retrying on " + "delayed delivery messages, affecting the accuracy of the delivery time compared to " - + "the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)") - private String delayedDeliveryTimeStr = "1s"; + + "the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)", + converter = TimeUnitToMillisConverter.class) + private Long delayedDeliveryTimeInMills = 1000L; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - long delayedDeliveryTimeInMills; - try { - delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis( - RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr)); - } catch (IllegalArgumentException exception) { - throw new ParameterException(exception.getMessage()); - } - if (enable == disable) { throw new ParameterException("Need to specify either --enable or --disable"); } @@ -1942,13 +1908,13 @@ private class SetCompactionThreshold extends CliCommand { @Parameter(names = { "--threshold", "-t" }, description = "Maximum number of bytes in a topic backlog before compaction is triggered " + "(eg: 10M, 16G, 3T). 0 disables automatic compaction", - required = true) - private String thresholdStr = "0"; + required = true, + converter = ByteUnitToLongConverter.class) + private Long threshold = 0L; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - long threshold = validateSizeString(thresholdStr); getAdmin().namespaces().setCompactionThreshold(namespace, threshold); } } @@ -1976,13 +1942,13 @@ private class SetOffloadThreshold extends CliCommand { + " -1 falls back to the cluster's namespace default." + " Negative values disable automatic offload." + " 0 triggers offloading as soon as possible.", - required = true) - private String thresholdStr = "-1"; + required = true, + converter = ByteUnitToLongConverter.class) + private Long threshold = -1L; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - long threshold = validateSizeString(thresholdStr); getAdmin().namespaces().setOffloadThreshold(namespace, threshold); } } @@ -2012,18 +1978,13 @@ private class SetOffloadDeletionLag extends CliCommand { @Parameter(names = { "--lag", "-l" }, description = "Duration to wait after offloading a ledger segment, before deleting the copy of that" + " segment from cluster local storage. (eg: 10m, 5h, 3d, 2w).", - required = true) - private String lag = "-1"; + required = true, + converter = TimeUnitToSecondsConverter.class) + private Long lagInSec = -1L; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - long lagInSec; - try { - lagInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(lag); - } catch (IllegalArgumentException exception) { - throw new ParameterException(exception.getMessage()); - } getAdmin().namespaces().setOffloadDeleteLag(namespace, lagInSec, TimeUnit.SECONDS); } @@ -2266,34 +2227,41 @@ private class SetOffloadPolicies extends CliCommand { names = {"--maxBlockSize", "-mbs"}, description = "Max block size (eg: 32M, 64M), default is 64MB" + "s3 and google-cloud-storage requires this parameter", - required = false) - private String maxBlockSizeStr; + required = false, + converter = ByteUnitIntegerConverter.class, + validateValueWith = {PositiveIntegerValueValidator.class}) + private Integer maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; @Parameter( names = {"--readBufferSize", "-rbs"}, description = "Read buffer size (eg: 1M, 5M), default is 1MB", - required = false) - private String readBufferSizeStr; + required = false, + converter = ByteUnitIntegerConverter.class, + validateValueWith = {PositiveIntegerValueValidator.class}) + private Integer readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES; @Parameter( names = {"--offloadAfterElapsed", "-oae"}, description = "Delay time in Millis for deleting the bookkeeper ledger after offload " + "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).", - required = false) - private String offloadAfterElapsedStr; + required = false, + converter = TimeUnitToMillisConverter.class, + validateValueWith = PositiveLongValueValidator.class) + private Long offloadAfterElapsedInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS; @Parameter( names = {"--offloadAfterThreshold", "-oat"}, description = "Offload after threshold size (eg: 1M, 5M)", - required = false) - private String offloadAfterThresholdStr; + required = false, + converter = ByteUnitToLongConverter.class) + private Long offloadAfterThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES; @Parameter( names = {"--offloadAfterThresholdInSeconds", "-oats"}, description = "Offload after threshold seconds (or minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).", - required = false - ) - private String offloadAfterThresholdInSecondsStr; + required = false, + converter = TimeUnitToSecondsConverter.class) + private Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS; @Parameter( names = {"--offloadedReadPriority", "-orp"}, @@ -2348,71 +2316,15 @@ void run() throws PulsarAdminException { + " if s3 offload enabled"); } - int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; - if (StringUtils.isNotEmpty(maxBlockSizeStr)) { - long maxBlockSize = validateSizeString(maxBlockSizeStr); - if (positiveCheck("MaxBlockSize", maxBlockSize) - && maxValueCheck("MaxBlockSize", maxBlockSize, Integer.MAX_VALUE)) { - maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue(); - } - } - - int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES; - if (StringUtils.isNotEmpty(readBufferSizeStr)) { - long readBufferSize = validateSizeString(readBufferSizeStr); - if (positiveCheck("ReadBufferSize", readBufferSize) - && maxValueCheck("ReadBufferSize", readBufferSize, Integer.MAX_VALUE)) { - readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue(); - } - } - - Long offloadAfterElapsedInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS; - if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) { - Long offloadAfterElapsed; - try { - offloadAfterElapsed = TimeUnit.SECONDS.toMillis( - RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr)); - } catch (IllegalArgumentException exception) { - throw new ParameterException(exception.getMessage()); - } - if (positiveCheck("OffloadAfterElapsed", offloadAfterElapsed) - && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) { - offloadAfterElapsedInMillis = offloadAfterElapsed; - } - } - - Long offloadAfterThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES; - if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) { - long offloadAfterThreshold = validateSizeString(offloadAfterThresholdStr); - if (maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)) { - offloadAfterThresholdInBytes = offloadAfterThreshold; - } - } - - Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS; - if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) { - Long offloadThresholdInSeconds0; - try { - offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds( - RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim())); - } catch (IllegalArgumentException exception) { - throw new ParameterException(exception.getMessage()); - } - if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) { - offloadThresholdInSeconds = offloadThresholdInSeconds0; - } - } - OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY; - if (this.offloadReadPriorityStr != null) { try { offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr); } catch (Exception e) { throw new ParameterException("--offloadedReadPriority parameter must be one of " + Arrays.stream(OffloadedReadPriority.values()) - .map(OffloadedReadPriority::toString) - .collect(Collectors.joining(",")) + .map(OffloadedReadPriority::toString) + .collect(Collectors.joining(",")) + " but got: " + this.offloadReadPriorityStr, e); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/ValueValidationUtils.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/ValueValidationUtils.java new file mode 100644 index 0000000000000..9c5c6f9b99ce2 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/ValueValidationUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils; + +import com.beust.jcommander.ParameterException; +import org.apache.commons.lang3.StringUtils; + +public class ValueValidationUtils { + private ValueValidationUtils() { + } + + public static void maxValueCheck(String paramName, long value, long maxValue) { + if (value > maxValue) { + throw new ParameterException(paramName + " cannot be bigger than <" + maxValue + ">!"); + } + } + + public static void positiveCheck(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " cannot be less than or equal to <0>!"); + } + } + + public static void positiveCheck(String paramName, int value) { + if (value <= 0) { + throw new ParameterException(paramName + " cannot be less than or equal to <0>!"); + } + } + + public static void emptyCheck(String paramName, String value) { + if (StringUtils.isEmpty(value)) { + throw new ParameterException("The value of " + paramName + " can't be empty"); + } + } + + public static void minValueCheck(String name, Long value, long min) { + if (value < min) { + throw new ParameterException(name + " cannot be less than <" + min + ">!"); + } + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitIntegerConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitIntegerConverter.java new file mode 100644 index 0000000000000..84252c5f88b5f --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitIntegerConverter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.converters; + +import static org.apache.pulsar.admin.cli.utils.ValueValidationUtils.emptyCheck; +import static org.apache.pulsar.admin.cli.utils.converters.ByteUnitUtil.validateSizeString; +import com.beust.jcommander.converters.BaseConverter; + +public class ByteUnitIntegerConverter extends BaseConverter { + + public ByteUnitIntegerConverter(String optionName) { + super(optionName); + } + + @Override + public Integer convert(String argStr) { + return parseBytes(argStr).intValue(); + } + + Long parseBytes(String argStr) { + emptyCheck(getOptionName(), argStr); + long valueInBytes = validateSizeString(argStr); + return valueInBytes; + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitToLongConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitToLongConverter.java new file mode 100644 index 0000000000000..e401d1503b2cf --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitToLongConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.converters; + +import static org.apache.pulsar.admin.cli.utils.ValueValidationUtils.emptyCheck; +import com.beust.jcommander.converters.BaseConverter; + +public class ByteUnitToLongConverter extends BaseConverter { + + public ByteUnitToLongConverter(String optionName) { + super(optionName); + } + + @Override + public Long convert(String argStr) { + return parseBytes(argStr); + } + + Long parseBytes(String argStr) { + emptyCheck(getOptionName(), argStr); + return ByteUnitUtil.validateSizeString(argStr); + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitUtil.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitUtil.java new file mode 100644 index 0000000000000..5febb8fa4b700 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/ByteUnitUtil.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.converters; + +import com.beust.jcommander.ParameterException; +import com.google.common.collect.Sets; +import java.util.Set; + +class ByteUnitUtil { + + private static Set sizeUnit = Sets.newHashSet('k', 'K', 'm', 'M', 'g', 'G', 't', 'T'); + + static long validateSizeString(String s) { + char last = s.charAt(s.length() - 1); + String subStr = s.substring(0, s.length() - 1); + long size; + try { + size = sizeUnit.contains(last) + ? Long.parseLong(subStr) + : Long.parseLong(s); + } catch (IllegalArgumentException e) { + throw new ParameterException(String.format("Invalid size '%s'. Valid formats are: %s", + s, "(4096, 100K, 10M, 16G, 2T)")); + } + switch (last) { + case 'k': + case 'K': + return size * 1024; + + case 'm': + case 'M': + return size * 1024 * 1024; + + case 'g': + case 'G': + return size * 1024 * 1024 * 1024; + + case 't': + case 'T': + return size * 1024 * 1024 * 1024 * 1024; + + default: + return size; + } + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/TimeUnitToMillisConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/TimeUnitToMillisConverter.java new file mode 100644 index 0000000000000..43fe01b13d247 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/TimeUnitToMillisConverter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.converters; + +import static org.apache.pulsar.admin.cli.utils.ValueValidationUtils.emptyCheck; +import com.beust.jcommander.ParameterException; +import com.beust.jcommander.converters.BaseConverter; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.util.RelativeTimeUtil; + +public class TimeUnitToMillisConverter extends BaseConverter { + + public TimeUnitToMillisConverter(String optionName) { + super(optionName); + } + + @Override + public Long convert(String str) { + emptyCheck(getOptionName(), str); + try { + return TimeUnit.SECONDS.toMillis( + RelativeTimeUtil.parseRelativeTimeInSeconds(str.trim())); + } catch (IllegalArgumentException exception) { + throw new ParameterException("For input " + getOptionName() + ": " + exception.getMessage()); + } + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/TimeUnitToSecondsConverter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/TimeUnitToSecondsConverter.java new file mode 100644 index 0000000000000..cd0aba22b2b1e --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/TimeUnitToSecondsConverter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.converters; + +import static org.apache.pulsar.admin.cli.utils.ValueValidationUtils.emptyCheck; +import com.beust.jcommander.ParameterException; +import com.beust.jcommander.converters.BaseConverter; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.util.RelativeTimeUtil; + +public class TimeUnitToSecondsConverter extends BaseConverter { + + public TimeUnitToSecondsConverter(String optionName) { + super(optionName); + } + + @Override + public Long convert(String str) { + emptyCheck(getOptionName(), str); + try { + return TimeUnit.SECONDS.toSeconds( + RelativeTimeUtil.parseRelativeTimeInSeconds(str.trim())); + } catch (IllegalArgumentException exception) { + throw new ParameterException("For input " + getOptionName() + ": " + exception.getMessage()); + } + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/package-info.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/package-info.java new file mode 100644 index 0000000000000..bac33be58dc21 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/converters/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.converters; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/IntegerMaxValueLongValidator.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/IntegerMaxValueLongValidator.java new file mode 100644 index 0000000000000..535a4c90859f0 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/IntegerMaxValueLongValidator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.validators; + +import com.beust.jcommander.IValueValidator; +import com.beust.jcommander.ParameterException; +import org.apache.pulsar.admin.cli.utils.ValueValidationUtils; + +public class IntegerMaxValueLongValidator implements IValueValidator { + @Override + public void validate(String name, Long value) throws ParameterException { + ValueValidationUtils.maxValueCheck(name, value, Integer.MAX_VALUE); + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/MinNegativeOneValidator.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/MinNegativeOneValidator.java new file mode 100644 index 0000000000000..9952da445991c --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/MinNegativeOneValidator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.validators; + +import com.beust.jcommander.IValueValidator; +import com.beust.jcommander.ParameterException; +import org.apache.pulsar.admin.cli.utils.ValueValidationUtils; + +public class MinNegativeOneValidator implements IValueValidator { + @Override + public void validate(String name, Long value) throws ParameterException { + ValueValidationUtils.minValueCheck(name, value, -1L); + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/NonNegativeValueValidator.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/NonNegativeValueValidator.java new file mode 100644 index 0000000000000..c6ad0a327ac28 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/NonNegativeValueValidator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.validators; + +import com.beust.jcommander.IValueValidator; +import com.beust.jcommander.ParameterException; +import org.apache.pulsar.admin.cli.utils.ValueValidationUtils; + +public class NonNegativeValueValidator implements IValueValidator { + @Override + public void validate(String name, Long value) throws ParameterException { + ValueValidationUtils.minValueCheck(name, value, 0L); + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/PositiveIntegerValueValidator.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/PositiveIntegerValueValidator.java new file mode 100644 index 0000000000000..d3d44d60cb889 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/PositiveIntegerValueValidator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.validators; + +import com.beust.jcommander.IValueValidator; +import com.beust.jcommander.ParameterException; +import org.apache.pulsar.admin.cli.utils.ValueValidationUtils; + +public class PositiveIntegerValueValidator implements IValueValidator { + + @Override + public void validate(String name, Integer value) throws ParameterException { + ValueValidationUtils.positiveCheck(name, value); + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/PositiveLongValueValidator.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/PositiveLongValueValidator.java new file mode 100644 index 0000000000000..d95d41b2c0762 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/PositiveLongValueValidator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.validators; + +import com.beust.jcommander.IValueValidator; +import com.beust.jcommander.ParameterException; +import org.apache.pulsar.admin.cli.utils.ValueValidationUtils; + +public class PositiveLongValueValidator implements IValueValidator { + + @Override + public void validate(String name, Long value) throws ParameterException { + ValueValidationUtils.positiveCheck(name, value); + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/package-info.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/package-info.java new file mode 100644 index 0000000000000..116c7d1b89b48 --- /dev/null +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/validators/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.admin.cli.utils.validators;