From 54a8ca82801e2bcb4b0a2b0c72792e8ef1cd459e Mon Sep 17 00:00:00 2001
From: rdhabalia
Date: Wed, 1 Feb 2017 11:55:10 -0800
Subject: [PATCH 1/5] Update Broker service configuration dynamically
---
.../pulsar/broker/ServiceConfiguration.java | 1 +
.../common/configuration/FieldContext.java | 7 ++
.../yahoo/pulsar/broker/admin/Brokers.java | 99 ++++++++++++++++++-
.../pulsar/broker/service/BrokerService.java | 91 ++++++++++++++++-
.../pulsar/broker/admin/AdminApiTest.java | 56 +++++++++++
.../yahoo/pulsar/client/admin/Brokers.java | 9 ++
.../client/admin/internal/BrokersImpl.java | 12 +++
.../yahoo/pulsar/admin/cli/CmdBrokers.java | 14 +++
.../yahoo/pulsar/common/util/FieldParser.java | 2 +-
9 files changed, 285 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
index ab3099362e5d5..87553b3fe13a8 100644
--- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
@@ -62,6 +62,7 @@ public class ServiceConfiguration implements PulsarConfiguration{
private long zooKeeperSessionTimeoutMillis = 30000;
// Time to wait for broker graceful shutdown. After this time elapses, the
// process will be killed
+ @FieldContext(dynamic = true)
private long brokerShutdownTimeoutMs = 3000;
// Enable backlog quota check. Enforces action on topic when the quota is
// reached
diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java
index cf693239b5484..465d35cfa2784 100644
--- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java
+++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java
@@ -56,4 +56,11 @@
* @return character length of field
*/
public int maxCharLength() default Integer.MAX_VALUE;
+
+ /**
+ * allow field to be updated dynamically
+ *
+ * @return
+ */
+ public boolean dynamic() default false;
}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
index bce449bddafba..5a80d2298dac9 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
@@ -15,32 +15,48 @@
*/
package com.yahoo.pulsar.broker.admin;
+import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
+
+import java.lang.reflect.Field;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response.Status;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+import com.yahoo.pulsar.broker.ServiceConfiguration;
+import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import com.yahoo.pulsar.broker.web.RestException;
+import com.yahoo.pulsar.common.configuration.FieldContext;
+import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import com.yahoo.pulsar.common.util.ObjectMapperFactory;
+import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
-import com.yahoo.pulsar.broker.web.RestException;
-import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
+
@Path("/brokers")
@Api(value = "/brokers", description = "Brokers admin apis", tags = "brokers")
@Produces(MediaType.APPLICATION_JSON)
public class Brokers extends AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(Brokers.class);
-
+ private int serviceConfigZkVersion = -1;
+
@GET
@Path("/{cluster}")
@ApiOperation(value = "Get the list of active brokers (web service addresses) in the cluster.", response = String.class, responseContainer = "Set")
@@ -79,4 +95,79 @@ public Map getOwnedNamespaes(@PathParam("clust
throw new RestException(e);
}
}
+
+ @POST
+ @Path("/configuration/{configName}/{configValue}")
+ @ApiOperation(value = "Update broker service configuration. This operation requires Pulsar super-user privileges.")
+ @ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
+ @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
+ @ApiResponse(code = 404, message = "Configuration not found"),
+ @ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
+ public void updateConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
+ validateSuperUserAccess();
+ updateServiecConfiguration(configName, configValue);
+ }
+
+ /**
+ * if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so
+ * all other brokers can see the change and take appropriate action on the change.
+ *
+ * @param configName
+ * : configuration key
+ * @param configValue
+ * : configuration value
+ */
+ private void updateServiecConfiguration(String configName, String configValue) {
+ try {
+ Field field = ServiceConfiguration.class.getDeclaredField(configName);
+ if (field != null && field.isAnnotationPresent(FieldContext.class)) {
+ field.setAccessible(true);
+ boolean dynamic = ((FieldContext) field.getAnnotation(FieldContext.class)).dynamic();
+ if (dynamic) {
+ updateConfigurationOnZk(configName, configValue);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName,
+ configValue);
+ }
+ throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Configuration {}/{} is not dynamic", clientAppId(), configName, configValue);
+ }
+ throw new RestException(Status.NOT_FOUND, "Configuration not found");
+ }
+ } catch (NoSuchFieldException nse) {
+ LOG.error("[{}] Configuration {}/{} not found", clientAppId(), configName, configValue);
+ throw new RestException(Status.NOT_FOUND, "Configuration not found");
+ } catch (RestException re) {
+ throw re;
+ } catch (Exception ie) {
+ LOG.error("[{}] Failed to update configuration {}/{}, {}", clientAppId(), configName, configValue,
+ ie.getMessage(), ie);
+ throw new RestException(ie);
+ }
+ }
+
+ private synchronized void updateConfigurationOnZk(String key, String value) throws Exception {
+ ZooKeeperDataCache
+ * On notification, listener should first check if config value has been changed and after taking appropriate
+ * action, listener should update config value with new value if it has been changed (so, next time listener can
+ * compare values on configMap change).
+ *
+ * @param configKey
+ * : configuration field name
+ * @param listener
+ * : listener which takes appropriate action on config-value change
+ */
+ public void registerConfigurationListener(String configKey, Consumer listener) {
+ dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener>() {
+ @Override
+ public void onUpdate(String path, Map data, Stat stat) {
+ if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null
+ && data.containsKey(configKey)) {
+ log.info("Updating configuration {}/{}", configKey, data.get(configKey));
+ listener.accept(data.get(configKey));
+ }
+ }
+ });
+ }
+
+ private void updateDynamicServiceConfiguration() {
+ try {
+ Optional> data = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH);
+ if (data.isPresent() && data.get() != null) {
+ data.get().forEach((key,value)-> {
+ try {
+ Field field = ServiceConfiguration.class.getDeclaredField(key);
+ if (field != null && field.isAnnotationPresent(FieldContext.class)) {
+ field.setAccessible(true);
+ field.set(pulsar().getConfiguration(), FieldParser.value(value,field));
+ if (log.isDebugEnabled()) {
+ log.debug("Successfully updated {}/{}", key, value);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Failed to update service configuration {}/{}, {}",key,value,e.getMessage());
+ }
+ });
+ }
+ } catch (Exception e) {
+ log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e);
+ }
+ }
+
}
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
index f909dcc21d6f3..8572f8a1b3e76 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
@@ -15,12 +15,14 @@
*/
package com.yahoo.pulsar.broker.admin;
+import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -36,6 +38,9 @@
import javax.ws.rs.client.WebTarget;
import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -56,6 +61,7 @@
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.broker.namespace.NamespaceEphemeralData;
import com.yahoo.pulsar.broker.namespace.NamespaceService;
+import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -377,6 +383,56 @@ public void brokers() throws Exception {
admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
admin.clusters().deleteCluster("use");
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
+
+ }
+
+ @Test
+ public void testDynamicConfiguration() throws Exception {
+ // create configuration znode
+ ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ // Now, znode is created: set the watch and listener on the znode
+ Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("updateConfigurationAndRegisterListeners");
+ getPermitZkNodeMethod.setAccessible(true);
+ getPermitZkNodeMethod.invoke(pulsar.getBrokerService());
+
+ // (1) try to update dynamic field
+ final long shutdownTime = 10;
+ // update configuration
+ admin.brokers().updateConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
+ // verify value is updated
+ assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
+
+ // (2) try to update non-dynamic field
+ try {
+ admin.brokers().updateConfiguration("zookeeperServers", "test-zk:1234");
+ } catch (Exception e) {
+ assertTrue(e instanceof PreconditionFailedException);
+ }
+
+ // (3) try to update non-existent field
+ try {
+ admin.brokers().updateConfiguration("test", Long.toString(shutdownTime));
+ } catch (Exception e) {
+ assertTrue(e instanceof NotFoundException);
+ }
+
+ }
+
+ @Test
+ public void testUpdateDynamicLocalConfiguration() throws Exception {
+
+ // (1) try to update dynamic field
+ final long shutdownTime = 10;
+ // update configuration
+ admin.brokers().updateConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
+ // Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated
+ Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("updateConfigurationAndRegisterListeners");
+ getPermitZkNodeMethod.setAccessible(true);
+ getPermitZkNodeMethod.invoke(pulsar.getBrokerService());
+ // verify value is updated
+ assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
+
}
@Test(enabled = true)
diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
index 4e07b61ff97b4..9d6ad4cc7cd5c 100644
--- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
+++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
@@ -67,4 +67,13 @@ public interface Brokers {
* @throws PulsarAdminException
*/
Map getOwnedNamespaces(String cluster, String brokerUrl) throws PulsarAdminException;
+
+ /**
+ * Update value of any dynamic {@link ServiceConfiguration}
+ *
+ * @param key
+ * @param value
+ * @throws PulsarAdminException
+ */
+ void updateConfiguration(String configName, String configValue) throws PulsarAdminException;
}
diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
index 3051a89c27e0c..5d32ac46e50a4 100644
--- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
@@ -18,12 +18,14 @@
import java.util.List;
import java.util.Map;
+import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import com.yahoo.pulsar.client.admin.Brokers;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.api.Authentication;
+import com.yahoo.pulsar.common.policies.data.ErrorData;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
public class BrokersImpl extends BaseResource implements Brokers {
@@ -56,4 +58,14 @@ public Map getOwnedNamespaces(String cluster,
}
}
+ @Override
+ public void updateConfiguration(String configName, String configValue) throws PulsarAdminException {
+ try {
+ request(brokers.path("/configuration/").path(configName).path(configValue)).post(Entity.json(""),
+ ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
}
diff --git a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
index 82890d7ce3389..7658c43da807d 100644
--- a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
+++ b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
@@ -48,9 +48,23 @@ void run() throws Exception {
}
}
+ @Parameters(commandDescription = "Update dynamic-serviceConfiguration of broker")
+ private class UpdateConfigurationCmd extends CliCommand {
+ @Parameter(names = "--config", description = "service-configuration name", required = true)
+ private String configName;
+ @Parameter(names = "--value", description = "service-configuration value", required = true)
+ private String configValue;
+
+ @Override
+ void run() throws Exception {
+ admin.brokers().updateConfiguration(configName, configValue);
+ }
+ }
+
CmdBrokers(PulsarAdmin admin) {
super("brokers", admin);
jcommander.addCommand("list", new List());
jcommander.addCommand("namespaces", new Namespaces());
+ jcommander.addCommand("updateConfig", new UpdateConfigurationCmd());
}
}
diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java
index a0bc1d1f8a066..e1ddf8e02a3e4 100644
--- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java
@@ -135,7 +135,7 @@ public static void update(Map properties, T obj) throws Ille
* : field of the attribute
* @return
*/
- private static Object value(String strValue, Field field) {
+ public static Object value(String strValue, Field field) {
checkNotNull(field);
// if field is not primitive type
if (field.getGenericType() instanceof ParameterizedType) {
From a194aa9cff3dccf5372c0cfeb2d09286e0bc0fe8 Mon Sep 17 00:00:00 2001
From: rdhabalia
Date: Mon, 20 Feb 2017 17:29:21 -0800
Subject: [PATCH 2/5] add configurationMap to avoid reflection everytime
---
.../yahoo/pulsar/broker/admin/Brokers.java | 31 ++++----------
.../pulsar/broker/service/BrokerService.java | 41 +++++++++++++------
.../yahoo/pulsar/admin/cli/CmdBrokers.java | 2 +-
3 files changed, 38 insertions(+), 36 deletions(-)
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
index 5a80d2298dac9..dd679f87e774f 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
@@ -17,7 +17,6 @@
import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
-import java.lang.reflect.Field;
import java.util.Map;
import java.util.Set;
@@ -38,8 +37,8 @@
import com.google.common.collect.Maps;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.web.RestException;
-import com.yahoo.pulsar.common.configuration.FieldContext;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
@@ -105,7 +104,7 @@ public Map getOwnedNamespaes(@PathParam("clust
@ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
public void updateConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
validateSuperUserAccess();
- updateServiecConfiguration(configName, configValue);
+ updateServiceConfiguration(configName, configValue);
}
/**
@@ -117,30 +116,17 @@ public void updateConfiguration(@PathParam("configName") String configName, @Pat
* @param configValue
* : configuration value
*/
- private void updateServiecConfiguration(String configName, String configValue) {
+ private void updateServiceConfiguration(String configName, String configValue) {
try {
- Field field = ServiceConfiguration.class.getDeclaredField(configName);
- if (field != null && field.isAnnotationPresent(FieldContext.class)) {
- field.setAccessible(true);
- boolean dynamic = ((FieldContext) field.getAnnotation(FieldContext.class)).dynamic();
- if (dynamic) {
- updateConfigurationOnZk(configName, configValue);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName,
- configValue);
- }
- throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
- }
+ if (BrokerService.getDynamicConfigurationMap().containsKey(configName)) {
+ updateConfigurationOnZk(configName, configValue);
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Configuration {}/{} is not dynamic", clientAppId(), configName, configValue);
+ LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName,
+ configValue);
}
- throw new RestException(Status.NOT_FOUND, "Configuration not found");
+ throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
}
- } catch (NoSuchFieldException nse) {
- LOG.error("[{}] Configuration {}/{} not found", clientAppId(), configName, configValue);
- throw new RestException(Status.NOT_FOUND, "Configuration not found");
} catch (RestException re) {
throw re;
} catch (Exception ie) {
@@ -170,4 +156,5 @@ private synchronized void updateConfigurationOnZk(String key, String value) thro
}
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), key, value);
}
+
}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
index 2a5136ae90e8d..45458b02237b6 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
@@ -83,6 +83,7 @@
import com.yahoo.pulsar.common.util.FieldParser;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
@@ -121,6 +122,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener offlineTopicStatCache;
+ private static final ConcurrentOpenHashMap dynamicConfigurationMap = prepareDynamicConfigurationMap();
private AuthorizationManager authorizationManager = null;
private final ScheduledExecutorService statsUpdater;
@@ -847,19 +849,15 @@ public ZooKeeperDataCache> getDynamicConfigurationCache() {
}
/**
- * Update dynamic-ServiceConfiguration with value present into zk-configuraiton-map and register listeners on
- * dynamic-ServiceConfiguration field to take appropriate action on change of zk-configuraiton-map.
+ * Update dynamic-ServiceConfiguration with value present into zk-configuration-map and register listeners on
+ * dynamic-ServiceConfiguration field to take appropriate action on change of zk-configuration-map.
*/
private void updateConfigurationAndRegisterListeners() {
// update ServiceConfiguration value by reading zk-configuration-map
updateDynamicServiceConfiguration();
// update brokerShutdownTimeoutMs value on listener notification
- registerConfigurationListener("brokerShutdownTimeoutMs", new Consumer() {
- @Override
- public void accept(String shutdownTime) {
- pulsar.getConfiguration().setBrokerShutdownTimeoutMs(Integer.parseInt(shutdownTime));
- }
- });
+ registerConfigurationListener("brokerShutdownTimeoutMs",
+ (shutdownTime) -> pulsar.getConfiguration().setBrokerShutdownTimeoutMs((long) shutdownTime));
//add more listeners here
}
@@ -870,20 +868,22 @@ public void accept(String shutdownTime) {
* On notification, listener should first check if config value has been changed and after taking appropriate
* action, listener should update config value with new value if it has been changed (so, next time listener can
* compare values on configMap change).
+ * @param
*
* @param configKey
* : configuration field name
* @param listener
* : listener which takes appropriate action on config-value change
*/
- public void registerConfigurationListener(String configKey, Consumer listener) {
+ public void registerConfigurationListener(String configKey, Consumer listener) {
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener>() {
+ @SuppressWarnings("unchecked")
@Override
public void onUpdate(String path, Map data, Stat stat) {
if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null
&& data.containsKey(configKey)) {
log.info("Updating configuration {}/{}", configKey, data.get(configKey));
- listener.accept(data.get(configKey));
+ listener.accept((T) FieldParser.value(data.get(configKey), dynamicConfigurationMap.get(configKey)));
}
}
});
@@ -899,9 +899,7 @@ private void updateDynamicServiceConfiguration() {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
field.set(pulsar().getConfiguration(), FieldParser.value(value,field));
- if (log.isDebugEnabled()) {
- log.debug("Successfully updated {}/{}", key, value);
- }
+ log.info("Successfully updated {}/{}", key, value);
}
} catch (Exception e) {
log.warn("Failed to update service configuration {}/{}, {}",key,value,e.getMessage());
@@ -912,5 +910,22 @@ private void updateDynamicServiceConfiguration() {
log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e);
}
}
+
+ public static ConcurrentOpenHashMap getDynamicConfigurationMap() {
+ return dynamicConfigurationMap;
+ }
+
+ private static ConcurrentOpenHashMap prepareDynamicConfigurationMap() {
+ ConcurrentOpenHashMap dynamicConfigurationMap = new ConcurrentOpenHashMap<>();
+ for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
+ if (field != null && field.isAnnotationPresent(FieldContext.class)) {
+ field.setAccessible(true);
+ if (((FieldContext) field.getAnnotation(FieldContext.class)).dynamic()) {
+ dynamicConfigurationMap.put(field.getName(), field);
+ }
+ }
+ }
+ return dynamicConfigurationMap;
+ }
}
diff --git a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
index 7658c43da807d..27f0f24c25681 100644
--- a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
+++ b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
@@ -65,6 +65,6 @@ void run() throws Exception {
super("brokers", admin);
jcommander.addCommand("list", new List());
jcommander.addCommand("namespaces", new Namespaces());
- jcommander.addCommand("updateConfig", new UpdateConfigurationCmd());
+ jcommander.addCommand("update-config", new UpdateConfigurationCmd());
}
}
From cc3c92f66eb3dcfc18259d6bdc3e9b9f321a9a74 Mon Sep 17 00:00:00 2001
From: rdhabalia
Date: Mon, 20 Feb 2017 18:20:09 -0800
Subject: [PATCH 3/5] Trigger listener on value change and update ServiceConfig
value after that
---
.../pulsar/broker/service/BrokerService.java | 36 +++++++++++++++++--
.../pulsar/broker/admin/AdminApiTest.java | 2 +-
2 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
index 45458b02237b6..aa10c9331aead 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
@@ -123,6 +123,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener offlineTopicStatCache;
private static final ConcurrentOpenHashMap dynamicConfigurationMap = prepareDynamicConfigurationMap();
+ private final ConcurrentOpenHashMap configRegisteredListeners;
private AuthorizationManager authorizationManager = null;
private final ScheduledExecutorService statsUpdater;
@@ -152,6 +153,7 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.topics = new ConcurrentOpenHashMap<>();
this.replicationClients = new ConcurrentOpenHashMap<>();
this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
+ this.configRegisteredListeners = new ConcurrentOpenHashMap<>();
this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
this.pulsarStats = new PulsarStats(pulsar);
@@ -855,9 +857,6 @@ public ZooKeeperDataCache> getDynamicConfigurationCache() {
private void updateConfigurationAndRegisterListeners() {
// update ServiceConfiguration value by reading zk-configuration-map
updateDynamicServiceConfiguration();
- // update brokerShutdownTimeoutMs value on listener notification
- registerConfigurationListener("brokerShutdownTimeoutMs",
- (shutdownTime) -> pulsar.getConfiguration().setBrokerShutdownTimeoutMs((long) shutdownTime));
//add more listeners here
}
@@ -876,6 +875,7 @@ private void updateConfigurationAndRegisterListeners() {
* : listener which takes appropriate action on config-value change
*/
public void registerConfigurationListener(String configKey, Consumer listener) {
+ configRegisteredListeners.put(configKey, listener);
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener>() {
@SuppressWarnings("unchecked")
@Override
@@ -906,6 +906,36 @@ private void updateDynamicServiceConfiguration() {
}
});
}
+ // register a listener: it updates field value and triggers appropriate registered field-listener only if
+ // field's value has been changed so, registered doesn't have to update field value in ServiceConfiguration
+ dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onUpdate(String path, Map data, Stat stat) {
+ if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) {
+ data.forEach((configKey, value) -> {
+ Field configField = dynamicConfigurationMap.get(configKey);
+ Object newValue = FieldParser.value(data.get(configKey), configField);
+ if (configField != null) {
+ Consumer listener = configRegisteredListeners.get(configKey);
+ try {
+ Object existingValue = configField.get(pulsar.getConfiguration());
+ configField.set(pulsar.getConfiguration(), newValue);
+ log.info("Successfully updated configuration {}/{}", configKey,
+ data.get(configKey));
+ if (listener != null && !existingValue.equals(newValue)) {
+ listener.accept(newValue);
+ }
+ } catch (Exception e) {
+ log.error("Failed to update config {}/{}", configKey, newValue);
+ }
+ } else {
+ log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
+ }
+ });
+ }
+ }
+ });
} catch (Exception e) {
log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e);
}
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
index 8572f8a1b3e76..e87c8d6b632ee 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
@@ -414,7 +414,7 @@ public void testDynamicConfiguration() throws Exception {
try {
admin.brokers().updateConfiguration("test", Long.toString(shutdownTime));
} catch (Exception e) {
- assertTrue(e instanceof NotFoundException);
+ assertTrue(e instanceof PreconditionFailedException);
}
}
From bd5ce2c2530b9ab94049aa875437806d50a3b268 Mon Sep 17 00:00:00 2001
From: rdhabalia
Date: Thu, 2 Mar 2017 15:14:32 -0800
Subject: [PATCH 4/5] add getAllConfiguration adminApiCommand + add test-cases
---
.../yahoo/pulsar/broker/admin/Brokers.java | 79 +++++++++++-------
.../pulsar/broker/admin/AdminApiTest.java | 81 ++++++++++++++++---
.../yahoo/pulsar/client/admin/Brokers.java | 27 ++++++-
.../client/admin/internal/BrokersImpl.java | 22 ++++-
.../yahoo/pulsar/admin/cli/CmdBrokers.java | 24 +++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 9 +++
6 files changed, 197 insertions(+), 45 deletions(-)
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
index dd679f87e774f..3fea37d54bc4b 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
@@ -17,6 +17,7 @@
import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -97,29 +98,74 @@ public Map getOwnedNamespaes(@PathParam("clust
@POST
@Path("/configuration/{configName}/{configValue}")
- @ApiOperation(value = "Update broker service configuration. This operation requires Pulsar super-user privileges.")
+ @ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
- public void updateConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
+ public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
validateSuperUserAccess();
- updateServiceConfiguration(configName, configValue);
+ updateDynamicConfigurationOnZk(configName, configValue);
}
+ @GET
+ @Path("/configuration/values")
+ @ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
+ @ApiResponses(value = { @ApiResponse(code = 404, message = "Configuration not found") })
+ public Map getAllDynamicConfigurations() throws Exception {
+ ZooKeeperDataCache> dynamicConfigurationCache = pulsar().getBrokerService()
+ .getDynamicConfigurationCache();
+ Map configurationMap = null;
+ try {
+ configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
+ } catch (RestException e) {
+ LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
+ throw e;
+ } catch (Exception e) {
+ LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
+ throw new RestException(e);
+ }
+ return configurationMap;
+ }
+
+ @GET
+ @Path("/configuration")
+ @ApiOperation(value = "Get all updatable dynamic configurations's name")
+ public List getDynamicConfigurationName() {
+ return BrokerService.getDynamicConfigurationMap().keys();
+ }
+
/**
* if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so
- * all other brokers can see the change and take appropriate action on the change.
+ * all other brokers get the watch and can see the change and take appropriate action on the change.
*
* @param configName
* : configuration key
* @param configValue
* : configuration value
*/
- private void updateServiceConfiguration(String configName, String configValue) {
+ private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) {
try {
if (BrokerService.getDynamicConfigurationMap().containsKey(configName)) {
- updateConfigurationOnZk(configName, configValue);
+ ZooKeeperDataCache> dynamicConfigurationCache = pulsar().getBrokerService()
+ .getDynamicConfigurationCache();
+ Map configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
+ .orElse(null);
+ if (configurationMap != null) {
+ configurationMap.put(configName, configValue);
+ byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
+ dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
+ serviceConfigZkVersion = localZk()
+ .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
+ } else {
+ configurationMap = Maps.newHashMap();
+ configurationMap.put(configName, configValue);
+ byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
+ ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName,
@@ -136,25 +182,4 @@ private void updateServiceConfiguration(String configName, String configValue) {
}
}
- private synchronized void updateConfigurationOnZk(String key, String value) throws Exception {
- ZooKeeperDataCache> dynamicConfigurationCache = pulsar().getBrokerService()
- .getDynamicConfigurationCache();
- Map configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
- .orElse(null);
- if (configurationMap != null) {
- configurationMap.put(key, value);
- byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
- dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
- serviceConfigZkVersion = localZk()
- .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
- } else {
- configurationMap = Maps.newHashMap();
- configurationMap.put(key, value);
- byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
- ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content,
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), key, value);
- }
-
}
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
index e87c8d6b632ee..cadb2fa111449 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
@@ -17,6 +17,7 @@
import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -386,55 +387,113 @@ public void brokers() throws Exception {
}
+ /**
+ *
+ * Verifies: zk-update configuration updates service-config
+ * 1. create znode for dynamic-config
+ * 2. start pulsar service so, pulsar can set the watch on that znode
+ * 3. update the configuration with new value
+ * 4. wait and verify that new value has been updated
+ *
+ *
+ * @throws Exception
+ */
@Test
- public void testDynamicConfiguration() throws Exception {
+ public void testUpdateDynamicConfigurationWithZkWatch() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
- Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("updateConfigurationAndRegisterListeners");
- getPermitZkNodeMethod.setAccessible(true);
- getPermitZkNodeMethod.invoke(pulsar.getBrokerService());
-
+ Method updateConfigListenerMethod = BrokerService.class
+ .getDeclaredMethod("updateConfigurationAndRegisterListeners");
+ updateConfigListenerMethod.setAccessible(true);
+ updateConfigListenerMethod.invoke(pulsar.getBrokerService());
+ pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
// (1) try to update dynamic field
final long shutdownTime = 10;
// update configuration
- admin.brokers().updateConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
+ admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
+ // wait config to be updated
+ for (int i = 0; i < 5; i++) {
+ if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != shutdownTime) {
+ Thread.sleep(100 + (i * 10));
+ } else {
+ break;
+ }
+ }
// verify value is updated
assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
// (2) try to update non-dynamic field
try {
- admin.brokers().updateConfiguration("zookeeperServers", "test-zk:1234");
+ admin.brokers().updateDynamicConfiguration("zookeeperServers", "test-zk:1234");
} catch (Exception e) {
assertTrue(e instanceof PreconditionFailedException);
}
// (3) try to update non-existent field
try {
- admin.brokers().updateConfiguration("test", Long.toString(shutdownTime));
+ admin.brokers().updateDynamicConfiguration("test", Long.toString(shutdownTime));
} catch (Exception e) {
assertTrue(e instanceof PreconditionFailedException);
}
}
-
+
+ /**
+ *
+ * verifies: that registerListener updates pulsar.config value with newly updated zk-dynamic config
+ * NOTE: pulsar can't set the watch on non-existing znode
+ * So, when pulsar starts it is not able to set the watch on non-existing znode of dynamicConfiguration
+ * So, here, after creating znode we will trigger register explicitly
+ * 1.start pulsar
+ * 2.update zk-config with admin api
+ * 3. trigger watch and listener
+ * 4. verify that config is updated
+ *
+ * @throws Exception
+ */
@Test
public void testUpdateDynamicLocalConfiguration() throws Exception {
-
// (1) try to update dynamic field
final long shutdownTime = 10;
+ pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
// update configuration
- admin.brokers().updateConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
+ admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
// Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated
Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("updateConfigurationAndRegisterListeners");
getPermitZkNodeMethod.setAccessible(true);
getPermitZkNodeMethod.invoke(pulsar.getBrokerService());
// verify value is updated
assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
+ }
+ @Test
+ public void testUpdatableConfigurationName() throws Exception {
+ // (1) try to update dynamic field
+ final String configName = "brokerShutdownTimeoutMs";
+ assertTrue(admin.brokers().getDynamicConfigurationNames().contains(configName));
}
+ @Test
+ public void testGetDynamicLocalConfiguration() throws Exception {
+ // (1) try to update dynamic field
+ final String configName = "brokerShutdownTimeoutMs";
+ final long shutdownTime = 10;
+ pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
+ try {
+ admin.brokers().getAllDynamicConfigurations();
+ fail("should have fail as configuration is not exist");
+ } catch (PulsarAdminException.NotFoundException ne) {
+ // ok : expected
+ }
+ assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
+ // update configuration
+ admin.brokers().updateDynamicConfiguration(configName, Long.toString(shutdownTime));
+ // Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated
+ assertEquals(Long.parseLong(admin.brokers().getAllDynamicConfigurations().get(configName)), shutdownTime);
+ }
+
@Test(enabled = true)
public void properties() throws PulsarAdminException {
Set allowedClusters = Sets.newHashSet("use");
diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
index 9d6ad4cc7cd5c..008f7e35334e0 100644
--- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
+++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
@@ -69,11 +69,30 @@ public interface Brokers {
Map getOwnedNamespaces(String cluster, String brokerUrl) throws PulsarAdminException;
/**
- * Update value of any dynamic {@link ServiceConfiguration}
+ * It updates dynamic configuration value in to Zk that triggers watch on
+ * brokers and all brokers can update {@link ServiceConfiguration} value
+ * locally
+ *
+ * @param key
+ * @param value
+ * @throws PulsarAdminException
+ */
+ void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException;
+
+ /**
+ * Get list of updatable configuration name
+ *
+ * @return
+ * @throws PulsarAdminException
+ */
+ List getDynamicConfigurationNames() throws PulsarAdminException;
+
+ /**
+ * Get values of all overridden dynamic-configs
*
- * @param key
- * @param value
+ * @return
* @throws PulsarAdminException
*/
- void updateConfiguration(String configName, String configValue) throws PulsarAdminException;
+ Map getAllDynamicConfigurations() throws PulsarAdminException;
+
}
diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
index 5d32ac46e50a4..2829b8a1a2e8d 100644
--- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
@@ -59,7 +59,7 @@ public Map getOwnedNamespaces(String cluster,
}
@Override
- public void updateConfiguration(String configName, String configValue) throws PulsarAdminException {
+ public void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException {
try {
request(brokers.path("/configuration/").path(configName).path(configValue)).post(Entity.json(""),
ErrorData.class);
@@ -68,4 +68,24 @@ public void updateConfiguration(String configName, String configValue) throws Pu
}
}
+ @Override
+ public Map getAllDynamicConfigurations() throws PulsarAdminException {
+ try {
+ return request(brokers.path("/configuration/").path("values")).get(new GenericType>() {
+ });
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public List getDynamicConfigurationNames() throws PulsarAdminException {
+ try {
+ return request(brokers.path("/configuration")).get(new GenericType>() {
+ });
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
}
diff --git a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
index 27f0f24c25681..4eaa1d0688a05 100644
--- a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
+++ b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
@@ -57,14 +57,34 @@ private class UpdateConfigurationCmd extends CliCommand {
@Override
void run() throws Exception {
- admin.brokers().updateConfiguration(configName, configValue);
+ admin.brokers().updateDynamicConfiguration(configName, configValue);
}
}
+ @Parameters(commandDescription = "Get all overridden dynamic-configuration values")
+ private class GetAllConfigurationsCmd extends CliCommand {
+
+ @Override
+ void run() throws Exception {
+ print(admin.brokers().getAllDynamicConfigurations());
+ }
+ }
+
+ @Parameters(commandDescription = "Get list of updatable configuration name")
+ private class GetUpdatableConfigCmd extends CliCommand {
+
+ @Override
+ void run() throws Exception {
+ print(admin.brokers().getDynamicConfigurationNames());
+ }
+ }
+
CmdBrokers(PulsarAdmin admin) {
super("brokers", admin);
jcommander.addCommand("list", new List());
jcommander.addCommand("namespaces", new Namespaces());
- jcommander.addCommand("update-config", new UpdateConfigurationCmd());
+ jcommander.addCommand("update-dynamic-config", new UpdateConfigurationCmd());
+ jcommander.addCommand("list-dynamic-config", new GetUpdatableConfigCmd());
+ jcommander.addCommand("get-all-dynamic-config", new GetAllConfigurationsCmd());
}
}
diff --git a/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java
index 328da033d18ff..3dbf536737509 100644
--- a/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -59,6 +59,15 @@ void brokers() throws Exception {
brokers.run(split("list use"));
verify(mockBrokers).getActiveBrokers("use");
+
+ brokers.run(split("get-all-dynamic-config"));
+ verify(mockBrokers).getAllDynamicConfigurations();
+
+ brokers.run(split("list-dynamic-config"));
+ verify(mockBrokers).getDynamicConfigurationNames();
+
+ brokers.run(split("update-dynamic-config --config brokerShutdownTimeoutMs --value 100"));
+ verify(mockBrokers).updateDynamicConfiguration("brokerShutdownTimeoutMs", "100");
}
@Test
From 26c72f3c4d8bcb331ed015ce84288a3ea99186f1 Mon Sep 17 00:00:00 2001
From: rdhabalia
Date: Fri, 3 Mar 2017 15:40:13 -0800
Subject: [PATCH 5/5] add dynamic-configuration command documentation
---
docs/AdminTools.md | 154 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 154 insertions(+)
diff --git a/docs/AdminTools.md b/docs/AdminTools.md
index 0c175887d7e47..66690fe6dd366 100644
--- a/docs/AdminTools.md
+++ b/docs/AdminTools.md
@@ -14,6 +14,9 @@
- [Brokers](#brokers)
- [list of active brokers](#list-of-active-brokers)
- [list of namespaces owned by a given broker](#list-of-namespaces-owned-by-a-given-broker)
+ - [update dynamic configuration](#update-dynamic-configuration)
+ - [get list of dynamic configuration name](#get-list-of-dynamic-configuration-name)
+ - [get value of dynamic configurations](#get-value-of-dynamic-configurations)
- [Properties](#properties)
- [list existing properties](#list-existing-properties)
- [create property](#create-property)
@@ -233,6 +236,157 @@ GET /admin/brokers/{cluster}/{broker}/ownedNamespaces
admin.brokers().getOwnedNamespaces(cluster,brokerUrl)
```
+#### update dynamic configuration
+Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally.
+
+###### CLI
+
+```
+$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
+```
+
+```
+N/A
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration/{configName}/{configValue}
+```
+
+###### Java
+
+```java
+admin.brokers().updateDynamicConfiguration(configName, configValue)
+```
+
+#### get list of dynamic configuration name
+It gives list of updatable dynamic service-configuration name.
+
+###### CLI
+
+```
+$ pulsar-admin brokers list-dynamic-config
+```
+
+```
+brokerShutdownTimeoutMs
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration
+```
+
+###### Java
+
+```java
+admin.brokers().getDynamicConfigurationNames()
+```
+
+#### get value of dynamic configurations
+It gives value of all dynamic configurations stored in zookeeper
+
+###### CLI
+
+```
+$ pulsar-admin brokers get-all-dynamic-config
+```
+
+```
+brokerShutdownTimeoutMs:100
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration/values
+```
+
+###### Java
+
+```java
+admin.brokers().getAllDynamicConfigurations()
+```
+
+#### Update dynamic configuration
+Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally.
+
+###### CLI
+
+```
+$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
+```
+
+```
+N/A
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration/{configName}/{configValue}
+```
+
+###### Java
+
+```java
+admin.brokers().updateDynamicConfiguration(configName, configValue)
+```
+
+#### Get list of dynamic configuration name
+It gives list of updatable dynamic service-configuration name.
+
+###### CLI
+
+```
+$ pulsar-admin brokers list-dynamic-config
+```
+
+```
+brokerShutdownTimeoutMs
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration
+```
+
+###### Java
+
+```java
+admin.brokers().getDynamicConfigurationNames()
+```
+
+#### Get value of dynamic configurations
+It gives value of all dynamic configurations stored in zookeeper
+
+###### CLI
+
+```
+$ pulsar-admin brokers get-all-dynamic-config
+```
+
+```
+brokerShutdownTimeoutMs:100
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration/values
+```
+
+###### Java
+
+```java
+admin.brokers().getAllDynamicConfigurations()
+```
+
+
### Properties