From 952e447bd0e6a1ed46a35b21b16c1e5b8da3c458 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 2 Feb 2017 15:49:01 -0800 Subject: [PATCH] Support multiple zookeeper quorum to store cluster-management-configuration and ledger-metadata separately --- bin/pulsar | 9 + conf/broker.conf | 6 + conf/data_zookeeper.conf | 42 +++ conf/standalone.conf | 6 + docs/Architecture.md | 1 + docs/ClusterSetup.md | 27 ++ .../pulsar/broker/ServiceConfiguration.java | 26 ++ .../pulsar/PulsarClusterMetadataSetup.java | 17 +- .../broker/MessagingServiceShutdownHook.java | 7 +- .../yahoo/pulsar/broker/PulsarService.java | 54 +++- .../pulsar/broker/admin/AdminResource.java | 4 +- .../cache/LocalZooKeeperCacheService.java | 2 +- .../loadbalance/LeaderElectionService.java | 2 +- .../impl/SimpleLoadManagerImpl.java | 14 +- .../broker/namespace/NamespaceService.java | 2 +- .../pulsar/broker/service/BrokerService.java | 2 +- .../yahoo/pulsar/broker/admin/AdminTest.java | 52 ++-- .../pulsar/broker/admin/NamespacesTest.java | 36 +-- .../auth/BrokerDataAndServiceZKTest.java | 272 ++++++++++++++++++ .../auth/MockedPulsarServiceBaseTest.java | 37 ++- ...sistentDispatcherFailoverConsumerTest.java | 2 +- .../PersistentTopicConcurrentTest.java | 2 +- .../broker/service/PersistentTopicTest.java | 2 +- .../pulsar/broker/service/ServerCnxTest.java | 2 +- .../pulsar/broker/web/WebServiceTest.java | 5 +- .../ZooKeeperSessionExpireRecoveryTest.java | 2 +- .../client/api/BrokerServiceLookupTest.java | 31 +- .../service/web/DiscoveryServiceWebTest.java | 2 +- .../proxy/ProxyAuthenticationTest.java | 2 +- .../proxy/ProxyAuthorizationTest.java | 2 +- .../proxy/ProxyPublishConsumeTest.java | 2 +- .../proxy/ProxyPublishConsumeTls.java | 2 +- .../ProxyPublishConsumeWithoutZKTest.java | 2 +- 33 files changed, 576 insertions(+), 100 deletions(-) create mode 100644 conf/data_zookeeper.conf create mode 100644 pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/BrokerDataAndServiceZKTest.java diff --git a/bin/pulsar b/bin/pulsar index 884c44fdf6765..06bf2ea156c95 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -21,6 +21,7 @@ PULSAR_HOME=`cd $BINDIR/..;pwd` DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf +DEFAULT_DATA_ZK_CONF=$PULSAR_HOME/conf/data_zookeeper.conf DEFAULT_GLOBAL_ZK_CONF=$PULSAR_HOME/conf/global_zookeeper.conf DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf @@ -83,6 +84,7 @@ Environment variables: PULSAR_BROKER_CONF Configuration file for broker (default: $DEFAULT_BROKER_CONF) PULSAR_BOOKKEEPER_CONF Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF) PULSAR_ZK_CONF Configuration file for zookeeper (default: $DEFAULT_ZK_CONF) + PULSAR_DATA_ZK_CONF Configuration file for data zookeeper (default: $DEFAULT_DATA_ZK_CONF) PULSAR_GLOBAL_ZK_CONF Configuration file for global zookeeper (default: $DEFAULT_GLOBAL_ZK_CONF) PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF) PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF) @@ -141,6 +143,10 @@ if [ -z "$PULSAR_ZK_CONF" ]; then PULSAR_ZK_CONF=$DEFAULT_ZK_CONF fi +if [ -z "$PULSAR_DATA_ZK_CONF" ]; then + PULSAR_DATA_ZK_CONF=$DEFAULT_DATA_ZK_CONF +fi + if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF fi @@ -193,6 +199,9 @@ elif [ $COMMAND == "bookie" ]; then elif [ $COMMAND == "zookeeper" ]; then PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"} exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.zookeeper.server.quorum.QuorumPeerMain $PULSAR_ZK_CONF $@ +elif [ $COMMAND == "data-zookeeper" ]; then + PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"data-zookeeper.log"} + exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.zookeeper.server.quorum.QuorumPeerMain $PULSAR_DATA_ZK_CONF $@ elif [ $COMMAND == "global-zookeeper" ]; then PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"} # Allow global ZK to turn into read-only mode when it cannot reach the quorum diff --git a/conf/broker.conf b/conf/broker.conf index 73a4402da8b86..a3bd7797bdb54 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -19,6 +19,9 @@ # Zookeeper quorum connection string zookeeperServers= +# Data Zookeeper quorum connection string +dataZookeeperServers= + # Global Zookeeper quorum connection string globalZookeeperServers= @@ -49,6 +52,9 @@ clusterName= # Zookeeper session timeout in milliseconds zooKeeperSessionTimeoutMillis=30000 +# Data Zookeeper session timeout in milliseconds +dataZooKeeperSessionTimeoutMillis=60000 + # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed brokerShutdownTimeoutMs=3000 diff --git a/conf/data_zookeeper.conf b/conf/data_zookeeper.conf new file mode 100644 index 0000000000000..3477c2c3c642c --- /dev/null +++ b/conf/data_zookeeper.conf @@ -0,0 +1,42 @@ +# +# Copyright 2016 Yahoo Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The number of milliseconds of each tick +tickTime=2000 +# The number of ticks that the initial +# synchronization phase can take +initLimit=10 +# The number of ticks that can pass between +# sending a request and getting an acknowledgement +syncLimit=5 +# the directory where the snapshot is stored. +dataDir=data/zookeeper +# the port at which the clients will connect +clientPort=2181 +# the maximum number of client connections. +# increase this if you need to handle more clients +#maxClientCnxns=60 +# +# Be sure to read the maintenance section of the +# administrator guide before turning on autopurge. +# +# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance +# +# The number of snapshots to retain in dataDir +autopurge.snapRetainCount=3 +# Purge task interval in hours +# Set to "0" to disable auto purge feature +autopurge.purgeInterval=1 diff --git a/conf/standalone.conf b/conf/standalone.conf index c7efc28cbab96..afa8dd24e1a08 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -19,6 +19,9 @@ # Zookeeper quorum connection string zookeeperServers= +# Data Zookeeper quorum connection string +dataZookeeperServers= + # Global Zookeeper quorum connection string globalZookeeperServers= @@ -39,6 +42,9 @@ clusterName=standalone # Zookeeper session timeout in milliseconds zooKeeperSessionTimeoutMillis=30000 +# Data Zookeeper session timeout in milliseconds +dataZooKeeperSessionTimeoutMillis=60000 + # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed brokerShutdownTimeoutMs=3000 diff --git a/docs/Architecture.md b/docs/Architecture.md index 02af179130738..e981376f3fb3c 100644 --- a/docs/Architecture.md +++ b/docs/Architecture.md @@ -71,6 +71,7 @@ Internally, a single managed ledger uses multiple Bookkeeper ledgers to store th Pulsar uses Apache Zookeeper for metadata, cluster configuration and coordination. - *Global Zookeeper* stores user provisioning data like properties, namespaces and policies which should be global consistent. - Each cluster has a *local zookeeper* ensemble which stores cluster specific configuration and coordination data, like ownership metadata, broker load reports, bookkeeper ledgers' metadata. +- *local zookeeper* can be highly available by configuring separete *data zookeeper* which stores bookkeeper ledgers' metadata and *local zookeeper* needs to store only cluster management configuration. ## Design diff --git a/docs/ClusterSetup.md b/docs/ClusterSetup.md index 122b3cff39eb8..1af40fbac8f42 100644 --- a/docs/ClusterSetup.md +++ b/docs/ClusterSetup.md @@ -46,6 +46,32 @@ Start ZK service on all the hosts: ```shell $ bin/pulsar-daemon start zookeeper ``` +#### Data ZooKeeper + +Start dedicated zookeeper quorum to store bookkeeper ledgers' metadata. + +##### Single zookeeper quorum +This does not require to configure and start _data_ zookeeper quorum, and pulsar instance uses _local_ zk quorum to store both cluster-management configuration and bookkeeper ledgers' metadata. + +##### Dedicated data-zookeeper quorum +In this scenario dedicated _data_ zk stores bookkeeper ledgers' metadata and _local_ zk stores only cluster-management configuration. Using dedicated _data_ zk makes _local_ zk and broker highly available because broker's availability only depends on cluster-management configuration that is stored into _local_ zk. + +Add all ZK servers the quorum configuration. Edit `conf/data_zookeeper.conf` and add +the following lines in all the ZK servers: + +``` +server.1=zk4.us-west.example.com:2888:3888 +server.2=zk5.us-west.example.com:2888:3888 +server.3=zk6.us-west.example.com:2888:3888 +... +``` + +Start ZK service on all the hosts: + +```shell +$ bin/pulsar-daemon start data-zookeeper +``` + #### Global ZooKeeper @@ -135,6 +161,7 @@ as well as the Pulsar metadata. ```shell $ bin/pulsar initialize-cluster-metadata --cluster us-west \ --zookeeper zk1.us-west.example.com:2181 \ + --data-zookeeper zk1.us-west.example.com:2181 \ --global-zookeeper zk1.us-west.example.com:2184 \ --service-url http://pulsar.us-west.example.com:8080/ \ --service-url-tls https://pulsar.us-west.example.com:8443/ diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java index ab3099362e5d5..721b9029484c2 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java @@ -36,6 +36,9 @@ public class ServiceConfiguration implements PulsarConfiguration{ // Zookeeper quorum connection string @FieldContext(required = true) private String zookeeperServers; + // Data Zookeeper quorum connection string + @FieldContext(required = false) + private String dataZookeeperServers; // Global Zookeeper quorum connection string @FieldContext(required = false) private String globalZookeeperServers; @@ -60,6 +63,8 @@ public class ServiceConfiguration implements PulsarConfiguration{ private String clusterName; // Zookeeper session timeout in milliseconds private long zooKeeperSessionTimeoutMillis = 30000; + // Data Zookeeper session timeout in milliseconds + private long dataZooKeeperSessionTimeoutMillis = 60000; // Time to wait for broker graceful shutdown. After this time elapses, the // process will be killed private long brokerShutdownTimeoutMs = 3000; @@ -249,6 +254,19 @@ public void setZookeeperServers(String zookeeperServers) { this.zookeeperServers = zookeeperServers; } + public String getDataZookeeperServers() { + if (this.dataZookeeperServers == null || this.dataZookeeperServers.isEmpty()) { + // If the configuration is not set, assuming that the dataZK is not enabled and all data is in the same + // ZooKeeper cluster + return this.getZookeeperServers(); + } + return dataZookeeperServers; + } + + public void setDataZookeeperServers(String dataZookeeperServers) { + this.dataZookeeperServers = dataZookeeperServers; + } + public String getGlobalZookeeperServers() { if (this.globalZookeeperServers == null || this.globalZookeeperServers.isEmpty()) { // If the configuration is not set, assuming that the globalZK is not enabled and all data is in the same @@ -906,6 +924,14 @@ public void setZooKeeperSessionTimeoutMillis(long zooKeeperSessionTimeoutMillis) this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis; } + public long getDataZooKeeperSessionTimeoutMillis() { + return dataZooKeeperSessionTimeoutMillis; + } + + public void setDataZooKeeperSessionTimeoutMillis(long dataZooKeeperSessionTimeoutMillis) { + this.dataZooKeeperSessionTimeoutMillis = dataZooKeeperSessionTimeoutMillis; + } + public String getReplicatorPrefix() { return replicatorPrefix; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/PulsarClusterMetadataSetup.java index 6730cfe9c55bb..1c381c83fa66e 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/PulsarClusterMetadataSetup.java @@ -21,6 +21,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory; import org.apache.bookkeeper.util.ZkUtils; +import static org.apache.commons.lang3.StringUtils.isBlank; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; @@ -54,6 +55,10 @@ private static class Arguments { @Parameter(names = { "-zk", "--zookeeper" }, description = "Local ZooKeeper quorum connection string", required = true) private String zookeeper; + + @Parameter(names = { "-dzk", + "--data-zookeeper" }, description = "Data ZooKeeper quorum connection string (Optional if data-zookeeper is not configured", required = false) + private String dataZookeeper; @Parameter(names = { "-gzk", "--global-zookeeper" }, description = "Global ZooKeeper quorum connection string", required = true) @@ -78,22 +83,26 @@ public static void main(String[] args) throws Exception { return; } - log.info("Setting up cluster {} with zk={} global-zk={}", arguments.cluster, arguments.zookeeper, - arguments.globalZookeeper); + arguments.dataZookeeper = isBlank(arguments.dataZookeeper) ? arguments.zookeeper : arguments.dataZookeeper; + log.info("Setting up cluster {} with zk={} dzk={} global-zk={}", arguments.cluster, arguments.zookeeper, + arguments.dataZookeeper, arguments.globalZookeeper); // Format BookKeeper metadata ClientConfiguration bkConf = new ClientConfiguration(); bkConf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class); - bkConf.setZkServers(arguments.zookeeper); + bkConf.setZkServers(arguments.dataZookeeper); if (!BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) { throw new IOException("Failed to initialize BookKeeper metadata"); } ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl(); ZooKeeper localZk = zkfactory.create(arguments.zookeeper, SessionType.ReadWrite, 30000).get(); + // use localZk for dataZookeeper if dataZk is not provided or it is same as localZk + ZooKeeper dataZk = arguments.zookeeper.equals(arguments.dataZookeeper) ? localZk + : zkfactory.create(arguments.dataZookeeper, SessionType.ReadWrite, 30000).get(); ZooKeeper globalZk = zkfactory.create(arguments.globalZookeeper, SessionType.ReadWrite, 30000).get(); - localZk.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + dataZk.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); localZk.create("/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ZkUtils.createFullPathOptimistic(globalZk, "/admin/policies", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/MessagingServiceShutdownHook.java index bba8026276c09..ad8fde8024b0e 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/MessagingServiceShutdownHook.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/MessagingServiceShutdownHook.java @@ -83,8 +83,11 @@ public void shutdown(int exitCode) { try { // Try to close ZK session to ensure all ephemeral locks gets released immediately if (service != null) { - if (service.getZkClient().getState() != States.CLOSED) { - service.getZkClient().close(); + if (service.getLocalZkClient().getState() != States.CLOSED) { + service.getLocalZkClient().close(); + } + if (service.getDataZkClient().getState() != States.CLOSED) { + service.getDataZkClient().close(); } } } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java index 218239a9dbf3a..2e95c0a898bc5 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.yahoo.pulsar.broker.admin.AdminResource; import com.yahoo.pulsar.broker.cache.ConfigurationCacheService; @@ -82,10 +83,13 @@ public class PulsarService implements AutoCloseable { private WebService webService = null; private WebSocketService webSocketService = null; private ConfigurationCacheService configurationCacheService = null; + private LocalZooKeeperCacheService dataZkCacheService = null; + private ZooKeeperCache dataZkCache; + private LocalZooKeeperConnectionService dataZooKeeperConnectionProvider; private LocalZooKeeperCacheService localZkCacheService = null; private ZooKeeperCache localZkCache; - private GlobalZooKeeperCache globalZkCache; private LocalZooKeeperConnectionService localZooKeeperConnectionProvider; + private GlobalZooKeeperCache globalZkCache; private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20, new DefaultThreadFactory("pulsar")); private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-ordered"); @@ -171,12 +175,17 @@ public void close() throws PulsarServerException { globalZkCache.close(); globalZkCache = null; localZooKeeperConnectionProvider.close(); + dataZooKeeperConnectionProvider.close(); localZooKeeperConnectionProvider = null; + dataZooKeeperConnectionProvider = null; } configurationCacheService = null; + localZkCacheService = null; localZkCache = null; + dataZkCacheService = null; + dataZkCache = null; if (adminClient != null) { adminClient.close(); @@ -222,14 +231,22 @@ public void start() throws PulsarServerException { } // Now we are ready to start services - localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), + localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getLocalZooKeeperClientFactory(), config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis()); localZooKeeperConnectionProvider.start(shutdownService); - + + if (equalsDataAndLocalZk()) { + dataZooKeeperConnectionProvider = localZooKeeperConnectionProvider; + } else { + dataZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), + config.getDataZookeeperServers(), config.getDataZooKeeperSessionTimeoutMillis()); + dataZooKeeperConnectionProvider.start(shutdownService); + } + // Initialize and start service to access configuration repository. this.startZkCacheService(); - managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), + managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getDataZkClient(), getBookKeeperClientFactory()); this.brokerService = new BrokerService(this); @@ -369,7 +386,8 @@ private void startZkCacheService() throws PulsarServerException { LOG.info("starting configuration cache service"); - this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor()); + this.localZkCache = new LocalZooKeeperCache(getLocalZkClient(), getOrderedExecutor()); + this.dataZkCache = equalsDataAndLocalZk() ? localZkCache : new LocalZooKeeperCache(getDataZkClient(), getOrderedExecutor()); this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), getOrderedExecutor(), this.executor); @@ -381,6 +399,8 @@ private void startZkCacheService() throws PulsarServerException { this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache()); this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService); + this.dataZkCacheService = equalsDataAndLocalZk() ? localZkCacheService + : new LocalZooKeeperCacheService(getDataZkCache(), this.configurationCacheService); } private void startNamespaceService() throws PulsarServerException { @@ -460,13 +480,21 @@ public String getStatusFilePath() { return config.getStatusFilePath(); } - public ZooKeeper getZkClient() { + public ZooKeeper getDataZkClient() { + return this.dataZooKeeperConnectionProvider.getLocalZooKeeper(); + } + + public ZooKeeper getLocalZkClient() { return this.localZooKeeperConnectionProvider.getLocalZooKeeper(); } public ConfigurationCacheService getConfigurationCache() { return configurationCacheService; } + + public boolean equalsDataAndLocalZk() { + return config.getZookeeperServers().equals(config.getDataZookeeperServers()); + } /** * Get the current pulsar state. @@ -508,6 +536,10 @@ public ManagedLedgerFactory getManagedLedgerFactory() { return managedLedgerClientFactory.getManagedLedgerFactory(); } + public ZooKeeperCache getDataZkCache() { + return dataZkCache; + } + public ZooKeeperCache getLocalZkCache() { return localZkCache; } @@ -528,10 +560,20 @@ public OrderedSafeExecutor getOrderedExecutor() { return orderedExecutor; } + public LocalZooKeeperCacheService getDataZkCacheService() { + return this.dataZkCacheService; + } + public LocalZooKeeperCacheService getLocalZkCacheService() { return this.localZkCacheService; } + // this is created to allow unit-test to mock it + @VisibleForTesting + public ZooKeeperClientFactory getLocalZooKeeperClientFactory() { + return getZooKeeperClientFactory(); + } + public ZooKeeperClientFactory getZooKeeperClientFactory() { if (zkClientFactory == null) { zkClientFactory = new ZookeeperClientFactoryImpl(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/AdminResource.java index ecf6923c03d1c..b980159f22db0 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/AdminResource.java @@ -71,7 +71,7 @@ protected ZooKeeperCache globalZkCache() { } protected ZooKeeper localZk() { - return pulsar().getZkClient(); + return pulsar().getLocalZkClient(); } protected ZooKeeperCache localZkCache() { @@ -255,7 +255,7 @@ ZooKeeperDataCache clustersCache() { } ZooKeeperChildrenCache managedLedgerListCache() { - return pulsar().getLocalZkCacheService().managedLedgerListCache(); + return pulsar().getDataZkCacheService().managedLedgerListCache(); } Set clusters() { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/cache/LocalZooKeeperCacheService.java index 3672374760a17..c255c59614575 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/cache/LocalZooKeeperCacheService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/cache/LocalZooKeeperCacheService.java @@ -43,7 +43,7 @@ public class LocalZooKeeperCacheService { private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCacheService.class); - private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers"; + public static final String MANAGED_LEDGER_ROOT = "/managed-ledgers"; public static final String OWNER_INFO_ROOT = "/namespace"; public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies"; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LeaderElectionService.java index 4802403556d4e..66a7feb533dc7 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LeaderElectionService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LeaderElectionService.java @@ -71,7 +71,7 @@ public static interface LeaderListener { public LeaderElectionService(PulsarService pulsar, LeaderListener leaderListener) { this.pulsar = pulsar; - this.zkClient = pulsar.getZkClient(); + this.zkClient = pulsar.getLocalZkClient(); this.executor = pulsar.getExecutor(); this.leaderListener = leaderListener; this.jsonMapper = new ObjectMapper(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index fff4201d3636d..742e8474a23de 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -235,9 +235,9 @@ public void start() throws PulsarServerException { try { // Register the brokers in zk list ServiceConfiguration conf = pulsar.getConfiguration(); - if (pulsar.getZkClient().exists(LOADBALANCE_BROKERS_ROOT, false) == null) { + if (pulsar.getLocalZkClient().exists(LOADBALANCE_BROKERS_ROOT, false) == null) { try { - ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), LOADBALANCE_BROKERS_ROOT, new byte[0], + ZkUtils.createFullPathOptimistic(pulsar.getLocalZkClient(), LOADBALANCE_BROKERS_ROOT, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException e) { // ignore the exception, node might be present already @@ -258,7 +258,7 @@ public void start() throws PulsarServerException { loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(loadReport); } try { - ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath, + ZkUtils.createFullPathOptimistic(pulsar.getLocalZkClient(), brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e) { // Catching excption here to print the right error message @@ -287,7 +287,7 @@ public void start() throws PulsarServerException { @Override public void disableBroker() throws Exception { if (isNotEmpty(brokerZnodePath)) { - pulsar.getZkClient().delete(brokerZnodePath, -1); + pulsar.getLocalZkClient().delete(brokerZnodePath, -1); } } @@ -303,9 +303,9 @@ private void setDynamicConfigurationToZK(String zkPath, Map sett byte[] settingBytes = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(settings); try { if (pulsar.getLocalZkCache().exists(zkPath)) { - pulsar.getZkClient().setData(zkPath, settingBytes, -1); + pulsar.getLocalZkClient().setData(zkPath, settingBytes, -1); } else { - ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), zkPath, settingBytes, Ids.OPEN_ACL_UNSAFE, + ZkUtils.createFullPathOptimistic(pulsar.getLocalZkClient(), zkPath, settingBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { @@ -1195,7 +1195,7 @@ public void writeLoadReportOnZookeeper() throws Exception { if (needUpdate) { LoadReport lr = generateLoadReport(); - pulsar.getZkClient().setData(brokerZnodePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), + pulsar.getLocalZkClient().setData(brokerZnodePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), -1); this.lastLoadReport = lr; this.lastResourceUsageTimestamp = lr.getTimestamp(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java index 31c77090d6304..d216f81550bee 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java @@ -703,7 +703,7 @@ public List getListOfDestinations(String property, String cluster, Strin try { String path = String.format("/managed-ledgers/%s/%s/%s/persistent", property, cluster, namespace); LOG.debug("Getting children from managed-ledgers now: {}", path); - for (String destination : pulsar.getLocalZkCacheService().managedLedgerListCache().get(path)) { + for (String destination : pulsar.getDataZkCacheService().managedLedgerListCache().get(path)) { destinations.add(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, Codec.decode(destination))); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java index c999790df3958..1ea533486f8f0 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java @@ -191,7 +191,7 @@ public BrokerService(PulsarService pulsar) throws Exception { } public void start() throws Exception { - this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath, + this.producerNameGenerator = new DistributedIdGenerator(pulsar.getLocalZkClient(), producerNameGeneratorPath, pulsar.getConfiguration().getClusterName()); ServerBootstrap bootstrap = new ServerBootstrap(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminTest.java index 781d028e12a7c..ca6c115db2657 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminTest.java @@ -102,7 +102,7 @@ public void setup() throws Exception { clusters = spy(new Clusters()); clusters.setPulsar(pulsar); - doReturn(mockZookKeeper).when(clusters).globalZk(); + doReturn(dataMockZookKeeper).when(clusters).globalZk(); doReturn(configurationCache.clustersCache()).when(clusters).clustersCache(); doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache(); doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache(); @@ -112,7 +112,7 @@ public void setup() throws Exception { properties = spy(new Properties()); properties.setServletContext(new MockServletContext()); properties.setPulsar(pulsar); - doReturn(mockZookKeeper).when(properties).globalZk(); + doReturn(dataMockZookKeeper).when(properties).globalZk(); doReturn(configurationCache.propertiesCache()).when(properties).propertiesCache(); doReturn("test").when(properties).clientAppId(); doNothing().when(properties).validateSuperUserAccess(); @@ -120,8 +120,8 @@ public void setup() throws Exception { namespaces = spy(new Namespaces()); namespaces.setServletContext(new MockServletContext()); namespaces.setPulsar(pulsar); - doReturn(mockZookKeeper).when(namespaces).globalZk(); - doReturn(mockZookKeeper).when(namespaces).localZk(); + doReturn(dataMockZookKeeper).when(namespaces).globalZk(); + doReturn(dataMockZookKeeper).when(namespaces).localZk(); doReturn(configurationCache.propertiesCache()).when(namespaces).propertiesCache(); doReturn(configurationCache.policiesCache()).when(namespaces).policiesCache(); doReturn("test").when(namespaces).clientAppId(); @@ -133,8 +133,8 @@ public void setup() throws Exception { brokers = spy(new Brokers()); brokers.setServletContext(new MockServletContext()); brokers.setPulsar(pulsar); - doReturn(mockZookKeeper).when(brokers).globalZk(); - doReturn(mockZookKeeper).when(brokers).localZk(); + doReturn(dataMockZookKeeper).when(brokers).globalZk(); + doReturn(dataMockZookKeeper).when(brokers).localZk(); doReturn(configurationCache.clustersListCache()).when(brokers).clustersListCache(); doReturn("test").when(brokers).clientAppId(); doNothing().when(brokers).validateSuperUserAccess(); @@ -146,8 +146,8 @@ public void setup() throws Exception { persistentTopics = spy(new PersistentTopics()); persistentTopics.setServletContext(new MockServletContext()); persistentTopics.setPulsar(pulsar); - doReturn(mockZookKeeper).when(persistentTopics).globalZk(); - doReturn(mockZookKeeper).when(persistentTopics).localZk(); + doReturn(dataMockZookKeeper).when(persistentTopics).globalZk(); + doReturn(dataMockZookKeeper).when(persistentTopics).localZk(); doReturn(configurationCache.propertiesCache()).when(persistentTopics).propertiesCache(); doReturn(configurationCache.policiesCache()).when(persistentTopics).policiesCache(); doReturn("test").when(persistentTopics).clientAppId(); @@ -159,16 +159,16 @@ public void setup() throws Exception { resourceQuotas = spy(new ResourceQuotas()); resourceQuotas.setServletContext(new MockServletContext()); resourceQuotas.setPulsar(pulsar); - doReturn(mockZookKeeper).when(resourceQuotas).globalZk(); - doReturn(mockZookKeeper).when(resourceQuotas).localZk(); + doReturn(dataMockZookKeeper).when(resourceQuotas).globalZk(); + doReturn(dataMockZookKeeper).when(resourceQuotas).localZk(); doReturn(configurationCache.propertiesCache()).when(resourceQuotas).propertiesCache(); doReturn(configurationCache.policiesCache()).when(resourceQuotas).policiesCache(); brokerStats = spy(new BrokerStats()); brokerStats.setServletContext(new MockServletContext()); brokerStats.setPulsar(pulsar); - doReturn(mockZookKeeper).when(brokerStats).globalZk(); - doReturn(mockZookKeeper).when(brokerStats).localZk(); + doReturn(dataMockZookKeeper).when(brokerStats).globalZk(); + doReturn(dataMockZookKeeper).when(brokerStats).localZk(); doReturn(configurationCache.propertiesCache()).when(brokerStats).propertiesCache(); doReturn(configurationCache.policiesCache()).when(brokerStats).policiesCache(); } @@ -271,7 +271,7 @@ void clusters() throws Exception { } // Test zk failures - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); configurationCache.clustersListCache().clear(); try { clusters.getClusters(); @@ -280,7 +280,7 @@ void clusters() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { clusters.createCluster("test", new ClusterData("http://broker.messaging.test.example.com")); fail("should have failed"); @@ -288,7 +288,7 @@ void clusters() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { clusters.updateCluster("test", new ClusterData("http://broker.messaging.test.example.com")); fail("should have failed"); @@ -296,7 +296,7 @@ void clusters() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { clusters.getCluster("test"); fail("should have failed"); @@ -304,7 +304,7 @@ void clusters() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failAfter(0, Code.SESSIONEXPIRED); + dataMockZookKeeper.failAfter(0, Code.SESSIONEXPIRED); try { clusters.deleteCluster("use"); fail("should have failed"); @@ -312,7 +312,7 @@ void clusters() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failAfter(1, Code.SESSIONEXPIRED); + dataMockZookKeeper.failAfter(1, Code.SESSIONEXPIRED); try { clusters.deleteCluster("use"); fail("should have failed"); @@ -388,7 +388,7 @@ void properties() throws Exception { } // Test zk failures - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { properties.getProperties(); fail("should have failed"); @@ -396,7 +396,7 @@ void properties() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { properties.getPropertyAdmin("my-property"); fail("should have failed"); @@ -404,7 +404,7 @@ void properties() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { properties.updateProperty("my-property", newPropertyAdmin); fail("should have failed"); @@ -412,7 +412,7 @@ void properties() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { properties.createProperty("test", propertyAdmin); fail("should have failed"); @@ -420,7 +420,7 @@ void properties() throws Exception { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { properties.deleteProperty("my-property"); fail("should have failed"); @@ -429,7 +429,7 @@ void properties() throws Exception { } properties.createProperty("error-property", propertyAdmin); - mockZookKeeper.failAfter(2, Code.SESSIONEXPIRED); + dataMockZookKeeper.failAfter(2, Code.SESSIONEXPIRED); try { properties.deleteProperty("error-property"); fail("should have failed"); @@ -525,7 +525,7 @@ void resourceQuotas() throws Exception { // create policies PropertyAdmin admin = new PropertyAdmin(); admin.getAllowedClusters().add(cluster); - mockZookKeeper.create(PulsarWebResource.path("policies", property), + dataMockZookKeeper.create(PulsarWebResource.path("policies", property), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(admin), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -581,7 +581,7 @@ void persistentTopics() throws Exception { // create policies PropertyAdmin admin = new PropertyAdmin(); admin.getAllowedClusters().add(cluster); - ZkUtils.createFullPathOptimistic(mockZookKeeper, + ZkUtils.createFullPathOptimistic(dataMockZookKeeper, PulsarWebResource.path("policies", property, cluster, namespace), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/NamespacesTest.java index 54f50ffa9077a..a6ffdb0fc7df2 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/NamespacesTest.java @@ -119,8 +119,8 @@ public void setup() throws Exception { namespaces = spy(new Namespaces()); namespaces.setServletContext(new MockServletContext()); namespaces.setPulsar(pulsar); - doReturn(mockZookKeeper).when(namespaces).globalZk(); - doReturn(mockZookKeeper).when(namespaces).localZk(); + doReturn(dataMockZookKeeper).when(namespaces).globalZk(); + doReturn(dataMockZookKeeper).when(namespaces).localZk(); doReturn(pulsar.getConfigurationCache().propertiesCache()).when(namespaces).propertiesCache(); doReturn(pulsar.getConfigurationCache().policiesCache()).when(namespaces).policiesCache(); doReturn(false).when(namespaces).isRequestHttps(); @@ -186,7 +186,7 @@ public void testCreateNamespaces() throws Exception { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { namespaces.createNamespace("my-property", "use", "my-namespace-3", new BundlesData()); fail("should have failed"); @@ -222,7 +222,7 @@ public void testGetNamespaces() throws Exception { } // ZK Errors - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { namespaces.getPropertyNamespaces(this.testProperty); fail("should have failed"); @@ -230,7 +230,7 @@ public void testGetNamespaces() throws Exception { // Ok } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { namespaces.getNamespacesForCluster(this.testProperty, this.testLocalCluster); fail("should have failed"); @@ -306,7 +306,7 @@ public void testGrantAndRevokePermissions() throws Exception { NamespaceName testNs = this.testLocalNamespaces.get(1); - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { namespaces.getPolicies(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName()); fail("should have failed"); @@ -314,7 +314,7 @@ public void testGrantAndRevokePermissions() throws Exception { // Ok } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { namespaces.getPermissions(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName()); fail("should have failed"); @@ -322,7 +322,7 @@ public void testGrantAndRevokePermissions() throws Exception { // Ok } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { namespaces.grantPermissionOnNamespace(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName(), "other-role", EnumSet.of(AuthAction.consume)); @@ -331,7 +331,7 @@ public void testGrantAndRevokePermissions() throws Exception { // Ok } - mockZookKeeper.failNow(Code.BADVERSION); + dataMockZookKeeper.failNow(Code.BADVERSION); try { namespaces.grantPermissionOnNamespace(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName(), "other-role", EnumSet.of(AuthAction.consume)); @@ -340,7 +340,7 @@ public void testGrantAndRevokePermissions() throws Exception { assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode()); } - mockZookKeeper.failNow(Code.BADVERSION); + dataMockZookKeeper.failNow(Code.BADVERSION); try { namespaces.revokePermissionsOnNamespace(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName(), "other-role"); @@ -349,7 +349,7 @@ public void testGrantAndRevokePermissions() throws Exception { assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); try { namespaces.revokePermissionsOnNamespace(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName(), "other-role"); @@ -416,7 +416,7 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception { // Sometimes watcher event consumes scheduled exception, so set to always fail to ensure exception is // thrown for api call. - mockZookKeeper.setAlwaysFail(Code.SESSIONEXPIRED); + dataMockZookKeeper.setAlwaysFail(Code.SESSIONEXPIRED); pulsar.getConfigurationCache().policiesCache().invalidate(AdminResource.path("policies", this.testProperty, "global", this.testGlobalNamespaces.get(0).getLocalName())); try { @@ -426,10 +426,10 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception { } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } finally { - mockZookKeeper.unsetAlwaysFail(); + dataMockZookKeeper.unsetAlwaysFail(); } - mockZookKeeper.failNow(Code.BADVERSION); + dataMockZookKeeper.failNow(Code.BADVERSION); try { namespaces.setNamespaceReplicationClusters(this.testProperty, "global", this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use")); @@ -453,7 +453,7 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); pulsar.getConfigurationCache().policiesCache().clear(); // ensure the ZooKeeper read happens, bypassing the cache @@ -592,7 +592,7 @@ public void testDeleteNamespaces() throws Exception { NamespaceName testNs = this.testLocalNamespaces.get(1); DestinationName topicName = DestinationName.get(testNs.getPersistentTopicName("my-topic")); - ZkUtils.createFullPathOptimistic(mockZookKeeper, "/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), + ZkUtils.createFullPathOptimistic(dataMockZookKeeper, "/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), new byte[0], null, null); // setup ownership to localhost @@ -632,9 +632,9 @@ public void testDeleteNamespaces() throws Exception { } // delete the topic from ZK - mockZookKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1); + dataMockZookKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1); // ensure refreshed destination list in the cache - pulsar.getLocalZkCacheService().managedLedgerListCache().clearTree(); + pulsar.getDataZkCacheService().managedLedgerListCache().clearTree(); // setup ownership to localhost doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false, false); doReturn(true).when(nsSvc).isServiceUnitOwned(testNs); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/BrokerDataAndServiceZKTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/BrokerDataAndServiceZKTest.java new file mode 100644 index 0000000000000..aebe562fa6e13 --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/BrokerDataAndServiceZKTest.java @@ -0,0 +1,272 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker.auth; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import java.net.URI; +import java.net.URL; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.zookeeper.MockZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; +import static org.testng.Assert.fail; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.pulsar.broker.PulsarService; +import com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService; +import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import com.yahoo.pulsar.broker.namespace.NamespaceService; +import com.yahoo.pulsar.client.admin.PulsarAdmin; +import com.yahoo.pulsar.client.api.Authentication; +import com.yahoo.pulsar.client.api.Consumer; +import com.yahoo.pulsar.client.api.ConsumerConfiguration; +import com.yahoo.pulsar.client.api.Message; +import com.yahoo.pulsar.client.api.Producer; +import com.yahoo.pulsar.client.api.ProducerConfiguration; +import com.yahoo.pulsar.client.api.PulsarClient; +import com.yahoo.pulsar.client.api.SubscriptionType; +import com.yahoo.pulsar.common.policies.data.ClusterData; +import com.yahoo.pulsar.common.policies.data.PropertyAdmin; +import org.apache.zookeeper.KeeperException.NoNodeException; + +public class BrokerDataAndServiceZKTest { + private static final Logger log = LoggerFactory.getLogger(BrokerDataAndServiceZKTest.class); + + /** + * Start pulsar with same data-zk and cluster-management-zk + * 1. Both zk instance should be the same in pulsar + * 2. all the znodes should be craeted correctly + * + * @throws Exception + */ + @Test + public void testPulsarWithSingleDataAndLocalZk() throws Exception { + + PulsarServiceStarterTest pulsar = new PulsarServiceStarterTest(false); + // (1) test msg produce and consume + final String property = "my-property"; + final String namespace = "my-ns"; + testProduceConsume(pulsar.pulsarClient, property, namespace); + // (2) test zk-state + Assert.assertEquals(pulsar.pulsar.getLocalZooKeeperClientFactory(), pulsar.pulsar.getZooKeeperClientFactory()); + Assert.assertEquals(pulsar.pulsar.getLocalZkCache(), pulsar.pulsar.getDataZkCache()); + List brokers = pulsar.localMockZookKeeper.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT, + null); + List nsOwnership = pulsar.localMockZookKeeper.getChildren(LocalZooKeeperCacheService.OWNER_INFO_ROOT, + null); + List localPolicies = pulsar.localMockZookKeeper + .getChildren(LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT, null); + List managedLedger = pulsar.localMockZookKeeper + .getChildren(LocalZooKeeperCacheService.MANAGED_LEDGER_ROOT, null); + List counters = pulsar.localMockZookKeeper.getChildren("/counters", null); + List ledgers = pulsar.localMockZookKeeper.getChildren("/ledgers", null); + + // verify znodes are created properly + Assert.assertTrue(brokers.contains("localhost:" + pulsar.BROKER_WEBSERVICE_PORT)); + Assert.assertTrue(nsOwnership.contains(property)); + Assert.assertTrue(localPolicies.contains(property)); + Assert.assertTrue(managedLedger.contains(property)); + Assert.assertTrue(counters.contains("producer-name")); + Assert.assertFalse(ledgers.isEmpty()); + } + + /** + * Start pulsar with different data-zk and cluster-management-zk + * 1. Both zk instance should be the differemt in pulsar + * 2. all the znodes should be created correctly + * a. cluster-management-znode: /loadbalance, /namespace, /counter, /admin/local-policies + * b. data-znode: /ledgers, /managed-ledger + * + * @throws Exception + */ + @Test + public void testPulsarWithMultipleZkForDataAndLocal() throws Exception { + + PulsarServiceStarterTest pulsar = new PulsarServiceStarterTest(true); + // (1) test msg produce and consume + final String property = "my-property"; + final String namespace = "my-ns"; + testProduceConsume(pulsar.pulsarClient, property, namespace); + // (2) test zk-state + Assert.assertNotEquals(pulsar.pulsar.getLocalZooKeeperClientFactory(), + pulsar.pulsar.getZooKeeperClientFactory()); + Assert.assertNotEquals(pulsar.pulsar.getLocalZkCache(), pulsar.pulsar.getDataZkCache()); + List brokers = pulsar.localMockZookKeeper.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT, + null); + List nsOwnership = pulsar.localMockZookKeeper.getChildren(LocalZooKeeperCacheService.OWNER_INFO_ROOT, + null); + List localPolicies = pulsar.localMockZookKeeper + .getChildren(LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT, null); + List managedLedger = pulsar.localMockZookKeeper + .getChildren(LocalZooKeeperCacheService.MANAGED_LEDGER_ROOT, null); + List counters = pulsar.localMockZookKeeper.getChildren("/counters", null); + List ledgers; + try { + ledgers = pulsar.localMockZookKeeper.getChildren("/ledgers", null); + fail("dataZK should not have node present"); + } catch (NoNodeException ne) { + // ok + } + + // verify cluster-management-znodes are created properly + Assert.assertTrue(brokers.contains("localhost:" + pulsar.BROKER_WEBSERVICE_PORT)); + Assert.assertTrue(nsOwnership.contains(property)); + Assert.assertTrue(localPolicies.contains(property)); + Assert.assertTrue(counters.contains("producer-name")); + Assert.assertTrue(managedLedger.isEmpty()); + + // LocalZooKeeperCacheService.initZK() => Creates : OWNER_INFO_ROOT,LOCAL_POLICIES_ROOT,MANAGED_LEDGER_ROOT (so, + // znode will be created in both zk but it will be empty) + brokers.clear(); + counters.clear(); + try { + brokers = pulsar.dataMockZookKeeper.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT, null); + counters = pulsar.dataMockZookKeeper.getChildren("/counters", null); + fail("dataZK should not have node present"); + } catch (NoNodeException ne) { + // ok + } + nsOwnership = pulsar.dataMockZookKeeper.getChildren(LocalZooKeeperCacheService.OWNER_INFO_ROOT, null); + localPolicies = pulsar.dataMockZookKeeper.getChildren(LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT, null); + managedLedger = pulsar.dataMockZookKeeper.getChildren(LocalZooKeeperCacheService.MANAGED_LEDGER_ROOT, null); + ledgers = pulsar.dataMockZookKeeper.getChildren("/ledgers", null); + + // verify data-znodes are created properly + Assert.assertTrue(brokers.isEmpty()); + Assert.assertTrue(counters.isEmpty()); + Assert.assertTrue(nsOwnership.isEmpty()); + Assert.assertTrue(localPolicies.isEmpty()); + Assert.assertTrue(managedLedger.contains(property)); + Assert.assertFalse(ledgers.isEmpty()); + + } + + static class PulsarServiceStarterTest extends MockedPulsarServiceBaseTest { + + private boolean diffDataAndLocalZk = false; + + public PulsarServiceStarterTest(boolean diffDataAndLocalZk) throws Exception { + this.diffDataAndLocalZk = diffDataAndLocalZk; + setup(); + } + + @Override + protected void setup() throws Exception { + // internal-setup + initPulsar(); + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + String lookupUrl = brokerUrl.toString(); + if (isTcpLookup) { + lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + } + pulsarClient = PulsarClient.create(lookupUrl, clientConf); + + // create test cluster, property, namespace + admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT)); + admin.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-property/use/my-ns"); + } + + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + protected final void initPulsar() throws Exception { + dataMockZookKeeper = createMockZooKeeper(); + if (diffDataAndLocalZk) { + localMockZookKeeper = MockZooKeeper.newInstance(MoreExecutors.sameThreadExecutor()); + } else { + localMockZookKeeper = dataMockZookKeeper; + } + mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), dataMockZookKeeper); + sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor(); + startBroker(); + brokerUrl = new URL("http://" + pulsar.getAdvertisedAddress() + ":" + BROKER_WEBSERVICE_PORT); + brokerUrlTls = new URL("https://" + pulsar.getAdvertisedAddress() + ":" + BROKER_WEBSERVICE_PORT_TLS); + admin = spy(new PulsarAdmin(brokerUrl, (Authentication) null)); + } + + protected void setupBrokerMocks(PulsarService pulsar) throws Exception { + // Override default providers with mocked ones + doReturn(!diffDataAndLocalZk).when(pulsar).equalsDataAndLocalZk(); + doReturn(localMockZooKeeperClientFactory).when(pulsar).getLocalZooKeeperClientFactory(); + doReturn(mockBookKeeperClientFactory).when(pulsar).getBookKeeperClientFactory(); + Supplier namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); + doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); + doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor(); + // use diff zk instance for split-zk usecase + if (diffDataAndLocalZk) { + doReturn(dataMockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); + } else { + doReturn(localMockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); + } + } + + } + + private void testProduceConsume(PulsarClient pulsarClient, String property, String namespace) throws Exception { + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(SubscriptionType.Exclusive); + final String topic = String.format("persistent://%s/use/%s/my-topic1", property, namespace); + Consumer consumer = pulsarClient.subscribe(topic, "my-subscriber-name", conf); + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer(topic, producerConf); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + + } + + private void testMessageOrderAndDuplicates(Set messagesReceived, String receivedMessage, + String expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + +} diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 87fa2621f9a7c..734d3fac1e68a 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -66,11 +66,12 @@ public abstract class MockedPulsarServiceBaseTest { protected final int BROKER_PORT = PortManager.nextFreePort(); protected final int BROKER_PORT_TLS = PortManager.nextFreePort(); - protected MockZooKeeper mockZookKeeper; + protected MockZooKeeper dataMockZookKeeper; + protected MockZooKeeper localMockZookKeeper; protected NonClosableMockBookKeeper mockBookKeeper; protected boolean isTcpLookup = false; - private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor; + protected SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor; public MockedPulsarServiceBaseTest() { this.conf = new ServiceConfiguration(); @@ -105,8 +106,9 @@ protected final void internalSetupForStatsTest() throws Exception { } protected final void init() throws Exception { - mockZookKeeper = createMockZooKeeper(); - mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), mockZookKeeper); + localMockZookKeeper = MockZooKeeper.newInstance(MoreExecutors.sameThreadExecutor()); + dataMockZookKeeper = createMockZooKeeper(); + mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), dataMockZookKeeper); sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor(); @@ -123,7 +125,8 @@ protected final void internalCleanup() throws Exception { pulsarClient.close(); pulsar.close(); mockBookKeeper.reallyShutdow(); - mockZookKeeper.shutdown(); + dataMockZookKeeper.shutdown(); + localMockZookKeeper.shutdown(); sameThreadOrderedSafeExecutor.shutdown(); } @@ -156,7 +159,9 @@ protected PulsarService startBroker(ServiceConfiguration conf) throws Exception protected void setupBrokerMocks(PulsarService pulsar) throws Exception { // Override default providers with mocked ones - doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); + doReturn(false).when(pulsar).equalsDataAndLocalZk(); + doReturn(localMockZooKeeperClientFactory).when(pulsar).getLocalZooKeeperClientFactory(); + doReturn(dataMockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); doReturn(mockBookKeeperClientFactory).when(pulsar).getBookKeeperClientFactory(); Supplier namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); @@ -165,7 +170,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor(); } - private MockZooKeeper createMockZooKeeper() throws Exception { + protected MockZooKeeper createMockZooKeeper() throws Exception { MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.sameThreadExecutor()); List dummyAclList = new ArrayList(0); @@ -178,7 +183,7 @@ private MockZooKeeper createMockZooKeeper() throws Exception { } // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test - private static class NonClosableMockBookKeeper extends MockBookKeeper { + protected static class NonClosableMockBookKeeper extends MockBookKeeper { public NonClosableMockBookKeeper(ClientConfiguration conf, ZooKeeper zk) throws Exception { super(conf, zk); @@ -199,17 +204,27 @@ public void reallyShutdow() { } } - protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() { + protected ZooKeeperClientFactory dataMockZooKeeperClientFactory = new ZooKeeperClientFactory() { @Override public CompletableFuture create(String serverList, SessionType sessionType, int zkSessionTimeoutMillis) { // Always return the same instance (so that we don't loose the mock ZK content on broker restart - return CompletableFuture.completedFuture(mockZookKeeper); + return CompletableFuture.completedFuture(dataMockZookKeeper); } }; - private BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() { + protected ZooKeeperClientFactory localMockZooKeeperClientFactory = new ZooKeeperClientFactory() { + + @Override + public CompletableFuture create(String serverList, SessionType sessionType, + int zkSessionTimeoutMillis) { + // Always return the same instance (so that we don't loose the mock ZK content on broker restart + return CompletableFuture.completedFuture(localMockZookKeeper); + } + }; + + protected BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() { @Override public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index d8576be9d78bc..19584766e7643 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -87,7 +87,7 @@ public void setup() throws Exception { doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); ZooKeeper mockZk = mock(ZooKeeper.class); - doReturn(mockZk).when(pulsar).getZkClient(); + doReturn(mockZk).when(pulsar).getDataZkClient(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java index 32497ceb2a111..3d059116b6722 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -85,7 +85,7 @@ public void setup(Method m) throws Exception { doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); ZooKeeper mockZk = mock(ZooKeeper.class); - doReturn(mockZk).when(pulsar).getZkClient(); + doReturn(mockZk).when(pulsar).getDataZkClient(); brokerService = spy(new BrokerService(pulsar)); doReturn(brokerService).when(pulsar).getBrokerService(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java index d3e7517af32de..81d6de1896e99 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java @@ -116,7 +116,7 @@ public void setup() throws Exception { doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); ZooKeeper mockZk = mock(ZooKeeper.class); - doReturn(mockZk).when(pulsar).getZkClient(); + doReturn(mockZk).when(pulsar).getDataZkClient(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java index 345b3ca9c4bd8..44f2376fde0a5 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java @@ -135,7 +135,7 @@ public void setup() throws Exception { doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); ZooKeeper mockZk = mock(ZooKeeper.class); - doReturn(mockZk).when(pulsar).getZkClient(); + doReturn(mockZk).when(pulsar).getDataZkClient(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/web/WebServiceTest.java index 948fa14734ca8..9012985e6bca5 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/web/WebServiceTest.java @@ -292,13 +292,14 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU config.setAdvertisedAddress("localhost"); // TLS certificate expects localhost pulsar = spy(new PulsarService(config)); doReturn(new MockedZooKeeperClientFactoryImpl()).when(pulsar).getZooKeeperClientFactory(); + doReturn(true).when(pulsar).equalsDataAndLocalZk(); pulsar.start(); try { - pulsar.getZkClient().delete("/minApiVersion", -1); + pulsar.getLocalZkClient().delete("/minApiVersion", -1); } catch (Exception ex) { } - pulsar.getZkClient().create("/minApiVersion", minApiVersion.getBytes(), null, CreateMode.PERSISTENT); + pulsar.getDataZkClient().create("/minApiVersion", minApiVersion.getBytes(), null, CreateMode.PERSISTENT); String serviceUrl = BROKER_URL_BASE; ClientConfiguration clientConfig = new ClientConfiguration(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java index 98abfe4a2131c..ca7ab554a3cce 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java @@ -51,7 +51,7 @@ public void testSessionExpired() throws Exception { assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster")); - mockZookKeeper.failNow(Code.SESSIONEXPIRED); + dataMockZookKeeper.failNow(Code.SESSIONEXPIRED); assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster")); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/BrokerServiceLookupTest.java index 4a22a6470cf8b..d2c315b36c66d 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/BrokerServiceLookupTest.java @@ -30,6 +30,7 @@ import java.security.PrivateKey; import java.security.SecureRandom; import java.security.cert.Certificate; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -43,6 +44,9 @@ import javax.net.ssl.TrustManager; import org.apache.bookkeeper.test.PortManager; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -68,6 +72,7 @@ import com.yahoo.pulsar.common.util.SecurityUtility; import com.yahoo.pulsar.discovery.service.DiscoveryService; import com.yahoo.pulsar.discovery.service.server.ServiceConfig; +import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; @@ -447,7 +452,7 @@ public void testDiscoveryLookup() throws Exception { config.setServicePort(nextFreePort()); config.setBindOnLocalhost(true); DiscoveryService discoveryService = spy(new DiscoveryService(config)); - doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); + doReturn(localMockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); discoveryService.start(); // (2) lookup using discovery service @@ -510,7 +515,7 @@ public void testDiscoveryLookupTls() throws Exception { config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); DiscoveryService discoveryService = spy(new DiscoveryService(config)); - doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); + doReturn(localMockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); discoveryService.start(); // (3) lookup using discovery service @@ -567,7 +572,19 @@ public void testDiscoveryLookupAuthAndAuthSuccess() throws Exception { config.setAuthenticationEnabled(true); config.setAuthorizationEnabled(true); DiscoveryService discoveryService = spy(new DiscoveryService(config)); - doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); + + // as discoveryService uses same ZookeeperClientFactory for global and local ZK + // combine both zk as one for testing : and then create cluster/property on localZK + dataMockZooKeeperClientFactory = localMockZooKeeperClientFactory; + ZkUtils.createFullPathOptimistic(localMockZookKeeper, "/admin/clusters", + "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), new ArrayList(0), CreateMode.PERSISTENT); + ZkUtils.createFullPathOptimistic(localMockZookKeeper, "/admin/policies", + "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), new ArrayList(0), CreateMode.PERSISTENT); + doReturn(pulsar.getLocalZkCache()).when(pulsar).getGlobalZkCache(); + admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT)); + admin.properties().createProperty("my-property1", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use2"))); + + doReturn(localMockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); discoveryService.start(); // (2) lookup using discovery service @@ -596,9 +613,9 @@ public void start() throws PulsarClientException { }); PulsarClient pulsarClient = PulsarClient.create(discoverySvcUrl, clientConfig); - Consumer consumer = pulsarClient.subscribe("persistent://my-property/use2/my-ns/my-topic1", + Consumer consumer = pulsarClient.subscribe("persistent://my-property1/use2/my-ns/my-topic1", "my-subscriber-name", new ConsumerConfiguration()); - Producer producer = pulsarClient.createProducer("persistent://my-property/use2/my-ns/my-topic1", + Producer producer = pulsarClient.createProducer("persistent://my-property1/use2/my-ns/my-topic1", new ProducerConfiguration()); for (int i = 0; i < 10; i++) { String message = "my-message-" + i; @@ -633,7 +650,7 @@ public void testDiscoveryLookupAuthenticationFailure() throws Exception { config.setAuthenticationEnabled(true); config.setAuthorizationEnabled(true); DiscoveryService discoveryService = spy(new DiscoveryService(config)); - doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); + doReturn(localMockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); discoveryService.start(); // (2) lookup using discovery service final String discoverySvcUrl = discoveryService.getServiceUrl(); @@ -683,7 +700,7 @@ public void testDiscoveryLookupAuthorizationFailure() throws Exception { config.setAuthenticationEnabled(true); config.setAuthorizationEnabled(true); DiscoveryService discoveryService = spy(new DiscoveryService(config)); - doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); + doReturn(localMockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); discoveryService.start(); // (2) lookup using discovery service final String discoverySvcUrl = discoveryService.getServiceUrl(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/discovery/service/web/DiscoveryServiceWebTest.java index 2fe7c7a15298f..b2da967b0bb59 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/discovery/service/web/DiscoveryServiceWebTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/discovery/service/web/DiscoveryServiceWebTest.java @@ -80,7 +80,7 @@ public void testRiderectUrlWithServerStarted() throws Exception { ServiceConfig config = new ServiceConfig(); config.setWebServicePort(port); ServerManager server = new ServerManager(config); - DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper; + DiscoveryZooKeeperClientFactoryImpl.zk = localMockZookKeeper; Map params = new TreeMap<>(); params.put("zookeeperServers", ""); params.put("zookeeperClientFactoryClass", DiscoveryZooKeeperClientFactoryImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java index 18c5afcbb0340..8e5464f50583e 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -64,7 +64,7 @@ public void setup() throws Exception { config.setAuthenticationProviders( Sets.newHashSet("com.yahoo.pulsar.websocket.proxy.MockAuthenticationProvider")); service = spy(new WebSocketService(config)); - doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); + doReturn(dataMockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 14057c88e9bcc..017d385fc4763 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -61,7 +61,7 @@ protected void setup() throws Exception { config.setClusterName("c1"); config.setWebServicePort(TEST_PORT); service = spy(new WebSocketService(config)); - doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); + doReturn(dataMockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); service.start(); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 1a5d74f887b24..7027dcdc6dd83 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -56,7 +56,7 @@ public void setup() throws Exception { config.setClusterName("use"); config.setGlobalZookeeperServers("dummy-zk-servers"); service = spy(new WebSocketService(config)); - doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); + doReturn(dataMockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java index caf40ac894e59..da6071fd20ad8 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java @@ -73,7 +73,7 @@ public void setup() throws Exception { config.setClusterName("use"); config.setGlobalZookeeperServers("dummy-zk-servers"); service = spy(new WebSocketService(config)); - doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); + doReturn(dataMockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index 8f7e6f0c8ebf7..4284647937efb 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -57,7 +57,7 @@ public void setup() throws Exception { config.setServiceUrl(pulsar.getWebServiceAddress()); config.setServiceUrlTls(pulsar.getWebServiceAddressTls()); service = spy(new WebSocketService(config)); - doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); + doReturn(dataMockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started");