Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ public interface BookKeeperServerStats {
String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";

String ADD_ENTRY_IN_PROGRESS = "ADD_ENTRY_IN_PROGRESS";
String ADD_ENTRY_BLOCKED = "ADD_ENTRY_BLOCKED";
String ADD_ENTRY_BLOCKED_WAIT = "ADD_ENTRY_BLOCKED_WAIT";
String READ_ENTRY_IN_PROGRESS = "READ_ENTRY_IN_PROGRESS";
String READ_ENTRY_BLOCKED = "READ_ENTRY_BLOCKED";
String READ_ENTRY_BLOCKED_WAIT = "READ_ENTRY_BLOCKED_WAIT";

//
// Journal Stats (scoped under SERVER_SCOPE)
//
Expand Down Expand Up @@ -137,6 +144,7 @@ public interface BookKeeperServerStats {
String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT";
String SKIP_LIST_FLUSH_BYTES = "SKIP_LIST_FLUSH_BYTES";
String SKIP_LIST_THROTTLING = "SKIP_LIST_THROTTLING";
String SKIP_LIST_THROTTLING_LATENCY = "SKIP_LIST_THROTTLING_LATENCY";
String READ_LAST_ENTRY_NOENTRY_ERROR = "READ_LAST_ENTRY_NOENTRY_ERROR";
String LEDGER_CACHE_NUM_EVICTED_LEDGERS = "LEDGER_CACHE_NUM_EVICTED_LEDGERS";
String PENDING_GET_FILE_INFO = "PENDING_GET_FILE_INFO";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_PUT_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_SNAPSHOT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING_LATENCY;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -103,6 +105,7 @@ public boolean equals(Object o) {
final AtomicLong size;

final long skipListSizeLimit;
final Semaphore skipListSemaphore;

SkipListArena allocator;

Expand All @@ -119,6 +122,7 @@ private EntrySkipList newSkipList() {
private final OpStatsLogger getEntryStats;
final Counter flushBytesCounter;
private final Counter throttlingCounter;
private final OpStatsLogger throttlingStats;

/**
* Constructor.
Expand All @@ -136,12 +140,22 @@ public EntryMemTable(final ServerConfiguration conf, final CheckpointSource sour
// skip list size limit
this.skipListSizeLimit = conf.getSkipListSizeLimit();

if (skipListSizeLimit > (Integer.MAX_VALUE - 1) / 2) {
// gives 2*1023MB for mem table.
// consider a way to create semaphore with long num of permits
// until that 1023MB should be enough for everything (tm)
throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
}
// double the size for snapshot in progress + incoming data
this.skipListSemaphore = new Semaphore((int) skipListSizeLimit * 2);

// Stats
this.snapshotStats = statsLogger.getOpStatsLogger(SKIP_LIST_SNAPSHOT);
this.putEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_PUT_ENTRY);
this.getEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_GET_ENTRY);
this.flushBytesCounter = statsLogger.getCounter(SKIP_LIST_FLUSH_BYTES);
this.throttlingCounter = statsLogger.getCounter(SKIP_LIST_THROTTLING);
this.throttlingStats = statsLogger.getOpStatsLogger(SKIP_LIST_THROTTLING_LATENCY);
}

void dump() {
Expand Down Expand Up @@ -264,6 +278,7 @@ long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws
}
}

skipListSemaphore.release((int) size);
return size;
}

Expand All @@ -285,18 +300,6 @@ void clearSnapshot(final EntrySkipList keyValues) {
}
}

/**
* Throttling writer w/ 1 ms delay.
*/
private void throttleWriters() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
throttlingCounter.inc();
}

/**
* Write an update.
*
Expand All @@ -314,11 +317,18 @@ public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final
Checkpoint cp = snapshot();
if ((null != cp) || (!previousFlushSucceeded.get())) {
cb.onSizeLimitReached(cp);
} else {
throttleWriters();
}
}

final int len = entry.remaining();
if (!skipListSemaphore.tryAcquire(len)) {
throttlingCounter.inc();
final long throttlingStartTimeNanos = MathUtils.nowInNano();
skipListSemaphore.acquireUninterruptibly(len);
throttlingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos),
TimeUnit.NANOSECONDS);
}

this.lock.readLock().lock();
try {
EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void safeRun() {
}
}
}
skipListSemaphore.release(flushedSize.intValue());
return flushedSize.longValue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -76,6 +77,19 @@ private interface JournalIdFilter {
boolean accept(long journalId);
}

/**
* For testability.
*/
@FunctionalInterface
public interface BufferedChannelBuilder {
BufferedChannelBuilder DEFAULT_BCBUILDER =
(FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity);

BufferedChannel create(FileChannel fc, int capacity) throws IOException;
}



/**
* List all journal ids by a specified journal id filer.
*
Expand Down Expand Up @@ -935,11 +949,14 @@ public void run() {
while (true) {
// new journal file to write
if (null == logFile) {

logId = logId + 1;

journalCreationWatcher.reset().start();
logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
journalAlignmentSize, removePagesFromCache, journalFormatVersionToWrite);
journalAlignmentSize, removePagesFromCache,
journalFormatVersionToWrite, getBufferedChannelBuilder());

journalCreationStats.registerSuccessfulEvent(
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);

Expand Down Expand Up @@ -1121,6 +1138,10 @@ public void run() {
LOG.info("Journal exited loop!");
}

public BufferedChannelBuilder getBufferedChannelBuilder() {
return BufferedChannelBuilder.DEFAULT_BCBUILDER;
}

/**
* Shuts down the journal.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,29 @@ class JournalChannel implements Closeable {
// Open journal for scanning starting from given position.
JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, long position) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5);
this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE,
position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
}

// Open journal to write
JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite);
fRemoveFromPageCache, formatVersionToWrite, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
}

JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
boolean fRemoveFromPageCache, int formatVersionToWrite,
Journal.BufferedChannelBuilder bcBuilder) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder);
}

/**
* Create a journal file.
* Allows injection of BufferedChannelBuilder for testing purposes.
*
* @param journalDirectory
* directory to store the journal file.
Expand All @@ -128,7 +138,7 @@ class JournalChannel implements Closeable {
private JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
long position, boolean fRemoveFromPageCache,
int formatVersionToWrite) throws IOException {
int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException {
this.journalAlignSize = journalAlignSize;
this.zeros = ByteBuffer.allocate(journalAlignSize);
this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
Expand Down Expand Up @@ -160,7 +170,7 @@ private JournalChannel(File journalDirectory, long logId,
bb.clear();
fc.write(bb);

bc = new BufferedChannel(fc, writeBufferSize);
bc = bcBuilder.create(fc, writeBufferSize);
forceWrite(true);
nextPrealloc = this.preAllocSize;
fc.write(zeros, nextPrealloc - journalAlignSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,10 @@ interface LedgerDeletionListener {
void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException;

ByteBuf getExplicitLac(long ledgerId);

// for testability
default LedgerStorage getUnderlyingLedgerStorage() {
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.apache.bookkeeper.bookie;

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;

/**
* Strictly for testing.
* Have to be alongside with prod code for Journal to inject in tests.
*/
public class SlowBufferedChannel extends BufferedChannel {
public volatile long getDelay = 0;
public volatile long addDelay = 0;
public volatile long flushDelay = 0;

public SlowBufferedChannel(FileChannel fc, int capacity) throws IOException {
super(fc, capacity);
}

public SlowBufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
super(fc, writeCapacity, readCapacity);
}

public void setAddDelay(long delay) {
addDelay = delay;
}

public void setGetDelay(long delay) {
getDelay = delay;
}

public void setFlushDelay(long delay) {
flushDelay = delay;
}

@Override
public synchronized void write(ByteBuf src) throws IOException {
delayMs(addDelay);
super.write(src);
}

@Override
public void flush() throws IOException {
delayMs(flushDelay);
super.flush();
}

@Override
public long forceWrite(boolean forceMetadata) throws IOException {
delayMs(flushDelay);
return super.forceWrite(forceMetadata);
}

@Override
public synchronized int read(ByteBuf dest, long pos) throws IOException {
delayMs(getDelay);
return super.read(dest, pos);
}

private static void delayMs(long delay) {
if (delay < 1) {
return;
}
try {
TimeUnit.MILLISECONDS.sleep(delay);
} catch (InterruptedException e) {
//noop
}
}
}
Loading