diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index e983198fb7ca9..f5dd09011d6ec 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -57,7 +57,12 @@
org.slf4j
slf4j-api
-
+
+
+ org.hdrhistogram
+ HdrHistogram
+
+
org.mockito
mockito-core
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 600d3777f5aaf..e3ad5073da7a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -222,7 +222,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
// a new ledger and write the position into it
ledger.mbean.startCursorLedgerOpenOp();
long ledgerId = info.getCursorsLedgerId();
+ final long now = System.nanoTime();
bookkeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
+ ledger.getStore().recordReadLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
}
@@ -1825,6 +1827,7 @@ void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();
bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(),
config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
+ ledger.getStore().recordWriteCount(3L); // create-ledger-performs-3-write
ledger.getExecutor().submit(safeRun(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a98604ff638e9..35fe2a649a172 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -248,6 +248,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
if (log.isDebugEnabled()) {
log.debug("[{}] Opening legder {}", name, id);
}
+ store.recordReadCount(1L);
mbean.startDataLedgerOpenOp();
bookKeeper.asyncOpenLedger(id, config.getDigestType(), config.getPassword(), opencb, null);
} else {
@@ -276,7 +277,9 @@ private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedg
TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
} else {
iterator.remove();
+ final long now = System.nanoTime();
bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> {
+ store.recordWriteLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] Deleted empty ledger ledgerId={} rc={}", name, li.getLedgerId(), rc);
}
@@ -302,6 +305,7 @@ public void operationFailed(MetaStoreException e) {
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
+ store.recordWriteCount(3L);
executor.submitOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
@@ -478,6 +482,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
+ store.recordWriteCount(3); // create-ledger performs 3 writes on zk
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx);
}
@@ -1076,8 +1081,10 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
// The last ledger was empty, so we can discard it
ledgers.remove(lh.getId());
mbean.startDataLedgerDeleteOp();
+ final long now = System.nanoTime();
bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
mbean.endDataLedgerDeleteOp();
+ store.recordWriteLatency(System.nanoTime() - now, 1L);
log.info("[{}] Delete complete for empty ledger {}. rc={}", name, lh.getId(), rc);
}, null);
}
@@ -1092,6 +1099,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
+ store.recordWriteCount(3); // create-ledger performs 3 writes on zk
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null);
}
@@ -1158,8 +1166,10 @@ CompletableFuture getLedgerHandle(long ledgerId) {
log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId);
}
mbean.startDataLedgerOpenOp();
+ final long now = System.nanoTime();
bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(),
(int rc, LedgerHandle lh, Object ctx) -> {
+ store.recordReadLatency(System.nanoTime() - now, 1L);
executor.submit(safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (rc != BKException.Code.OK) {
@@ -1447,7 +1457,9 @@ public void operationComplete(Void result, Stat stat) {
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
+ final long now = System.nanoTime();
bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx) -> {
+ store.recordWriteLatency(System.nanoTime() - now, 1L);
if (rc == BKException.Code.NoSuchLedgerExistsException) {
log.warn("[{}] Ledger was already deleted {}", name, ls.getLedgerId());
} else if (rc != BKException.Code.OK) {
@@ -1563,7 +1575,9 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleting ledger {}", name, ls);
}
+ final long now = System.nanoTime();
bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> {
+ store.recordWriteLatency(System.nanoTime() - now, 1L);
switch (rc) {
case BKException.Code.NoSuchLedgerExistsException:
log.warn("[{}] Ledger {} not found when deleting it", name, ls.getLedgerId());
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
index d7419ca13a4d5..511ad13230c51 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
@@ -133,4 +133,30 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn
* @throws MetaStoreException
*/
Iterable getManagedLedgers() throws MetaStoreException;
+
+ /**
+ * Record write zk write with latency (in nano-seconds) for zk-op stats
+ *
+ * @param latency
+ */
+ void recordWriteLatency(long latencyInNs, long count);
+
+ /**
+ * Record write zk operation for zk-op stats
+ *
+ */
+ void recordWriteCount(long count);
+
+ /**
+ * Record read zk read with latency (in nano-seconds) for zk-op stats
+ *
+ * @param latency
+ */
+ void recordReadLatency(long latency, long count);
+
+ /**
+ * Record read zk operation for zk-op stats
+ *
+ */
+ void recordReadCount(long count);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 49c23c91e6b34..000160528d0c6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -20,11 +20,14 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.util.DimensionStats;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -57,6 +60,10 @@ public static enum ZNodeProtobufFormat {
private final ZooKeeper zk;
private final ZNodeProtobufFormat protobufFormat;
private final OrderedSafeExecutor executor;
+ private final LongAdder numWrite;
+ private final LongAdder numRead;
+ private final DimensionStats zkWriteLatencyStats;
+ private final DimensionStats zkReadLatencyStats;
private static class ZKStat implements Stat {
private final int version;
@@ -100,6 +107,10 @@ public MetaStoreImplZookeeper(ZooKeeper zk, ZNodeProtobufFormat protobufFormat,
this.zk = zk;
this.protobufFormat = protobufFormat;
this.executor = executor;
+ this.numWrite = new LongAdder();
+ this.numRead = new LongAdder();
+ this.zkWriteLatencyStats = new DimensionStats();
+ this.zkReadLatencyStats = new DimensionStats();
if (zk.exists(prefixName, false) == null) {
zk.create(prefixName, new byte[0], Acl, CreateMode.PERSISTENT);
@@ -133,7 +144,9 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
@Override
public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback callback) {
// Try to get the content or create an empty node
+ final long now = System.nanoTime();
zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.submit(safeRun(() -> {
+ recordReadLatency(System.nanoTime() - now, 1L);
if (rc == Code.OK.intValue()) {
try {
ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
@@ -175,8 +188,10 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St
mlInfo.toString().getBytes(Encoding) : // Text format
mlInfo.toByteArray(); // Binary format
+ final long now = System.nanoTime();
zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
(rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> {
+ recordWriteLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName,
Code.get(rc), stat != null ? stat.getVersion() : "null");
@@ -200,7 +215,9 @@ public void getCursors(final String ledgerName, final MetaStoreCallback executor.submit(safeRun(() -> {
+ recordReadLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children);
}
@@ -223,8 +240,9 @@ public void asyncGetCursorInfo(String ledgerName, String consumerName,
if (log.isDebugEnabled()) {
log.debug("Reading from {}", path);
}
-
+ final long now = System.nanoTime();
zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.submit(safeRun(() -> {
+ recordReadLatency(System.nanoTime() - now, 1L);
if (rc != Code.OK.intValue()) {
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
} else {
@@ -257,8 +275,10 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
if (log.isDebugEnabled()) {
log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
+ final long now = System.nanoTime();
zk.create(path, content, Acl, CreateMode.PERSISTENT,
(rc, path1, ctx, name) -> executor.submit(safeRun(() -> {
+ recordWriteLatency(System.nanoTime() - now, 1L);
if (rc != Code.OK.intValue()) {
log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName,
cursorName, info, Code.get(rc));
@@ -276,7 +296,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
if (log.isDebugEnabled()) {
log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
+ final long now = System.nanoTime();
zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, stat1) -> executor.submit(safeRun(() -> {
+ recordWriteLatency(System.nanoTime() - now, 1L);
if (rc == Code.BADVERSION.intValue()) {
callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc))));
} else if (rc != Code.OK.intValue()) {
@@ -292,7 +314,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
public void asyncRemoveCursor(final String ledgerName, final String consumerName,
final MetaStoreCallback callback) {
log.info("[{}] Remove consumer={}", ledgerName, consumerName);
+ final long now = System.nanoTime();
zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> {
+ recordWriteLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, consumerName, Code.get(rc));
}
@@ -307,7 +331,9 @@ public void asyncRemoveCursor(final String ledgerName, final String consumerName
@Override
public void removeManagedLedger(String ledgerName, MetaStoreCallback callback) {
log.info("[{}] Remove ManagedLedger", ledgerName);
+ final long now = System.nanoTime();
zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> {
+ recordWriteLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] zk delete done. rc={}", ledgerName, Code.get(rc));
}
@@ -386,5 +412,43 @@ private ManagedCursorInfo parseManagedCursorInfoFromBinary(byte[] data) throws I
return ManagedCursorInfo.newBuilder().mergeFrom(data).build();
}
+ @Override
+ public void recordWriteLatency(long latencyInNs, long count) {
+ zkWriteLatencyStats.recordValue(TimeUnit.NANOSECONDS.toMillis(latencyInNs) / count);
+ numWrite.add(count);
+ }
+
+ @Override
+ public void recordWriteCount(long count) {
+ numWrite.add(count);
+ }
+
+ @Override
+ public void recordReadLatency(long latencyInNs, long count) {
+ zkReadLatencyStats.recordValue(TimeUnit.NANOSECONDS.toMillis(latencyInNs) / count);
+ numRead.add(count);
+ }
+
+ @Override
+ public void recordReadCount(long count) {
+ numRead.add(count);
+ }
+
+ public long getAndResetNumOfWrite() {
+ return numWrite.sumThenReset();
+ }
+
+ public long getAndResetNumOfRead() {
+ return numRead.sumThenReset();
+ }
+
+ public DimensionStats getZkWriteLatencyStats() {
+ return this.zkWriteLatencyStats;
+ }
+
+ public DimensionStats getZkReadLatencyStats() {
+ return this.zkReadLatencyStats;
+ }
+
private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java
new file mode 100644
index 0000000000000..27508d30c8ddd
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class DimensionStats {
+
+ /** Statistics for given dimension **/
+ public double meanDimensionMs;
+
+ public double medianDimensionMs;
+
+ public double dimension95Ms;
+
+ public double dimension99Ms;
+
+ public double dimension999Ms;
+
+ public double dimension9999Ms;
+
+ public double dimensionCounts;
+
+ public double elapsedIntervalMs;
+
+ private final long maxTrackableSeconds = 120;
+ private Recorder dimensionTimeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(maxTrackableSeconds), 2);
+ private Histogram dimensionHistogram = null;
+
+ public void updateStats() {
+
+ dimensionHistogram = dimensionTimeRecorder.getIntervalHistogram(dimensionHistogram);
+
+ this.meanDimensionMs = dimensionHistogram.getMean();
+ this.medianDimensionMs = dimensionHistogram.getValueAtPercentile(50);
+ this.dimension95Ms = dimensionHistogram.getValueAtPercentile(95);
+ this.dimension99Ms = dimensionHistogram.getValueAtPercentile(99);
+ this.dimension999Ms = dimensionHistogram.getValueAtPercentile(99.9);
+ this.dimension9999Ms = dimensionHistogram.getValueAtPercentile(99.99);
+ this.dimensionCounts = dimensionHistogram.getTotalCount();
+ }
+
+ public void recordValue(long dimensionLatencyMs) {
+ dimensionLatencyMs = dimensionLatencyMs > maxTrackableSeconds ? maxTrackableSeconds : dimensionLatencyMs;
+ dimensionTimeRecorder.recordValue(dimensionLatencyMs);
+ }
+}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
index f2e4a359ae070..4636da13c9596 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
@@ -15,8 +15,6 @@
*/
package com.yahoo.pulsar.broker.namespace;
-import static com.google.common.base.Preconditions.checkState;
-
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -24,6 +22,8 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -36,8 +36,6 @@
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalCause;
-import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.pulsar.broker.PulsarService;
@@ -105,6 +103,11 @@ public class OwnershipCache {
* The NamespaceBundleFactory to construct NamespaceBundles
*/
private final NamespaceBundleFactory bundleFactory;
+
+ /**
+ * Use to record zk-write operations
+ */
+ private final MetaStore metaStore;
private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader {
@@ -124,8 +127,10 @@ public CompletableFuture asyncLoad(String namespaceBundleZNode, Exe
}
CompletableFuture future = new CompletableFuture<>();
+ final long now = System.nanoTime();
ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode, znodeContent,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
+ metaStore.recordWriteLatency(System.nanoTime() - now, 1L);
if (rc == KeeperException.Code.OK.intValue()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode);
@@ -162,6 +167,7 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
// ownedBundlesCache contains all namespaces that are owned by the local broker
this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.sameThreadExecutor())
.buildAsync(new OwnedServiceUnitCacheLoader());
+ this.metaStore = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()).getMetaStore();
}
/**
@@ -252,7 +258,9 @@ public CompletableFuture tryAcquiringOwnership(Namespace
public CompletableFuture removeOwnership(NamespaceBundle bundle) {
CompletableFuture result = new CompletableFuture<>();
String key = ServiceUnitZkUtils.path(bundle);
+ final long now = System.nanoTime();
localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
+ metaStore.recordWriteLatency(System.nanoTime() - now, 1L);
if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) {
LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc));
ownedBundlesCache.synchronous().invalidate(key);
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java
index 4caa13b2c23ea..5c02f439c9ba0 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java
@@ -21,6 +21,10 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.MetaStore;
+import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +56,7 @@ public class PulsarStats implements Closeable {
private List tempMetricsCollection;
private List metricsCollection;
private final BrokerOperabilityMetrics brokerOperabilityMetrics;
+ private final ManagedLedgerFactory ledgerFactory;
private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock();
@@ -67,6 +72,7 @@ public PulsarStats(PulsarService pulsar) {
this.metricsCollection = Lists.newArrayList();
this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(),
pulsar.getAdvertisedAddress());
+ this.ledgerFactory = pulsar.getManagedLedgerFactory();
}
@Override
@@ -140,7 +146,9 @@ public synchronized void updateStats(
}
brokerOperabilityMetrics.getMetrics()
.forEach(brokerOperabilityMetric -> tempMetricsCollection.add(brokerOperabilityMetric));
-
+ // add zk-op-stats metrics
+ tempMetricsCollection.add(brokerOperabilityMetrics.getZkLatencyMetrics(
+ (MetaStoreImplZookeeper) (((ManagedLedgerFactoryImpl) ledgerFactory).getMetaStore())));
// json end
topicStatsStream.endObject();
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java
index 9649366d19448..3244d8bd747aa 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -21,6 +21,9 @@
import java.util.List;
import java.util.Map;
+import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper;
+import org.apache.bookkeeper.mledger.util.DimensionStats;
+
/**
*/
public class BrokerOperabilityMetrics {
@@ -66,6 +69,40 @@ Metrics getTopicLoadMetrics() {
return dMetrics;
}
+ public Metrics getZkLatencyMetrics(MetaStoreImplZookeeper metaStore) {
+ Map dimensionMap = Maps.newHashMap();
+ dimensionMap.put("broker", brokerName);
+ dimensionMap.put("cluster", localCluster);
+ dimensionMap.put("metric", "zk_op_stats");
+ Metrics dMetrics = Metrics.create(dimensionMap);
+
+ DimensionStats zkWriteLatencyStats = metaStore.getZkWriteLatencyStats();
+ DimensionStats zkReadLatencyStats = metaStore.getZkReadLatencyStats();
+ zkWriteLatencyStats.updateStats();
+ zkReadLatencyStats.updateStats();
+
+ dMetrics.put("zk_latency_write_mean_ms", zkWriteLatencyStats.meanDimensionMs);
+ dMetrics.put("zk_latency_write_time_median_ms", zkWriteLatencyStats.medianDimensionMs);
+ dMetrics.put("zk_latency_write_95percentile_ms", zkWriteLatencyStats.dimension95Ms);
+ dMetrics.put("zk_latency_write_99_percentile_ms", zkWriteLatencyStats.dimension99Ms);
+ dMetrics.put("zk_latency_write_99_9_percentile_ms", zkWriteLatencyStats.dimension999Ms);
+ dMetrics.put("zk_latency_write_99_99_percentile_ms", zkWriteLatencyStats.dimension999Ms);
+ dMetrics.put("zk_latency_write_count", zkWriteLatencyStats.dimensionCounts);
+
+ dMetrics.put("zk_latency_read_mean_ms", zkReadLatencyStats.meanDimensionMs);
+ dMetrics.put("zk_latency_read_time_median_ms", zkReadLatencyStats.medianDimensionMs);
+ dMetrics.put("zk_latency_read_95percentile_ms", zkReadLatencyStats.dimension95Ms);
+ dMetrics.put("zk_latency_read_99_percentile_ms", zkReadLatencyStats.dimension99Ms);
+ dMetrics.put("zk_latency_read_99_9_percentile_ms", zkReadLatencyStats.dimension999Ms);
+ dMetrics.put("zk_latency_read_99_99_percentile_ms", zkReadLatencyStats.dimension999Ms);
+ dMetrics.put("zk_latency_read_count", zkReadLatencyStats.dimensionCounts);
+
+ dMetrics.put("zk_write_rate", metaStore.getAndResetNumOfWrite());
+ dMetrics.put("zk_read_rate", metaStore.getAndResetNumOfRead());
+
+ return dMetrics;
+ }
+
public void reset() {
metricsList.clear();
}
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java
index f146c1a6f47dd..52504f4d8d03e 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -28,8 +28,9 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -59,6 +60,8 @@ public class OwnershipCacheTest {
private NamespaceBundleFactory bundleFactory;
private NamespaceService nsService;
private BrokerService brokerService;
+ private ManagedLedgerFactoryImpl ledgerFactory;
+ private MetaStoreImplZookeeper metaStore;
private OrderedSafeExecutor executor;
@BeforeMethod
@@ -67,12 +70,14 @@ public void setup() throws Exception {
selfBrokerUrl = "tcp://localhost:" + port;
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
+ ledgerFactory = mock(ManagedLedgerFactoryImpl.class);
executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
brokerService = mock(BrokerService.class);
+ metaStore = new MetaStoreImplZookeeper(zkCache.getZooKeeper(), executor);
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(anyObject());
doReturn(zkCache).when(pulsar).getLocalZkCache();
@@ -83,6 +88,8 @@ public void setup() throws Exception {
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(webAddress(config)).when(pulsar).getWebServiceAddress();
doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
+ doReturn(ledgerFactory).when(pulsar).getManagedLedgerFactory();
+ doReturn(metaStore).when(ledgerFactory).getMetaStore();
}
@AfterMethod
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java
index 059cc167020fb..8c07c8dce930d 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java
@@ -302,7 +302,7 @@ public void testBrokerStatsMetrics() throws Exception {
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
JsonArray metrics = brokerStatsClient.getMetrics();
- assertEquals(metrics.size(), 4, metrics.toString());
+ assertEquals(metrics.size(), 5, metrics.toString());
// these metrics seem to be arriving in different order at different times...
// is the order really relevant here?
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index ba7d39be3b724..5e03ee189b889 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -26,6 +26,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.BrokerTestBase;
import com.yahoo.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import com.yahoo.pulsar.client.api.Producer;
@@ -85,4 +86,56 @@ public void testManagedLedgerMetrics() throws Exception {
}
+
+ @Test
+ public void testZkOpStatsMetrics() throws Exception {
+
+ pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1");
+ Metrics zkOpMetric = getMetric("zk_op_stats");
+ Assert.assertNotNull(zkOpMetric);
+ Assert.assertTrue(zkOpMetric.getMetrics().containsKey("zk_latency_write_99_99_percentile_ms"));
+ Assert.assertTrue(zkOpMetric.getMetrics().containsKey("zk_latency_read_99_99_percentile_ms"));
+ Assert.assertTrue((long) zkOpMetric.getMetrics().get("zk_read_rate") > 0);
+ Assert.assertTrue((long) zkOpMetric.getMetrics().get("zk_write_rate") > 0);
+
+ // create another topic
+ pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2");
+ zkOpMetric = getMetric("zk_op_stats");
+ // save read and write rate per topic: which should be the same for all topics
+ long readRate = (long) zkOpMetric.getMetrics().get("zk_read_rate");
+ long writeRete = (long) zkOpMetric.getMetrics().get("zk_write_rate");
+ Assert.assertTrue(readRate > 0);
+ Assert.assertTrue(writeRete > 0);
+
+ // create three new topics : which should create thrice read/write rate compare to previous one
+ pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic3");
+ pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic4");
+ pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic5");
+ zkOpMetric = getMetric("zk_op_stats");
+ long readRate2 = (long) zkOpMetric.getMetrics().get("zk_read_rate");
+ long writeRete2 = (long) zkOpMetric.getMetrics().get("zk_write_rate");
+ Assert.assertEquals(readRate2, 3 * readRate);
+ Assert.assertEquals(writeRete2, 3 * writeRete);
+
+ // same topic doesn't create any zk-operation
+ pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic5");
+ zkOpMetric = getMetric("zk_op_stats");
+ readRate2 = (long) zkOpMetric.getMetrics().get("zk_read_rate");
+ writeRete2 = (long) zkOpMetric.getMetrics().get("zk_write_rate");
+ Assert.assertEquals(readRate2, 0);
+ Assert.assertEquals(writeRete2, 0);
+
+ }
+
+ private Metrics getMetric(String dimension) {
+ BrokerService brokerService = pulsar.getBrokerService();
+ brokerService.updateRates();
+ for (Metrics metric : brokerService.getDestinationMetrics()) {
+ if (dimension.equalsIgnoreCase(metric.getDimension("metric"))) {
+ return metric;
+ }
+ }
+ return null;
+ }
+
}