diff --git a/all/pom.xml b/all/pom.xml
index e07f80492d000..c9178dd42b36f 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -69,6 +69,12 @@
netty-common
+
+
+ ${project.groupId}
+ pulsar-zookeeper
+ ${project.version}
+
diff --git a/all/src/assemble/bin.xml b/all/src/assemble/bin.xml
index a318cf24e34cb..d9e1e3b9fbd32 100644
--- a/all/src/assemble/bin.xml
+++ b/all/src/assemble/bin.xml
@@ -72,6 +72,9 @@
io.netty:netty-handler
io.netty:netty-transport-native-epoll
io.netty:netty-codec-http
+
+
+ org.apache.zookeeper:zookeeper
diff --git a/bin/pulsar b/bin/pulsar
index 30238f8317f2f..1c117cc7baafd 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -192,12 +192,19 @@ elif [ $COMMAND == "bookie" ]; then
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
elif [ $COMMAND == "zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
- exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.zookeeper.server.quorum.QuorumPeerMain $PULSAR_ZK_CONF $@
+
+ # Add instrumentation
+ WEAVER_JAR=`find $PULSAR_HOME/lib -name 'aspectjweaver-*.jar'`
+ if [ -n "$WEAVER_JAR" ]; then OPTS="$OPTS -javaagent:$WEAVER_JAR"; fi
+ exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE com.yahoo.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
# Allow global ZK to turn into read-only mode when it cannot reach the quorum
OPTS="${OPTS} -Dreadonlymode.enabled=true"
- exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.zookeeper.server.quorum.QuorumPeerMain $PULSAR_GLOBAL_ZK_CONF $@
+ # Add instrumentation
+ WEAVER_JAR=`find $PULSAR_HOME/lib -name 'aspectjweaver-*.jar'`
+ if [ -n "$WEAVER_JAR" ]; then OPTS="$OPTS -javaagent:$WEAVER_JAR"; fi
+ exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE com.yahoo.pulsar.zookeeper.ZooKeeperStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "discovery" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE com.yahoo.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 03c14c0814009..920784d9f9278 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -23,6 +23,8 @@ pulsar.log.dir=logs
pulsar.log.file=pulsar.log
log4j.rootLogger=${pulsar.root.logger}
+log4j.logger.org.apache.zookeeper.server.ZooKeeperServer=WARN
+log4j.logger.org.apache.zookeeper.ZooKeeper=WARN
# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
diff --git a/pom.xml b/pom.xml
index 103ba3c9c0e77..79bf54582b9e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@ flexible messaging model and an intuitive client API.
pulsar-testclient
pulsar-broker-auth-athenz
pulsar-client-auth-athenz
+ pulsar-zookeeper
all
@@ -103,6 +104,7 @@ flexible messaging model and an intuitive client API.
9.3.11.v20160721
1.1.8
0.0.21
+ 1.8.9
@@ -423,6 +425,12 @@ flexible messaging model and an intuitive client API.
${prometheus.version}
+
+ io.prometheus
+ simpleclient_servlet
+ ${prometheus.version}
+
+
org.apache.spark
spark-streaming_2.10
@@ -451,6 +459,18 @@ flexible messaging model and an intuitive client API.
+
+ org.aspectj
+ aspectjrt
+ ${aspectj.version}
+
+
+
+ org.aspectj
+ aspectjweaver
+ ${aspectj.version}
+
+
diff --git a/pulsar-zookeeper/pom.xml b/pulsar-zookeeper/pom.xml
new file mode 100644
index 0000000000000..59194cb23cc02
--- /dev/null
+++ b/pulsar-zookeeper/pom.xml
@@ -0,0 +1,129 @@
+
+
+ 4.0.0
+
+
+ com.yahoo.pulsar
+ pulsar
+ 1.18-SNAPSHOT
+ ..
+
+
+ pulsar-zookeeper
+ jar
+ pulsar-zookeeper
+ Instrumented ZooKeeper with detailed stats
+
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+ org.aspectj
+ aspectjrt
+
+
+
+ org.aspectj
+ aspectjweaver
+
+
+
+ io.prometheus
+ simpleclient
+
+
+
+ io.prometheus
+ simpleclient_hotspot
+
+
+
+ io.prometheus
+ simpleclient_servlet
+
+
+
+ org.eclipse.jetty
+ jetty-servlet
+
+
+
+
+
+
+ org.codehaus.mojo
+ aspectj-maven-plugin
+ 1.10
+
+ 1.8
+ 1.8
+ 1.8
+ true
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+
+ process-sources
+
+ compile
+
+
+
+
+
+
+
+
+
+ org.eclipse.m2e
+ lifecycle-mapping
+ 1.0.0
+
+
+
+
+
+ org.codehaus.mojo
+ aspectj-maven-plugin
+ [1.10,)
+
+ compile
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/FinalRequestProcessorAspect.java b/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/FinalRequestProcessorAspect.java
new file mode 100644
index 0000000000000..506c056d250ca
--- /dev/null
+++ b/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/FinalRequestProcessorAspect.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.zookeeper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Summary;
+
+/**
+ * Aspect woven around {@code FinalRequestProcessor.processRequest(..)} that publishes per-request
+ * Prometheus metrics: a counter of requests labelled by operation type and a latency summary
+ * labelled as either {@code "read"} or {@code "write"}.
+ */
+@Aspect
+public class FinalRequestProcessorAspect {
+
+    /** Maps ZooKeeper op-codes to the label value used for the {@code type} label of the requests counter. */
+    private static final Map<Integer, String> requestTypeMap = new HashMap<>();
+
+    static {
+        // Prefill the map
+        requestTypeMap.put(OpCode.notification, "notification");
+        requestTypeMap.put(OpCode.create, "create");
+        requestTypeMap.put(OpCode.delete, "delete");
+        requestTypeMap.put(OpCode.exists, "exists");
+        requestTypeMap.put(OpCode.getData, "getData");
+        requestTypeMap.put(OpCode.setData, "setData");
+        requestTypeMap.put(OpCode.getACL, "getACL");
+        requestTypeMap.put(OpCode.setACL, "setACL");
+        requestTypeMap.put(OpCode.getChildren, "getChildren");
+        requestTypeMap.put(OpCode.sync, "sync");
+        requestTypeMap.put(OpCode.ping, "ping");
+        requestTypeMap.put(OpCode.getChildren2, "getChildren2");
+        requestTypeMap.put(OpCode.check, "check");
+        requestTypeMap.put(OpCode.multi, "multi");
+        requestTypeMap.put(OpCode.auth, "auth");
+        requestTypeMap.put(OpCode.setWatches, "setWatches");
+        requestTypeMap.put(OpCode.sasl, "sasl");
+        requestTypeMap.put(OpCode.createSession, "createSession");
+        requestTypeMap.put(OpCode.closeSession, "closeSession");
+        requestTypeMap.put(OpCode.error, "error");
+    }
+
+    /** Number of processed requests, labelled with the operation name (or "unknown"). */
+    private static final Counter requests = Counter
+            .build("zookeeper_server_requests", "Requests issued to a particular server").labelNames("type").create()
+            .register();
+
+    /** Request latency in milliseconds, labelled "read" or "write". */
+    private static final Summary requestsLatency = Summary.build().name("zookeeper_server_requests_latency_ms")
+            .help("Requests latency in millis") //
+            .quantile(0.50, 0.01) //
+            .quantile(0.75, 0.01) //
+            .quantile(0.95, 0.01) //
+            .quantile(0.99, 0.01) //
+            .quantile(0.999, 0.01) //
+            .quantile(0.9999, 0.01) //
+            .quantile(1.0, 0.01) //
+            .maxAgeSeconds(60) //
+            .labelNames("type") //
+            .create().register();
+
+    /** Pointcut selecting the execution of {@code FinalRequestProcessor.processRequest(..)}. */
+    @Pointcut("execution(void org.apache.zookeeper.server.FinalRequestProcessor.processRequest(..))")
+    public void processRequest() {
+    }
+
+    /**
+     * Lets the intercepted {@code processRequest} call run, then records the request in the counter and
+     * observes its latency, measured from {@code request.createTime} to the current wall-clock time.
+     * Nothing is recorded when the intercepted call throws.
+     *
+     * @param joinPoint
+     *            the intercepted call; its first argument is expected to be the {@link Request}
+     * @throws Throwable
+     *             whatever the intercepted call throws
+     */
+    @Around("processRequest()")
+    public void timedProcessRequest(ProceedingJoinPoint joinPoint) throws Throwable {
+        joinPoint.proceed();
+
+        Request request = (Request) joinPoint.getArgs()[0];
+
+        String type = requestTypeMap.getOrDefault(request.type, "unknown");
+        requests.labels(type).inc();
+
+        long latencyMs = System.currentTimeMillis() - request.createTime;
+        String latencyLabel = isWriteRequest(request.type) ? "write" : "read";
+        requestsLatency.labels(latencyLabel).observe(latencyMs);
+    }
+
+    /**
+     * Tells whether the given op-code is accounted under the "write" latency label.
+     *
+     * @param opCode
+     *            a ZooKeeper {@link OpCode} constant
+     * @return {@code true} for the operations listed in the first group, {@code false} for everything else
+     *         (including op-codes not listed at all)
+     */
+    private static boolean isWriteRequest(int opCode) {
+        switch (opCode) {
+        case OpCode.create:
+        case OpCode.delete:
+        case OpCode.setData:
+        case OpCode.setACL:
+        case OpCode.sync:
+        case OpCode.createSession:
+        case OpCode.closeSession:
+        // A multi is a transaction of create/delete/setData/check operations, so its latency
+        // belongs with the writes rather than skewing the read latency.
+        case OpCode.multi:
+            return true;
+
+        case OpCode.notification:
+        case OpCode.exists:
+        case OpCode.getData:
+        case OpCode.getACL:
+        case OpCode.getChildren:
+        case OpCode.ping:
+        case OpCode.getChildren2:
+        case OpCode.check:
+        case OpCode.auth:
+        case OpCode.setWatches:
+        case OpCode.sasl:
+        case OpCode.error:
+        default:
+            return false;
+        }
+    }
+}
diff --git a/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperServerAspect.java b/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperServerAspect.java
new file mode 100644
index 0000000000000..e4098ee0998ea
--- /dev/null
+++ b/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperServerAspect.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.zookeeper;
+
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.aspectj.lang.JoinPoint;
+import org.aspectj.lang.annotation.After;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+
+import io.prometheus.client.Gauge;
+
+/**
+ * Instruments ZooKeeperServer to enable stats reporting on data set and z-node sizes.
+ *
+ * <p>
+ * The advice runs after every execution of a {@code ZooKeeperServer} constructor. The gauges are
+ * registered only once per process; they always read from the most recently constructed server instance.
+ */
+@Aspect
+public class ZooKeeperServerAspect {
+
+    /** Most recently constructed server; the gauges read from it at scrape time. */
+    private static volatile ZooKeeperServer currentServer;
+
+    /** Guarded by the class lock; {@code true} once the gauges have been registered. */
+    private static boolean metricsRegistered = false;
+
+    /** Pointcut selecting the execution of any {@code ZooKeeperServer} constructor. */
+    @Pointcut("execution(org.apache.zookeeper.server.ZooKeeperServer.new(..))")
+    public void processRequest() {
+    }
+
+    /**
+     * Remembers the newly constructed server and, the first time it is invoked, registers the gauges that
+     * expose z-node count, data size, connections, watches and ephemerals.
+     *
+     * @param joinPoint
+     *            the constructor execution; {@code getThis()} is the server being constructed
+     * @throws Throwable
+     *             declared for the advice signature
+     */
+    @After("processRequest()")
+    public void timedProcessRequest(JoinPoint joinPoint) throws Throwable {
+        // ZooKeeperServer instance was created
+        ZooKeeperServer zkServer = (ZooKeeperServer) joinPoint.getThis();
+
+        synchronized (ZooKeeperServerAspect.class) {
+            currentServer = zkServer;
+
+            // The advice fires for every constructor execution (chained constructors, or a new server
+            // instance being created later on). Registering the same metric names again would either be
+            // rejected or produce duplicate series, so the gauges are registered a single time.
+            if (metricsRegistered) {
+                return;
+            }
+            metricsRegistered = true;
+        }
+
+        Gauge.build().name("zookeeper_server_znode_count").help("Number of z-nodes stored").create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return currentServer.getZKDatabase().getNodeCount();
+                    }
+                }).register();
+
+        Gauge.build().name("zookeeper_server_data_size_bytes").help("Size of all of z-nodes stored (bytes)").create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return currentServer.getZKDatabase().getDataTree().approximateDataSize();
+                    }
+                }).register();
+
+        Gauge.build().name("zookeeper_server_connections").help("Number of currently opened connections").create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return currentServer.serverStats().getNumAliveClientConnections();
+                    }
+                }).register();
+
+        Gauge.build().name("zookeeper_server_watches_count").help("Number of watches").create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return currentServer.getZKDatabase().getDataTree().getWatchCount();
+                    }
+                }).register();
+
+        Gauge.build().name("zookeeper_server_ephemerals_count").help("Number of ephemerals z-nodes").create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return currentServer.getZKDatabase().getDataTree().getEphemeralsCount();
+                    }
+                }).register();
+    }
+
+}
diff --git a/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperStarter.java b/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperStarter.java
new file mode 100644
index 0000000000000..f69bfc2e6ec93
--- /dev/null
+++ b/pulsar-zookeeper/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperStarter.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.zookeeper;
+
+import java.net.InetSocketAddress;
+
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.client.hotspot.DefaultExports;
+
+public class ZooKeeperStarter {
+ public static void main(String[] args) throws Exception {
+ // Register basic JVM metrics
+ DefaultExports.initialize();
+
+ // Start Jetty to serve stats
+ int port = Integer.parseInt(System.getProperties().getProperty("stats_server_port", "8080"));
+
+ log.info("Starting ZK stats HTTP server at port {}", port);
+ InetSocketAddress httpEndpoint = InetSocketAddress.createUnresolved("0.0.0.0", port);
+
+ Server server = new Server(httpEndpoint);
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ server.setHandler(context);
+ context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
+ try {
+ server.start();
+ } catch (Exception e) {
+ log.error("Failed to start HTTP server at port {}. Use \"-Dstats_server_port=1234\" to change port number",
+ port, e);
+ throw e;
+ }
+
+ // Start the regular ZooKeeper server
+ QuorumPeerMain.main(args);
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ZooKeeperStarter.class);
+}
diff --git a/pulsar-zookeeper/src/main/resources/META-INF/aop.xml b/pulsar-zookeeper/src/main/resources/META-INF/aop.xml
new file mode 100644
index 0000000000000..2e0a4909b5fa9
--- /dev/null
+++ b/pulsar-zookeeper/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file