Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ PULSAR_HOME=`cd $BINDIR/..;pwd`
DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
DEFAULT_DATA_ZK_CONF=$PULSAR_HOME/conf/data_zookeeper.conf
DEFAULT_GLOBAL_ZK_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
Expand Down Expand Up @@ -83,6 +84,7 @@ Environment variables:
PULSAR_BROKER_CONF Configuration file for broker (default: $DEFAULT_BROKER_CONF)
PULSAR_BOOKKEEPER_CONF Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
PULSAR_ZK_CONF Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
PULSAR_DATA_ZK_CONF Configuration file for data zookeeper (default: $DEFAULT_DATA_ZK_CONF)
PULSAR_GLOBAL_ZK_CONF Configuration file for global zookeeper (default: $DEFAULT_GLOBAL_ZK_CONF)
PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
Expand Down Expand Up @@ -141,6 +143,10 @@ if [ -z "$PULSAR_ZK_CONF" ]; then
PULSAR_ZK_CONF=$DEFAULT_ZK_CONF
fi

if [ -z "$PULSAR_DATA_ZK_CONF" ]; then
PULSAR_DATA_ZK_CONF=$DEFAULT_DATA_ZK_CONF
fi

if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
fi
Expand Down Expand Up @@ -193,6 +199,9 @@ elif [ $COMMAND == "bookie" ]; then
elif [ $COMMAND == "zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.zookeeper.server.quorum.QuorumPeerMain $PULSAR_ZK_CONF $@
elif [ $COMMAND == "data-zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"data-zookeeper.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.zookeeper.server.quorum.QuorumPeerMain $PULSAR_DATA_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
# Allow global ZK to turn into read-only mode when it cannot reach the quorum
Expand Down
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
# Zookeeper quorum connection string
zookeeperServers=

# Data Zookeeper quorum connection string
dataZookeeperServers=

# Global Zookeeper quorum connection string
globalZookeeperServers=

Expand Down Expand Up @@ -49,6 +52,9 @@ clusterName=
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000

# Data Zookeeper session timeout in milliseconds
dataZooKeeperSessionTimeoutMillis=60000

# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=3000

Expand Down
42 changes: 42 additions & 0 deletions conf/data_zookeeper.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright 2016 Yahoo Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=data/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
# Zookeeper quorum connection string
zookeeperServers=

# Data Zookeeper quorum connection string
dataZookeeperServers=

# Global Zookeeper quorum connection string
globalZookeeperServers=

Expand All @@ -39,6 +42,9 @@ clusterName=standalone
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000

# Data Zookeeper session timeout in milliseconds
dataZooKeeperSessionTimeoutMillis=60000

# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=3000

Expand Down
1 change: 1 addition & 0 deletions docs/Architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Internally, a single managed ledger uses multiple Bookkeeper ledgers to store th
Pulsar uses Apache Zookeeper for metadata, cluster configuration and coordination.
- *Global Zookeeper* stores user provisioning data like properties, namespaces and policies which should be global consistent.
- Each cluster has a *local zookeeper* ensemble which stores cluster specific configuration and coordination data, like ownership metadata, broker load reports, bookkeeper ledgers' metadata.
- *local zookeeper* can be highly available by configuring separete *data zookeeper* which stores bookkeeper ledgers' metadata and *local zookeeper* needs to store only cluster management configuration.


## Design
Expand Down
27 changes: 27 additions & 0 deletions docs/ClusterSetup.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ Start ZK service on all the hosts:
```shell
$ bin/pulsar-daemon start zookeeper
```
#### Data ZooKeeper

Start dedicated zookeeper quorum to store bookkeeper ledgers' metadata.

##### Single zookeeper quorum
This does not require to configure and start _data_ zookeeper quorum, and pulsar instance uses _local_ zk quorum to store both cluster-management configuration and bookkeeper ledgers' metadata.

##### Dedicated data-zookeeper quorum
In this scenario dedicated _data_ zk stores bookkeeper ledgers' metadata and _local_ zk stores only cluster-management configuration. Using dedicated _data_ zk makes _local_ zk and broker highly available because broker's availability only depends on cluster-management configuration that is stored into _local_ zk.

Add all ZK servers the quorum configuration. Edit `conf/data_zookeeper.conf` and add
the following lines in all the ZK servers:

```
server.1=zk4.us-west.example.com:2888:3888
server.2=zk5.us-west.example.com:2888:3888
server.3=zk6.us-west.example.com:2888:3888
...
```

Start ZK service on all the hosts:

```shell
$ bin/pulsar-daemon start data-zookeeper
```


#### Global ZooKeeper

Expand Down Expand Up @@ -135,6 +161,7 @@ as well as the Pulsar metadata.
```shell
$ bin/pulsar initialize-cluster-metadata --cluster us-west \
--zookeeper zk1.us-west.example.com:2181 \
--data-zookeeper zk1.us-west.example.com:2181 \
--global-zookeeper zk1.us-west.example.com:2184 \
--service-url http://pulsar.us-west.example.com:8080/ \
--service-url-tls https://pulsar.us-west.example.com:8443/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class ServiceConfiguration implements PulsarConfiguration{
// Zookeeper quorum connection string
@FieldContext(required = true)
private String zookeeperServers;
// Data Zookeeper quorum connection string
@FieldContext(required = false)
private String dataZookeeperServers;
// Global Zookeeper quorum connection string
@FieldContext(required = false)
private String globalZookeeperServers;
Expand All @@ -60,6 +63,8 @@ public class ServiceConfiguration implements PulsarConfiguration{
private String clusterName;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
// Data Zookeeper session timeout in milliseconds
private long dataZooKeeperSessionTimeoutMillis = 60000;
// Time to wait for broker graceful shutdown. After this time elapses, the
// process will be killed
private long brokerShutdownTimeoutMs = 3000;
Expand Down Expand Up @@ -249,6 +254,19 @@ public void setZookeeperServers(String zookeeperServers) {
this.zookeeperServers = zookeeperServers;
}

public String getDataZookeeperServers() {
if (this.dataZookeeperServers == null || this.dataZookeeperServers.isEmpty()) {
// If the configuration is not set, assuming that the dataZK is not enabled and all data is in the same
// ZooKeeper cluster
return this.getZookeeperServers();
}
return dataZookeeperServers;
}

public void setDataZookeeperServers(String dataZookeeperServers) {
this.dataZookeeperServers = dataZookeeperServers;
}

public String getGlobalZookeeperServers() {
if (this.globalZookeeperServers == null || this.globalZookeeperServers.isEmpty()) {
// If the configuration is not set, assuming that the globalZK is not enabled and all data is in the same
Expand Down Expand Up @@ -906,6 +924,14 @@ public void setZooKeeperSessionTimeoutMillis(long zooKeeperSessionTimeoutMillis)
this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis;
}

public long getDataZooKeeperSessionTimeoutMillis() {
return dataZooKeeperSessionTimeoutMillis;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we return zooKeeperSessionTimeout if dataZookeeperServers is empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually we are initializing zooKeeperSessionTimeoutMillis=30000 and dataZooKeeperSessionTimeoutMillis=60000 with default value.

}

public void setDataZooKeeperSessionTimeoutMillis(long dataZooKeeperSessionTimeoutMillis) {
this.dataZooKeeperSessionTimeoutMillis = dataZooKeeperSessionTimeoutMillis;
}

public String getReplicatorPrefix() {
return replicatorPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.util.ZkUtils;
import static org.apache.commons.lang3.StringUtils.isBlank;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -54,6 +55,10 @@ private static class Arguments {
@Parameter(names = { "-zk",
"--zookeeper" }, description = "Local ZooKeeper quorum connection string", required = true)
private String zookeeper;

@Parameter(names = { "-dzk",
"--data-zookeeper" }, description = "Data ZooKeeper quorum connection string (Optional if data-zookeeper is not configured", required = false)
private String dataZookeeper;

@Parameter(names = { "-gzk",
"--global-zookeeper" }, description = "Global ZooKeeper quorum connection string", required = true)
Expand All @@ -78,22 +83,26 @@ public static void main(String[] args) throws Exception {
return;
}

log.info("Setting up cluster {} with zk={} global-zk={}", arguments.cluster, arguments.zookeeper,
arguments.globalZookeeper);
arguments.dataZookeeper = isBlank(arguments.dataZookeeper) ? arguments.zookeeper : arguments.dataZookeeper;
log.info("Setting up cluster {} with zk={} dzk={} global-zk={}", arguments.cluster, arguments.zookeeper,
arguments.dataZookeeper, arguments.globalZookeeper);

// Format BookKeeper metadata
ClientConfiguration bkConf = new ClientConfiguration();
bkConf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
bkConf.setZkServers(arguments.zookeeper);
bkConf.setZkServers(arguments.dataZookeeper);
if (!BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) {
throw new IOException("Failed to initialize BookKeeper metadata");
}

ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
ZooKeeper localZk = zkfactory.create(arguments.zookeeper, SessionType.ReadWrite, 30000).get();
// use localZk for dataZookeeper if dataZk is not provided or it is same as localZk
ZooKeeper dataZk = arguments.zookeeper.equals(arguments.dataZookeeper) ? localZk
: zkfactory.create(arguments.dataZookeeper, SessionType.ReadWrite, 30000).get();
ZooKeeper globalZk = zkfactory.create(arguments.globalZookeeper, SessionType.ReadWrite, 30000).get();

localZk.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
dataZk.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
localZk.create("/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

ZkUtils.createFullPathOptimistic(globalZk, "/admin/policies", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ public void shutdown(int exitCode) {
try {
// Try to close ZK session to ensure all ephemeral locks gets released immediately
if (service != null) {
if (service.getZkClient().getState() != States.CLOSED) {
service.getZkClient().close();
if (service.getLocalZkClient().getState() != States.CLOSED) {
service.getLocalZkClient().close();
}
if (service.getDataZkClient().getState() != States.CLOSED) {
service.getDataZkClient().close();
}
}
} catch (Exception e) {
Expand Down
Loading