From b39f318c998face61b7c5c28731bd37951aef283 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Tue, 28 Mar 2017 14:20:41 -0700 Subject: [PATCH 1/2] introduce zk-stats: zk-latency + zk-rate --- managed-ledger/pom.xml | 7 ++- .../mledger/impl/ManagedCursorImpl.java | 4 ++ .../mledger/impl/ManagedLedgerImpl.java | 15 +++++ .../bookkeeper/mledger/impl/MetaStore.java | 26 ++++++++ .../mledger/impl/MetaStoreImplZookeeper.java | 59 ++++++++++++++++- .../mledger/util/DimensionStats.java | 63 +++++++++++++++++++ .../broker/namespace/OwnershipCache.java | 16 +++-- .../pulsar/broker/service/PulsarStats.java | 10 ++- .../stats/BrokerOperabilityMetrics.java | 26 ++++++++ .../stats/ManagedLedgerMetricsTest.java | 52 +++++++++++++++ 10 files changed, 271 insertions(+), 7 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index e983198fb7ca9..f5dd09011d6ec 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -57,7 +57,12 @@ org.slf4j slf4j-api - + + + org.hdrhistogram + HdrHistogram + + org.mockito mockito-core diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 600d3777f5aaf..6c17cf283af96 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -222,7 +222,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac // a new ledger and write the position into it ledger.mbean.startCursorLedgerOpenOp(); long ledgerId = info.getCursorsLedgerId(); + final long now = System.currentTimeMillis(); bookkeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { + ledger.getStore().recordRead(System.currentTimeMillis() - now); if (log.isDebugEnabled()) { log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc); } @@ -1823,8 +1825,10 @@ void internalFlushPendingMarkDeletes() { void createNewMetadataLedger(final VoidCallback callback) { ledger.mbean.startCursorLedgerCreateOp(); + final long now = System.currentTimeMillis(); bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(), config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { + ledger.getStore().recordWrite(System.currentTimeMillis() - now); ledger.getExecutor().submit(safeRun(() -> { ledger.mbean.endCursorLedgerCreateOp(); if (rc != BKException.Code.OK) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a98604ff638e9..752d09004d3ee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -248,6 +248,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { if (log.isDebugEnabled()) { log.debug("[{}] Opening legder {}", name, id); } + store.recordRead(); mbean.startDataLedgerOpenOp(); bookKeeper.asyncOpenLedger(id, config.getDigestType(), config.getPassword(), opencb, null); } else { @@ -276,7 +277,9 @@ private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedg TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize()); } else { iterator.remove(); + final long now = System.currentTimeMillis(); bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> { + store.recordWrite(System.currentTimeMillis() - now); if (log.isDebugEnabled()) { log.debug("[{}] Deleted empty ledger ledgerId={} rc={}", name, li.getLedgerId(), rc); } @@ -300,8 +303,10 @@ public void operationFailed(MetaStoreException e) { // Create a new ledger to start writing this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); + final long now = System.currentTimeMillis(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { + store.recordWrite(System.currentTimeMillis() - now); executor.submitOrdered(name, safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { @@ -478,6 +483,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); + store.recordWrite(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx); } @@ -1076,8 +1082,10 @@ synchronized void ledgerClosed(final LedgerHandle lh) { // The last ledger was empty, so we can discard it ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); + final long now = System.currentTimeMillis(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> { mbean.endDataLedgerDeleteOp(); + store.recordWrite(System.currentTimeMillis() - now); log.info("[{}] Delete complete for empty ledger {}. rc={}", name, lh.getId(), rc); }, null); } @@ -1092,6 +1100,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) { STATE_UPDATER.set(this, State.CreatingLedger); this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); + store.recordWrite(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null); } @@ -1158,8 +1167,10 @@ CompletableFuture getLedgerHandle(long ledgerId) { log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId); } mbean.startDataLedgerOpenOp(); + final long now = System.currentTimeMillis(); bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (int rc, LedgerHandle lh, Object ctx) -> { + store.recordRead(System.currentTimeMillis() - now); executor.submit(safeRun(() -> { mbean.endDataLedgerOpenOp(); if (rc != BKException.Code.OK) { @@ -1447,7 +1458,9 @@ public void operationComplete(Void result, Stat stat) { for (LedgerInfo ls : ledgersToDelete) { log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); + final long now = System.currentTimeMillis(); bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx) -> { + store.recordWrite(System.currentTimeMillis() - now); if (rc == BKException.Code.NoSuchLedgerExistsException) { log.warn("[{}] Ledger was already deleted {}", name, ls.getLedgerId()); } else if (rc != BKException.Code.OK) { @@ -1563,7 +1576,9 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] Deleting ledger {}", name, ls); } + final long now = System.currentTimeMillis(); bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> { + store.recordWrite(System.currentTimeMillis() - now); switch (rc) { case BKException.Code.NoSuchLedgerExistsException: log.warn("[{}] Ledger {} not found when deleting it", name, ls.getLedgerId()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index d7419ca13a4d5..e03e4171bae83 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -133,4 +133,30 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn * @throws MetaStoreException */ Iterable getManagedLedgers() throws MetaStoreException; + + /** + * Record write zk operation with latency for zk-op stats + * + * @param latency + */ + void recordWrite(long latency); + + /** + * Record write zk operation for zk-op stats + * + */ + void recordWrite(); + + /** + * Record read zk operation with latency for zk-op stats + * + * @param latency + */ + void recordRead(long latency); + + /** + * Record read zk operation for zk-op stats + * + */ + void recordRead(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 49c23c91e6b34..bb63fa9d8560b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -20,11 +20,13 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.util.DimensionStats; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.AsyncCallback.StringCallback; @@ -57,6 +59,9 @@ public static enum ZNodeProtobufFormat { private final ZooKeeper zk; private final ZNodeProtobufFormat protobufFormat; private final OrderedSafeExecutor executor; + private final LongAdder numWrite; + private final LongAdder numRead; + private final DimensionStats zkOpLatencyStats; private static class ZKStat implements Stat { private final int version; @@ -100,6 +105,9 @@ public MetaStoreImplZookeeper(ZooKeeper zk, ZNodeProtobufFormat protobufFormat, this.zk = zk; this.protobufFormat = protobufFormat; this.executor = executor; + this.numWrite = new LongAdder(); + this.numRead = new LongAdder(); + this.zkOpLatencyStats = new DimensionStats(); if (zk.exists(prefixName, false) == null) { zk.create(prefixName, new byte[0], Acl, CreateMode.PERSISTENT); @@ -133,7 +141,9 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) { @Override public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback callback) { // Try to get the content or create an empty node + final long now = System.currentTimeMillis(); zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.submit(safeRun(() -> { + recordRead(System.currentTimeMillis() - now); if (rc == Code.OK.intValue()) { try { ManagedLedgerInfo info = parseManagedLedgerInfo(readData); @@ -175,8 +185,10 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St mlInfo.toString().getBytes(Encoding) : // Text format mlInfo.toByteArray(); // Binary format + final long now = System.currentTimeMillis(); zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(), (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> { + recordWrite(System.currentTimeMillis() - now); if (log.isDebugEnabled()) { log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName, Code.get(rc), stat != null ? stat.getVersion() : "null"); @@ -200,7 +212,9 @@ public void getCursors(final String ledgerName, final MetaStoreCallback executor.submit(safeRun(() -> { + recordRead(System.currentTimeMillis() - now); if (log.isDebugEnabled()) { log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children); } @@ -223,8 +237,9 @@ public void asyncGetCursorInfo(String ledgerName, String consumerName, if (log.isDebugEnabled()) { log.debug("Reading from {}", path); } - + final long now = System.currentTimeMillis(); zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.submit(safeRun(() -> { + recordRead(System.currentTimeMillis() - now); if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { @@ -257,8 +272,10 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa if (log.isDebugEnabled()) { log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } + final long now = System.currentTimeMillis(); zk.create(path, content, Acl, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> executor.submit(safeRun(() -> { + recordWrite(System.currentTimeMillis() - now); if (rc != Code.OK.intValue()) { log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName, cursorName, info, Code.get(rc)); @@ -276,7 +293,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa if (log.isDebugEnabled()) { log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } + final long now = System.currentTimeMillis(); zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, stat1) -> executor.submit(safeRun(() -> { + recordWrite(System.currentTimeMillis() - now); if (rc == Code.BADVERSION.intValue()) { callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc)))); } else if (rc != Code.OK.intValue()) { @@ -292,7 +311,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa public void asyncRemoveCursor(final String ledgerName, final String consumerName, final MetaStoreCallback callback) { log.info("[{}] Remove consumer={}", ledgerName, consumerName); + final long now = System.currentTimeMillis(); zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { + recordWrite(System.currentTimeMillis() - now); if (log.isDebugEnabled()) { log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, consumerName, Code.get(rc)); } @@ -307,7 +328,9 @@ public void asyncRemoveCursor(final String ledgerName, final String consumerName @Override public void removeManagedLedger(String ledgerName, MetaStoreCallback callback) { log.info("[{}] Remove ManagedLedger", ledgerName); + final long now = System.currentTimeMillis(); zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { + recordWrite(System.currentTimeMillis() - now); if (log.isDebugEnabled()) { log.debug("[{}] zk delete done. rc={}", ledgerName, Code.get(rc)); } @@ -386,5 +409,39 @@ private ManagedCursorInfo parseManagedCursorInfoFromBinary(byte[] data) throws I return ManagedCursorInfo.newBuilder().mergeFrom(data).build(); } + @Override + public void recordWrite(long latency) { + zkOpLatencyStats.recordValue(latency); + numWrite.increment(); + } + + @Override + public void recordWrite() { + numWrite.increment(); + } + + @Override + public void recordRead(long latency) { + zkOpLatencyStats.recordValue(latency); + numRead.increment(); + } + + @Override + public void recordRead() { + numRead.increment(); + } + + public long getAndResetNumOfWrite() { + return numWrite.sumThenReset(); + } + + public long getAndResetNumOfRead() { + return numRead.sumThenReset(); + } + + public DimensionStats getZkOpLatencyStats() { + return this.zkOpLatencyStats; + } + private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java new file mode 100644 index 0000000000000..99966b0bfc946 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java @@ -0,0 +1,63 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import org.HdrHistogram.Histogram; +import org.HdrHistogram.Recorder; + +import java.util.concurrent.TimeUnit; + +/** + */ +public class DimensionStats { + + /** Statistics for given dimension **/ + public double meanDimensionMs; + + public double medianDimensionMs; + + public double dimension95Ms; + + public double dimension99Ms; + + public double dimension999Ms; + + public double dimension9999Ms; + + public double dimensionCounts; + + public double elapsedIntervalMs; + + private Recorder dimensionTimeRecorder = new Recorder(TimeUnit.MINUTES.toMillis(10), 2); + private Histogram dimensionHistogram = null; + + public void updateStats() { + + dimensionHistogram = dimensionTimeRecorder.getIntervalHistogram(dimensionHistogram); + + this.meanDimensionMs = dimensionHistogram.getMean(); + this.medianDimensionMs = dimensionHistogram.getValueAtPercentile(50); + this.dimension95Ms = dimensionHistogram.getValueAtPercentile(95); + this.dimension99Ms = dimensionHistogram.getValueAtPercentile(99); + this.dimension999Ms = dimensionHistogram.getValueAtPercentile(99.9); + this.dimension9999Ms = dimensionHistogram.getValueAtPercentile(99.99); + this.dimensionCounts = dimensionHistogram.getTotalCount(); + } + + public void recordValue(long dimensionLatencyMs) { + dimensionTimeRecorder.recordValue(dimensionLatencyMs); + } +} diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java index f2e4a359ae070..bc3eb8b4656df 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java @@ -15,8 +15,6 @@ */ package com.yahoo.pulsar.broker.namespace; -import static com.google.common.base.Preconditions.checkState; - import java.util.List; import java.util.Map; import java.util.Optional; @@ -24,6 +22,8 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.MetaStore; import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -36,8 +36,6 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.yahoo.pulsar.broker.PulsarService; @@ -105,6 +103,11 @@ public class OwnershipCache { * The NamespaceBundleFactory to construct NamespaceBundles */ private final NamespaceBundleFactory bundleFactory; + + /** + * Use to record zk-write operations + */ + private final MetaStore metaStore; private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader { @@ -124,8 +127,10 @@ public CompletableFuture asyncLoad(String namespaceBundleZNode, Exe } CompletableFuture future = new CompletableFuture<>(); + final long now = System.currentTimeMillis(); ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode, znodeContent, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> { + metaStore.recordWrite(System.currentTimeMillis() - now); if (rc == KeeperException.Code.OK.intValue()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode); @@ -162,6 +167,7 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory // ownedBundlesCache contains all namespaces that are owned by the local broker this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.sameThreadExecutor()) .buildAsync(new OwnedServiceUnitCacheLoader()); + this.metaStore = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()).getMetaStore(); } /** @@ -252,7 +258,9 @@ public CompletableFuture tryAcquiringOwnership(Namespace public CompletableFuture removeOwnership(NamespaceBundle bundle) { CompletableFuture result = new CompletableFuture<>(); String key = ServiceUnitZkUtils.path(bundle); + final long now = System.currentTimeMillis(); localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> { + metaStore.recordWrite(System.currentTimeMillis() - now); if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) { LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc)); ownedBundlesCache.synchronous().invalidate(key); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java index 4caa13b2c23ea..5c02f439c9ba0 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java @@ -21,6 +21,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.MetaStore; +import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +56,7 @@ public class PulsarStats implements Closeable { private List tempMetricsCollection; private List metricsCollection; private final BrokerOperabilityMetrics brokerOperabilityMetrics; + private final ManagedLedgerFactory ledgerFactory; private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock(); @@ -67,6 +72,7 @@ public PulsarStats(PulsarService pulsar) { this.metricsCollection = Lists.newArrayList(); this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(), pulsar.getAdvertisedAddress()); + this.ledgerFactory = pulsar.getManagedLedgerFactory(); } @Override @@ -140,7 +146,9 @@ public synchronized void updateStats( } brokerOperabilityMetrics.getMetrics() .forEach(brokerOperabilityMetric -> tempMetricsCollection.add(brokerOperabilityMetric)); - + // add zk-op-stats metrics + tempMetricsCollection.add(brokerOperabilityMetrics.getZkLatencyMetrics( + (MetaStoreImplZookeeper) (((ManagedLedgerFactoryImpl) ledgerFactory).getMetaStore()))); // json end topicStatsStream.endObject(); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java index 9649366d19448..38e7a88a0aa45 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.Map; +import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper; +import org.apache.bookkeeper.mledger.util.DimensionStats; + /** */ public class BrokerOperabilityMetrics { @@ -66,6 +69,29 @@ Metrics getTopicLoadMetrics() { return dMetrics; } + public Metrics getZkLatencyMetrics(MetaStoreImplZookeeper metaStore) { + Map dimensionMap = Maps.newHashMap(); + dimensionMap.put("broker", brokerName); + dimensionMap.put("cluster", localCluster); + dimensionMap.put("metric", "zk_op_stats"); + Metrics dMetrics = Metrics.create(dimensionMap); + + DimensionStats zkOpLatencyStats = metaStore.getZkOpLatencyStats(); + zkOpLatencyStats.updateStats(); + + dMetrics.put("zk_latency_mean_ms", zkOpLatencyStats.meanDimensionMs); + dMetrics.put("zk_latency_time_median_ms", zkOpLatencyStats.medianDimensionMs); + dMetrics.put("zk_latency_95percentile_ms", zkOpLatencyStats.dimension95Ms); + dMetrics.put("zk_latency_99_percentile_ms", zkOpLatencyStats.dimension99Ms); + dMetrics.put("zk_latency_99_9_percentile_ms", zkOpLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_99_99_percentile_ms", zkOpLatencyStats.dimension999Ms); + dMetrics.put("zk_op_count", zkOpLatencyStats.dimensionCounts); + dMetrics.put("zk_write_rate", metaStore.getAndResetNumOfWrite()); + dMetrics.put("zk_read_rate", metaStore.getAndResetNumOfRead()); + + return dMetrics; + } + public void reset() { metricsList.clear(); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java index ba7d39be3b724..f107a548b1bac 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java @@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.yahoo.pulsar.broker.service.BrokerService; import com.yahoo.pulsar.broker.service.BrokerTestBase; import com.yahoo.pulsar.broker.stats.metrics.ManagedLedgerMetrics; import com.yahoo.pulsar.client.api.Producer; @@ -85,4 +86,55 @@ public void testManagedLedgerMetrics() throws Exception { } + + @Test + public void testZkOpStatsMetrics() throws Exception { + + pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1"); + Metrics zkOpMetric = getMetric("zk_op_stats"); + Assert.assertNotNull(zkOpMetric); + Assert.assertTrue((double) zkOpMetric.getMetrics().get("zk_latenc_99_99_percentile_ms") > 0); + Assert.assertTrue((long) zkOpMetric.getMetrics().get("zk_read_rate") > 0); + Assert.assertTrue((long) zkOpMetric.getMetrics().get("zk_write_rate") > 0); + + // create another topic + pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2"); + zkOpMetric = getMetric("zk_op_stats"); + // save read and write rate per topic: which should be the same for all topics + long readRate = (long) zkOpMetric.getMetrics().get("zk_read_rate"); + long writeRete = (long) zkOpMetric.getMetrics().get("zk_write_rate"); + Assert.assertTrue(readRate > 0); + Assert.assertTrue(writeRete > 0); + + // create three new topics : which should create thrice read/write rate compare to previous one + pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic3"); + pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic4"); + pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic5"); + zkOpMetric = getMetric("zk_op_stats"); + long readRate2 = (long) zkOpMetric.getMetrics().get("zk_read_rate"); + long writeRete2 = (long) zkOpMetric.getMetrics().get("zk_write_rate"); + Assert.assertEquals(readRate2, 3 * readRate); + Assert.assertEquals(writeRete2, 3 * writeRete); + + // same topic doesn't create any zk-operation + pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic5"); + zkOpMetric = getMetric("zk_op_stats"); + readRate2 = (long) zkOpMetric.getMetrics().get("zk_read_rate"); + writeRete2 = (long) zkOpMetric.getMetrics().get("zk_write_rate"); + Assert.assertEquals(readRate2, 0); + Assert.assertEquals(writeRete2, 0); + + } + + private Metrics getMetric(String dimension) { + BrokerService brokerService = pulsar.getBrokerService(); + brokerService.updateRates(); + for (Metrics metric : brokerService.getDestinationMetrics()) { + if (dimension.equalsIgnoreCase(metric.getDimension("metric"))) { + return metric; + } + } + return null; + } + } From aff71576bdc32e107b85717e72c90381b8fb7f0d Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 29 Mar 2017 14:46:08 -0700 Subject: [PATCH 2/2] fix: added read-latency histogram, reduce histogram-highestTrackableTime, measuring time using nano-sec, 3 write-count for create-ledger --- .../mledger/impl/ManagedCursorImpl.java | 7 +- .../mledger/impl/ManagedLedgerImpl.java | 29 ++++---- .../bookkeeper/mledger/impl/MetaStore.java | 12 ++-- .../mledger/impl/MetaStoreImplZookeeper.java | 69 ++++++++++--------- .../mledger/util/DimensionStats.java | 4 +- .../broker/namespace/OwnershipCache.java | 8 +-- .../stats/BrokerOperabilityMetrics.java | 31 ++++++--- .../broker/namespace/OwnershipCacheTest.java | 9 ++- .../broker/service/BrokerServiceTest.java | 2 +- .../stats/ManagedLedgerMetricsTest.java | 3 +- 10 files changed, 100 insertions(+), 74 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 6c17cf283af96..e3ad5073da7a1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -222,9 +222,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac // a new ledger and write the position into it ledger.mbean.startCursorLedgerOpenOp(); long ledgerId = info.getCursorsLedgerId(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookkeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - ledger.getStore().recordRead(System.currentTimeMillis() - now); + ledger.getStore().recordReadLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc); } @@ -1825,10 +1825,9 @@ void internalFlushPendingMarkDeletes() { void createNewMetadataLedger(final VoidCallback callback) { ledger.mbean.startCursorLedgerCreateOp(); - final long now = System.currentTimeMillis(); bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(), config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - ledger.getStore().recordWrite(System.currentTimeMillis() - now); + ledger.getStore().recordWriteCount(3L); // create-ledger-performs-3-write ledger.getExecutor().submit(safeRun(() -> { ledger.mbean.endCursorLedgerCreateOp(); if (rc != BKException.Code.OK) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 752d09004d3ee..35fe2a649a172 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -248,7 +248,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { if (log.isDebugEnabled()) { log.debug("[{}] Opening legder {}", name, id); } - store.recordRead(); + store.recordReadCount(1L); mbean.startDataLedgerOpenOp(); bookKeeper.asyncOpenLedger(id, config.getDigestType(), config.getPassword(), opencb, null); } else { @@ -277,9 +277,9 @@ private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedg TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize()); } else { iterator.remove(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> { - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] Deleted empty ledger ledgerId={} rc={}", name, li.getLedgerId(), rc); } @@ -303,10 +303,9 @@ public void operationFailed(MetaStoreException e) { // Create a new ledger to start writing this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - final long now = System.currentTimeMillis(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteCount(3L); executor.submitOrdered(name, safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { @@ -483,7 +482,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - store.recordWrite(); + store.recordWriteCount(3); // create-ledger performs 3 writes on zk bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx); } @@ -1082,10 +1081,10 @@ synchronized void ledgerClosed(final LedgerHandle lh) { // The last ledger was empty, so we can discard it ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> { mbean.endDataLedgerDeleteOp(); - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteLatency(System.nanoTime() - now, 1L); log.info("[{}] Delete complete for empty ledger {}. rc={}", name, lh.getId(), rc); }, null); } @@ -1100,7 +1099,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) { STATE_UPDATER.set(this, State.CreatingLedger); this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - store.recordWrite(); + store.recordWriteCount(3); // create-ledger performs 3 writes on zk bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null); } @@ -1167,10 +1166,10 @@ CompletableFuture getLedgerHandle(long ledgerId) { log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId); } mbean.startDataLedgerOpenOp(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (int rc, LedgerHandle lh, Object ctx) -> { - store.recordRead(System.currentTimeMillis() - now); + store.recordReadLatency(System.nanoTime() - now, 1L); executor.submit(safeRun(() -> { mbean.endDataLedgerOpenOp(); if (rc != BKException.Code.OK) { @@ -1458,9 +1457,9 @@ public void operationComplete(Void result, Stat stat) { for (LedgerInfo ls : ledgersToDelete) { log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx) -> { - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteLatency(System.nanoTime() - now, 1L); if (rc == BKException.Code.NoSuchLedgerExistsException) { log.warn("[{}] Ledger was already deleted {}", name, ls.getLedgerId()); } else if (rc != BKException.Code.OK) { @@ -1576,9 +1575,9 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] Deleting ledger {}", name, ls); } - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> { - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteLatency(System.nanoTime() - now, 1L); switch (rc) { case BKException.Code.NoSuchLedgerExistsException: log.warn("[{}] Ledger {} not found when deleting it", name, ls.getLedgerId()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index e03e4171bae83..511ad13230c51 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -135,28 +135,28 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn Iterable getManagedLedgers() throws MetaStoreException; /** - * Record write zk operation with latency for zk-op stats + * Record write zk write with latency (in nano-seconds) for zk-op stats * * @param latency */ - void recordWrite(long latency); + void recordWriteLatency(long latencyInNs, long count); /** * Record write zk operation for zk-op stats * */ - void recordWrite(); + void recordWriteCount(long count); /** - * Record read zk operation with latency for zk-op stats + * Record read zk read with latency (in nano-seconds) for zk-op stats * * @param latency */ - void recordRead(long latency); + void recordReadLatency(long latency, long count); /** * Record read zk operation for zk-op stats * */ - void recordRead(); + void recordReadCount(long count); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index bb63fa9d8560b..000160528d0c6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -20,6 +20,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; @@ -61,7 +62,8 @@ public static enum ZNodeProtobufFormat { private final OrderedSafeExecutor executor; private final LongAdder numWrite; private final LongAdder numRead; - private final DimensionStats zkOpLatencyStats; + private final DimensionStats zkWriteLatencyStats; + private final DimensionStats zkReadLatencyStats; private static class ZKStat implements Stat { private final int version; @@ -107,7 +109,8 @@ public MetaStoreImplZookeeper(ZooKeeper zk, ZNodeProtobufFormat protobufFormat, this.executor = executor; this.numWrite = new LongAdder(); this.numRead = new LongAdder(); - this.zkOpLatencyStats = new DimensionStats(); + this.zkWriteLatencyStats = new DimensionStats(); + this.zkReadLatencyStats = new DimensionStats(); if (zk.exists(prefixName, false) == null) { zk.create(prefixName, new byte[0], Acl, CreateMode.PERSISTENT); @@ -141,9 +144,9 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) { @Override public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback callback) { // Try to get the content or create an empty node - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.submit(safeRun(() -> { - recordRead(System.currentTimeMillis() - now); + recordReadLatency(System.nanoTime() - now, 1L); if (rc == Code.OK.intValue()) { try { ManagedLedgerInfo info = parseManagedLedgerInfo(readData); @@ -185,10 +188,10 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St mlInfo.toString().getBytes(Encoding) : // Text format mlInfo.toByteArray(); // Binary format - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(), (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName, Code.get(rc), stat != null ? stat.getVersion() : "null"); @@ -212,9 +215,9 @@ public void getCursors(final String ledgerName, final MetaStoreCallback executor.submit(safeRun(() -> { - recordRead(System.currentTimeMillis() - now); + recordReadLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children); } @@ -237,9 +240,9 @@ public void asyncGetCursorInfo(String ledgerName, String consumerName, if (log.isDebugEnabled()) { log.debug("Reading from {}", path); } - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.submit(safeRun(() -> { - recordRead(System.currentTimeMillis() - now); + recordReadLatency(System.nanoTime() - now, 1L); if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { @@ -272,10 +275,10 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa if (log.isDebugEnabled()) { log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.create(path, content, Acl, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (rc != Code.OK.intValue()) { log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName, cursorName, info, Code.get(rc)); @@ -293,9 +296,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa if (log.isDebugEnabled()) { log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, stat1) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (rc == Code.BADVERSION.intValue()) { callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc)))); } else if (rc != Code.OK.intValue()) { @@ -311,9 +314,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa public void asyncRemoveCursor(final String ledgerName, final String consumerName, final MetaStoreCallback callback) { log.info("[{}] Remove consumer={}", ledgerName, consumerName); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, consumerName, Code.get(rc)); } @@ -328,9 +331,9 @@ public void asyncRemoveCursor(final String ledgerName, final String consumerName @Override public void removeManagedLedger(String ledgerName, MetaStoreCallback callback) { log.info("[{}] Remove ManagedLedger", ledgerName); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] zk delete done. rc={}", ledgerName, Code.get(rc)); } @@ -410,25 +413,25 @@ private ManagedCursorInfo parseManagedCursorInfoFromBinary(byte[] data) throws I } @Override - public void recordWrite(long latency) { - zkOpLatencyStats.recordValue(latency); - numWrite.increment(); + public void recordWriteLatency(long latencyInNs, long count) { + zkWriteLatencyStats.recordValue(TimeUnit.NANOSECONDS.toMillis(latencyInNs) / count); + numWrite.add(count); } - + @Override - public void recordWrite() { - numWrite.increment(); + public void recordWriteCount(long count) { + numWrite.add(count); } @Override - public void recordRead(long latency) { - zkOpLatencyStats.recordValue(latency); - numRead.increment(); + public void recordReadLatency(long latencyInNs, long count) { + zkReadLatencyStats.recordValue(TimeUnit.NANOSECONDS.toMillis(latencyInNs) / count); + numRead.add(count); } @Override - public void recordRead() { - numRead.increment(); + public void recordReadCount(long count) { + numRead.add(count); } public long getAndResetNumOfWrite() { @@ -439,8 +442,12 @@ public long getAndResetNumOfRead() { return numRead.sumThenReset(); } - public DimensionStats getZkOpLatencyStats() { - return this.zkOpLatencyStats; + public DimensionStats getZkWriteLatencyStats() { + return this.zkWriteLatencyStats; + } + + public DimensionStats getZkReadLatencyStats() { + return this.zkReadLatencyStats; } private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java index 99966b0bfc946..27508d30c8ddd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java @@ -41,7 +41,8 @@ public class DimensionStats { public double elapsedIntervalMs; - private Recorder dimensionTimeRecorder = new Recorder(TimeUnit.MINUTES.toMillis(10), 2); + private final long maxTrackableSeconds = 120; + private Recorder dimensionTimeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(maxTrackableSeconds), 2); private Histogram dimensionHistogram = null; public void updateStats() { @@ -58,6 +59,7 @@ public void updateStats() { } public void recordValue(long dimensionLatencyMs) { + dimensionLatencyMs = dimensionLatencyMs > maxTrackableSeconds ? maxTrackableSeconds : dimensionLatencyMs; dimensionTimeRecorder.recordValue(dimensionLatencyMs); } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java index bc3eb8b4656df..4636da13c9596 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java @@ -127,10 +127,10 @@ public CompletableFuture asyncLoad(String namespaceBundleZNode, Exe } CompletableFuture future = new CompletableFuture<>(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode, znodeContent, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> { - metaStore.recordWrite(System.currentTimeMillis() - now); + metaStore.recordWriteLatency(System.nanoTime() - now, 1L); if (rc == KeeperException.Code.OK.intValue()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode); @@ -258,9 +258,9 @@ public CompletableFuture tryAcquiringOwnership(Namespace public CompletableFuture removeOwnership(NamespaceBundle bundle) { CompletableFuture result = new CompletableFuture<>(); String key = ServiceUnitZkUtils.path(bundle); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> { - metaStore.recordWrite(System.currentTimeMillis() - now); + metaStore.recordWriteLatency(System.nanoTime() - now, 1L); if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) { LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc)); ownedBundlesCache.synchronous().invalidate(key); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java index 38e7a88a0aa45..3244d8bd747aa 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -76,16 +76,27 @@ public Metrics getZkLatencyMetrics(MetaStoreImplZookeeper metaStore) { dimensionMap.put("metric", "zk_op_stats"); Metrics dMetrics = Metrics.create(dimensionMap); - DimensionStats zkOpLatencyStats = metaStore.getZkOpLatencyStats(); - zkOpLatencyStats.updateStats(); - - dMetrics.put("zk_latency_mean_ms", zkOpLatencyStats.meanDimensionMs); - dMetrics.put("zk_latency_time_median_ms", zkOpLatencyStats.medianDimensionMs); - dMetrics.put("zk_latency_95percentile_ms", zkOpLatencyStats.dimension95Ms); - dMetrics.put("zk_latency_99_percentile_ms", zkOpLatencyStats.dimension99Ms); - dMetrics.put("zk_latency_99_9_percentile_ms", zkOpLatencyStats.dimension999Ms); - dMetrics.put("zk_latency_99_99_percentile_ms", zkOpLatencyStats.dimension999Ms); - dMetrics.put("zk_op_count", zkOpLatencyStats.dimensionCounts); + DimensionStats zkWriteLatencyStats = metaStore.getZkWriteLatencyStats(); + DimensionStats zkReadLatencyStats = metaStore.getZkReadLatencyStats(); + zkWriteLatencyStats.updateStats(); + zkReadLatencyStats.updateStats(); + + dMetrics.put("zk_latency_write_mean_ms", zkWriteLatencyStats.meanDimensionMs); + dMetrics.put("zk_latency_write_time_median_ms", zkWriteLatencyStats.medianDimensionMs); + dMetrics.put("zk_latency_write_95percentile_ms", zkWriteLatencyStats.dimension95Ms); + dMetrics.put("zk_latency_write_99_percentile_ms", zkWriteLatencyStats.dimension99Ms); + dMetrics.put("zk_latency_write_99_9_percentile_ms", zkWriteLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_write_99_99_percentile_ms", zkWriteLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_write_count", zkWriteLatencyStats.dimensionCounts); + + dMetrics.put("zk_latency_read_mean_ms", zkReadLatencyStats.meanDimensionMs); + dMetrics.put("zk_latency_read_time_median_ms", zkReadLatencyStats.medianDimensionMs); + dMetrics.put("zk_latency_read_95percentile_ms", zkReadLatencyStats.dimension95Ms); + dMetrics.put("zk_latency_read_99_percentile_ms", zkReadLatencyStats.dimension99Ms); + dMetrics.put("zk_latency_read_99_9_percentile_ms", zkReadLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_read_99_99_percentile_ms", zkReadLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_read_count", zkReadLatencyStats.dimensionCounts); + dMetrics.put("zk_write_rate", metaStore.getAndResetNumOfWrite()); dMetrics.put("zk_read_rate", metaStore.getAndResetNumOfRead()); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java index f146c1a6f47dd..52504f4d8d03e 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java @@ -28,8 +28,9 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -59,6 +60,8 @@ public class OwnershipCacheTest { private NamespaceBundleFactory bundleFactory; private NamespaceService nsService; private BrokerService brokerService; + private ManagedLedgerFactoryImpl ledgerFactory; + private MetaStoreImplZookeeper metaStore; private OrderedSafeExecutor executor; @BeforeMethod @@ -67,12 +70,14 @@ public void setup() throws Exception { selfBrokerUrl = "tcp://localhost:" + port; pulsar = mock(PulsarService.class); config = mock(ServiceConfiguration.class); + ledgerFactory = mock(ManagedLedgerFactoryImpl.class); executor = new OrderedSafeExecutor(1, "test"); zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor); localCache = new LocalZooKeeperCacheService(zkCache, null); bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); nsService = mock(NamespaceService.class); brokerService = mock(BrokerService.class); + metaStore = new MetaStoreImplZookeeper(zkCache.getZooKeeper(), executor); doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(anyObject()); doReturn(zkCache).when(pulsar).getLocalZkCache(); @@ -83,6 +88,8 @@ public void setup() throws Exception { doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(webAddress(config)).when(pulsar).getWebServiceAddress(); doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl(); + doReturn(ledgerFactory).when(pulsar).getManagedLedgerFactory(); + doReturn(metaStore).when(ledgerFactory).getMetaStore(); } @AfterMethod diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java index 059cc167020fb..8c07c8dce930d 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java @@ -302,7 +302,7 @@ public void testBrokerStatsMetrics() throws Exception { consumer.close(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); JsonArray metrics = brokerStatsClient.getMetrics(); - assertEquals(metrics.size(), 4, metrics.toString()); + assertEquals(metrics.size(), 5, metrics.toString()); // these metrics seem to be arriving in different order at different times... // is the order really relevant here? diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java index f107a548b1bac..5e03ee189b889 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java @@ -93,7 +93,8 @@ public void testZkOpStatsMetrics() throws Exception { pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1"); Metrics zkOpMetric = getMetric("zk_op_stats"); Assert.assertNotNull(zkOpMetric); - Assert.assertTrue((double) zkOpMetric.getMetrics().get("zk_latenc_99_99_percentile_ms") > 0); + Assert.assertTrue(zkOpMetric.getMetrics().containsKey("zk_latency_write_99_99_percentile_ms")); + Assert.assertTrue(zkOpMetric.getMetrics().containsKey("zk_latency_read_99_99_percentile_ms")); Assert.assertTrue((long) zkOpMetric.getMetrics().get("zk_read_rate") > 0); Assert.assertTrue((long) zkOpMetric.getMetrics().get("zk_write_rate") > 0);