Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion managed-ledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>


<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
// a new ledger and write the position into it
ledger.mbean.startCursorLedgerOpenOp();
long ledgerId = info.getCursorsLedgerId();
final long now = System.nanoTime();
bookkeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
ledger.getStore().recordReadLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
}
Expand Down Expand Up @@ -1825,6 +1827,7 @@ void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();
bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(),
config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
ledger.getStore().recordWriteCount(3L); // create-ledger-performs-3-write
ledger.getExecutor().submit(safeRun(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
if (log.isDebugEnabled()) {
log.debug("[{}] Opening legder {}", name, id);
}
store.recordReadCount(1L);
mbean.startDataLedgerOpenOp();
bookKeeper.asyncOpenLedger(id, config.getDigestType(), config.getPassword(), opencb, null);
} else {
Expand Down Expand Up @@ -276,7 +277,9 @@ private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedg
TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
} else {
iterator.remove();
final long now = System.nanoTime();
bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> {
store.recordWriteLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] Deleted empty ledger ledgerId={} rc={}", name, li.getLedgerId(), rc);
}
Expand All @@ -302,6 +305,7 @@ public void operationFailed(MetaStoreException e) {
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
store.recordWriteCount(3L);
executor.submitOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
Expand Down Expand Up @@ -478,6 +482,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
store.recordWriteCount(3); // create-ledger performs 3 writes on zk
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx);
}
Expand Down Expand Up @@ -1076,8 +1081,10 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
// The last ledger was empty, so we can discard it
ledgers.remove(lh.getId());
mbean.startDataLedgerDeleteOp();
final long now = System.nanoTime();
bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
mbean.endDataLedgerDeleteOp();
store.recordWriteLatency(System.nanoTime() - now, 1L);
log.info("[{}] Delete complete for empty ledger {}. rc={}", name, lh.getId(), rc);
}, null);
}
Expand All @@ -1092,6 +1099,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
store.recordWriteCount(3); // create-ledger performs 3 writes on zk
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null);
}
Expand Down Expand Up @@ -1158,8 +1166,10 @@ CompletableFuture<LedgerHandle> getLedgerHandle(long ledgerId) {
log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId);
}
mbean.startDataLedgerOpenOp();
final long now = System.nanoTime();
bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(),
(int rc, LedgerHandle lh, Object ctx) -> {
store.recordReadLatency(System.nanoTime() - now, 1L);
executor.submit(safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (rc != BKException.Code.OK) {
Expand Down Expand Up @@ -1447,7 +1457,9 @@ public void operationComplete(Void result, Stat stat) {

for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
final long now = System.nanoTime();
bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx) -> {
store.recordWriteLatency(System.nanoTime() - now, 1L);
if (rc == BKException.Code.NoSuchLedgerExistsException) {
log.warn("[{}] Ledger was already deleted {}", name, ls.getLedgerId());
} else if (rc != BKException.Code.OK) {
Expand Down Expand Up @@ -1563,7 +1575,9 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleting ledger {}", name, ls);
}
final long now = System.nanoTime();
bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> {
store.recordWriteLatency(System.nanoTime() - now, 1L);
switch (rc) {
case BKException.Code.NoSuchLedgerExistsException:
log.warn("[{}] Ledger {} not found when deleting it", name, ls.getLedgerId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,30 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn
* @throws MetaStoreException
*/
Iterable<String> getManagedLedgers() throws MetaStoreException;

/**
* Record write zk write with latency (in nano-seconds) for zk-op stats
*
* @param latency
*/
void recordWriteLatency(long latencyInNs, long count);

/**
* Record write zk operation for zk-op stats
*
*/
void recordWriteCount(long count);

/**
* Record read zk read with latency (in nano-seconds) for zk-op stats
*
* @param latency
*/
void recordReadLatency(long latency, long count);

/**
* Record read zk operation for zk-op stats
*
*/
void recordReadCount(long count);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.util.DimensionStats;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback.StringCallback;
Expand Down Expand Up @@ -57,6 +60,10 @@ public static enum ZNodeProtobufFormat {
private final ZooKeeper zk;
private final ZNodeProtobufFormat protobufFormat;
private final OrderedSafeExecutor executor;
private final LongAdder numWrite;
private final LongAdder numRead;
private final DimensionStats zkWriteLatencyStats;
private final DimensionStats zkReadLatencyStats;

private static class ZKStat implements Stat {
private final int version;
Expand Down Expand Up @@ -100,6 +107,10 @@ public MetaStoreImplZookeeper(ZooKeeper zk, ZNodeProtobufFormat protobufFormat,
this.zk = zk;
this.protobufFormat = protobufFormat;
this.executor = executor;
this.numWrite = new LongAdder();
this.numRead = new LongAdder();
this.zkWriteLatencyStats = new DimensionStats();
this.zkReadLatencyStats = new DimensionStats();

if (zk.exists(prefixName, false) == null) {
zk.create(prefixName, new byte[0], Acl, CreateMode.PERSISTENT);
Expand Down Expand Up @@ -133,7 +144,9 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
@Override
public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback<ManagedLedgerInfo> callback) {
// Try to get the content or create an empty node
final long now = System.nanoTime();
zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.submit(safeRun(() -> {
recordReadLatency(System.nanoTime() - now, 1L);
if (rc == Code.OK.intValue()) {
try {
ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
Expand Down Expand Up @@ -175,8 +188,10 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St
mlInfo.toString().getBytes(Encoding) : // Text format
mlInfo.toByteArray(); // Binary format

final long now = System.nanoTime();
zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
(rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> {
recordWriteLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName,
Code.get(rc), stat != null ? stat.getVersion() : "null");
Expand All @@ -200,7 +215,9 @@ public void getCursors(final String ledgerName, final MetaStoreCallback<List<Str
if (log.isDebugEnabled()) {
log.debug("[{}] Get cursors list", ledgerName);
}
final long now = System.nanoTime();
zk.getChildren(prefix + ledgerName, false, (rc, path, ctx, children, stat) -> executor.submit(safeRun(() -> {
recordReadLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children);
}
Expand All @@ -223,8 +240,9 @@ public void asyncGetCursorInfo(String ledgerName, String consumerName,
if (log.isDebugEnabled()) {
log.debug("Reading from {}", path);
}

final long now = System.nanoTime();
zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.submit(safeRun(() -> {
recordReadLatency(System.nanoTime() - now, 1L);
if (rc != Code.OK.intValue()) {
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
} else {
Expand Down Expand Up @@ -257,8 +275,10 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
if (log.isDebugEnabled()) {
log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
final long now = System.nanoTime();
zk.create(path, content, Acl, CreateMode.PERSISTENT,
(rc, path1, ctx, name) -> executor.submit(safeRun(() -> {
recordWriteLatency(System.nanoTime() - now, 1L);
if (rc != Code.OK.intValue()) {
log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName,
cursorName, info, Code.get(rc));
Expand All @@ -276,7 +296,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
if (log.isDebugEnabled()) {
log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
final long now = System.nanoTime();
zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, stat1) -> executor.submit(safeRun(() -> {
recordWriteLatency(System.nanoTime() - now, 1L);
if (rc == Code.BADVERSION.intValue()) {
callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc))));
} else if (rc != Code.OK.intValue()) {
Expand All @@ -292,7 +314,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
public void asyncRemoveCursor(final String ledgerName, final String consumerName,
final MetaStoreCallback<Void> callback) {
log.info("[{}] Remove consumer={}", ledgerName, consumerName);
final long now = System.nanoTime();
zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> {
recordWriteLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, consumerName, Code.get(rc));
}
Expand All @@ -307,7 +331,9 @@ public void asyncRemoveCursor(final String ledgerName, final String consumerName
@Override
public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> callback) {
log.info("[{}] Remove ManagedLedger", ledgerName);
final long now = System.nanoTime();
zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> {
recordWriteLatency(System.nanoTime() - now, 1L);
if (log.isDebugEnabled()) {
log.debug("[{}] zk delete done. rc={}", ledgerName, Code.get(rc));
}
Expand Down Expand Up @@ -386,5 +412,43 @@ private ManagedCursorInfo parseManagedCursorInfoFromBinary(byte[] data) throws I
return ManagedCursorInfo.newBuilder().mergeFrom(data).build();
}

@Override
public void recordWriteLatency(long latencyInNs, long count) {
zkWriteLatencyStats.recordValue(TimeUnit.NANOSECONDS.toMillis(latencyInNs) / count);
numWrite.add(count);
}

@Override
public void recordWriteCount(long count) {
numWrite.add(count);
}

@Override
public void recordReadLatency(long latencyInNs, long count) {
zkReadLatencyStats.recordValue(TimeUnit.NANOSECONDS.toMillis(latencyInNs) / count);
numRead.add(count);
}

@Override
public void recordReadCount(long count) {
numRead.add(count);
}

public long getAndResetNumOfWrite() {
return numWrite.sumThenReset();
}

public long getAndResetNumOfRead() {
return numRead.sumThenReset();
}

public DimensionStats getZkWriteLatencyStats() {
return this.zkWriteLatencyStats;
}

public DimensionStats getZkReadLatencyStats() {
return this.zkReadLatencyStats;
}

private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.mledger.util;

import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;

import java.util.concurrent.TimeUnit;

/**
*/
public class DimensionStats {

/** Statistics for given dimension **/
public double meanDimensionMs;

public double medianDimensionMs;

public double dimension95Ms;

public double dimension99Ms;

public double dimension999Ms;

public double dimension9999Ms;

public double dimensionCounts;

public double elapsedIntervalMs;

private final long maxTrackableSeconds = 120;
private Recorder dimensionTimeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(maxTrackableSeconds), 2);
private Histogram dimensionHistogram = null;

public void updateStats() {

dimensionHistogram = dimensionTimeRecorder.getIntervalHistogram(dimensionHistogram);

this.meanDimensionMs = dimensionHistogram.getMean();
this.medianDimensionMs = dimensionHistogram.getValueAtPercentile(50);
this.dimension95Ms = dimensionHistogram.getValueAtPercentile(95);
this.dimension99Ms = dimensionHistogram.getValueAtPercentile(99);
this.dimension999Ms = dimensionHistogram.getValueAtPercentile(99.9);
this.dimension9999Ms = dimensionHistogram.getValueAtPercentile(99.99);
this.dimensionCounts = dimensionHistogram.getTotalCount();
}

public void recordValue(long dimensionLatencyMs) {
dimensionLatencyMs = dimensionLatencyMs > maxTrackableSeconds ? maxTrackableSeconds : dimensionLatencyMs;
dimensionTimeRecorder.recordValue(dimensionLatencyMs);
}
}
Loading