From b76eb3a5bd557f7e0a43a33734c642f59e9b5e06 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 11 May 2018 16:11:31 -0700 Subject: [PATCH 1/7] (@bug W-3651831@) backpressure: server-side backpressure --- .../bookie/BookKeeperServerStats.java | 8 + .../org/apache/bookkeeper/bookie/Bookie.java | 4 + .../bookkeeper/bookie/EntryMemTable.java | 38 +- .../EntryMemTableWithParallelFlusher.java | 1 + .../org/apache/bookkeeper/bookie/Journal.java | 23 +- .../bookkeeper/bookie/JournalChannel.java | 25 +- .../bookkeeper/bookie/LedgerStorage.java | 6 + .../bookie/SlowBufferedChannel.java | 93 ++++ .../bookie/SortedLedgerStorage.java | 125 +++++- .../bookkeeper/conf/ServerConfiguration.java | 50 +++ .../processor/RequestProcessor.java | 1 - .../proto/BookieRequestProcessor.java | 139 ++++++ .../apache/bookkeeper/proto/BookieServer.java | 11 + .../proto/GetBookieInfoProcessorV3.java | 10 +- .../proto/ReadEntryProcessorV3.java | 3 + .../proto/WriteEntryProcessorV3.java | 8 + .../bookkeeper/bookie/EntryLogTest.java | 5 +- .../bookkeeper/bookie/LedgerCacheTest.java | 9 +- .../bookie/SlowInterleavedLedgerStorage.java | 140 ++++++ .../bookie/SlowSortedLedgerStorage.java | 36 ++ .../SortedLedgerStorageCheckpointTest.java | 6 +- .../proto/BookieBackpressureTest.java | 410 ++++++++++++++++++ .../proto/TestBookieRequestProcessor.java | 8 +- .../resources/bookkeeper/findbugsExclude.xml | 6 + 24 files changed, 1109 insertions(+), 56 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index d488bc96e1a..19db7e9e746 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -77,6 +77,13 @@ public interface BookKeeperServerStats { String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES"; String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES"; + String ADD_ENTRY_IN_PROGRESS = "ADD_ENTRY_IN_PROGRESS"; + String ADD_ENTRY_BLOCKED = "ADD_ENTRY_BLOCKED"; + String ADD_ENTRY_BLOCKED_WAIT = "ADD_ENTRY_BLOCKED_WAIT"; + String READ_ENTRY_IN_PROGRESS = "READ_ENTRY_IN_PROGRESS"; + String READ_ENTRY_BLOCKED = "READ_ENTRY_BLOCKED"; + String READ_ENTRY_BLOCKED_WAIT = "READ_ENTRY_BLOCKED_WAIT"; + // // Journal Stats (scoped under SERVER_SCOPE) // @@ -137,6 +144,7 @@ public interface BookKeeperServerStats { String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT"; String SKIP_LIST_FLUSH_BYTES = "SKIP_LIST_FLUSH_BYTES"; String SKIP_LIST_THROTTLING = "SKIP_LIST_THROTTLING"; + String SKIP_LIST_THROTTLING_LATENCY = "SKIP_LIST_THROTTLING_LATENCY"; String READ_LAST_ENTRY_NOENTRY_ERROR = "READ_LAST_ENTRY_NOENTRY_ERROR"; String LEDGER_CACHE_NUM_EVICTED_LEDGERS = "LEDGER_CACHE_NUM_EVICTED_LEDGERS"; String PENDING_GET_FILE_INFO = "PENDING_GET_FILE_INFO"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 2b893876e5e..c017a476bb2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -106,6 +106,10 @@ public class Bookie extends BookieCriticalThread { private static final Logger LOG = LoggerFactory.getLogger(Bookie.class); + public static final String PROP_SLOW_JOURNAL_FLUSH_DELAY = "test.slowJournal.flushDelay"; + public static final String PROP_SLOW_JOURNAL_ADD_DELAY = "test.slowJournal.addDelay"; + public static final String PROP_SLOW_JOURNAL_GET_DELAY = "test.slowJournal.getDelay"; + final List journalDirectories; final ServerConfiguration conf; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java index 70f437cb317..73283989bb0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java @@ -24,10 +24,12 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_PUT_ENTRY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_SNAPSHOT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING_LATENCY; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -103,6 +105,7 @@ public boolean equals(Object o) { final AtomicLong size; final long skipListSizeLimit; + final Semaphore skipListSemaphore; SkipListArena allocator; @@ -119,6 +122,7 @@ private EntrySkipList newSkipList() { private final OpStatsLogger getEntryStats; final Counter flushBytesCounter; private final Counter throttlingCounter; + private final OpStatsLogger throttlingStats; /** * Constructor. @@ -136,12 +140,22 @@ public EntryMemTable(final ServerConfiguration conf, final CheckpointSource sour // skip list size limit this.skipListSizeLimit = conf.getSkipListSizeLimit(); + if (skipListSizeLimit > (Integer.MAX_VALUE - 1) / 2) { + // gives 2*1023MB for mem table. + // consider a way to create semaphore with long num of permits + // until that 1023MB should be enough for everything (tm) + throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2)); + } + // double the size for snapshot in progress + incoming data + this.skipListSemaphore = new Semaphore((int) skipListSizeLimit * 2); + // Stats this.snapshotStats = statsLogger.getOpStatsLogger(SKIP_LIST_SNAPSHOT); this.putEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_PUT_ENTRY); this.getEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_GET_ENTRY); this.flushBytesCounter = statsLogger.getCounter(SKIP_LIST_FLUSH_BYTES); this.throttlingCounter = statsLogger.getCounter(SKIP_LIST_THROTTLING); + this.throttlingStats = statsLogger.getOpStatsLogger(SKIP_LIST_THROTTLING_LATENCY); } void dump() { @@ -264,6 +278,7 @@ long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws } } + skipListSemaphore.release((int) size); return size; } @@ -285,18 +300,6 @@ void clearSnapshot(final EntrySkipList keyValues) { } } - /** - * Throttling writer w/ 1 ms delay. - */ - private void throttleWriters() { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - throttlingCounter.inc(); - } - /** * Write an update. * @@ -314,11 +317,18 @@ public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final Checkpoint cp = snapshot(); if ((null != cp) || (!previousFlushSucceeded.get())) { cb.onSizeLimitReached(cp); - } else { - throttleWriters(); } } + final int len = entry.remaining(); + if (!skipListSemaphore.tryAcquire(len)) { + throttlingCounter.inc(); + final long throttlingStartTimeNanos = MathUtils.nowInNano(); + skipListSemaphore.acquireUninterruptibly(len); + throttlingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos), + TimeUnit.NANOSECONDS); + } + this.lock.readLock().lock(); try { EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java index a3849e915e1..4f2cf022923 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java @@ -144,6 +144,7 @@ public void safeRun() { } } } + skipListSemaphore.release(flushedSize.intValue()); return flushedSize.longValue(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 71477c02748..aba4a6d3a7a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -36,6 +36,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -907,6 +908,10 @@ public int getJournalQueueLength() { public void run() { LOG.info("Starting journal on {}", journalDirectory); + long getDelay = conf.getLong(Bookie.PROP_SLOW_JOURNAL_GET_DELAY, 0); + long addDelay = conf.getLong(Bookie.PROP_SLOW_JOURNAL_ADD_DELAY, 0); + long flushDelay = conf.getLong(Bookie.PROP_SLOW_JOURNAL_FLUSH_DELAY, 0); + RecyclableArrayList toFlush = entryListRecycler.newInstance(); int numEntriesToFlush = 0; ByteBuf lenBuff = Unpooled.buffer(4); @@ -935,11 +940,27 @@ public void run() { while (true) { // new journal file to write if (null == logFile) { + logId = logId + 1; + final JournalChannel.BufferedChannelBuilder bcBuilder; + if (getDelay > 0 || addDelay > 0 || flushDelay > 0) { + LOG.warn("delay of journal writes enabled, please make sure it is only used in test env."); + bcBuilder = (FileChannel fc, int capacity) -> { + SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity); + sbc.setAddDelay(addDelay); + sbc.setGetDelay(getDelay); + sbc.setFlushDelay(flushDelay); + return sbc; + }; + } else { + bcBuilder = JournalChannel.DEFAULT_BCBUILDER; + } journalCreationWatcher.reset().start(); logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize, - journalAlignmentSize, removePagesFromCache, journalFormatVersionToWrite); + journalAlignmentSize, removePagesFromCache, + journalFormatVersionToWrite, bcBuilder); + journalCreationStats.registerSuccessfulEvent( journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java index 6d5cae05081..6246117f3b1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java @@ -42,6 +42,13 @@ class JournalChannel implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(JournalChannel.class); + @FunctionalInterface + public interface BufferedChannelBuilder { + BufferedChannel create(FileChannel fc, int capacity) throws IOException; + } + public static final BufferedChannelBuilder DEFAULT_BCBUILDER = + (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity); + final RandomAccessFile randomAccessFile; final int fd; final FileChannel fc; @@ -93,7 +100,8 @@ class JournalChannel implements Closeable { // Open journal for scanning starting from given position. JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, long position) throws IOException { - this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5); + this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, + position, false, V5, DEFAULT_BCBUILDER); } // Open journal to write @@ -101,11 +109,20 @@ class JournalChannel implements Closeable { long preAllocSize, int writeBufferSize, int journalAlignSize, boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException { this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize, - START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite); + fRemoveFromPageCache, formatVersionToWrite, DEFAULT_BCBUILDER); + } + + JournalChannel(File journalDirectory, long logId, + long preAllocSize, int writeBufferSize, int journalAlignSize, + boolean fRemoveFromPageCache, int formatVersionToWrite, + BufferedChannelBuilder bcBuilder) throws IOException { + this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize, + START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder); } /** * Create a journal file. + * Allows injection of BufferedChannelBuilder for testing purposes. * * @param journalDirectory * directory to store the journal file. @@ -128,7 +145,7 @@ class JournalChannel implements Closeable { private JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, int journalAlignSize, long position, boolean fRemoveFromPageCache, - int formatVersionToWrite) throws IOException { + int formatVersionToWrite, BufferedChannelBuilder bcBuilder) throws IOException { this.journalAlignSize = journalAlignSize; this.zeros = ByteBuffer.allocate(journalAlignSize); this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize; @@ -160,7 +177,7 @@ private JournalChannel(File journalDirectory, long logId, bb.clear(); fc.write(bb); - bc = new BufferedChannel(fc, writeBufferSize); + bc = bcBuilder.create(fc, writeBufferSize); forceWrite(true); nextPrealloc = this.preAllocSize; fc.write(zeros, nextPrealloc - journalAlignSize); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index dcfac3147ea..34e32b9499d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -169,4 +169,10 @@ interface LedgerDeletionListener { void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException; ByteBuf getExplicitLac(long ledgerId); + + // for testability + default LedgerStorage getUnderlyingLedgerStorage() { + return this; + } + } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java new file mode 100644 index 00000000000..9fdc34ca7d6 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java @@ -0,0 +1,93 @@ +package org.apache.bookkeeper.bookie; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.util.concurrent.TimeUnit; + +/** + * Strictly for testing. + * Have to be alongside with prod code for Journal to inject in tests. + */ +public class SlowBufferedChannel extends BufferedChannel { + public volatile long getDelay = 0; + public volatile long addDelay = 0; + public volatile long flushDelay = 0; + + public SlowBufferedChannel(FileChannel fc, int capacity) throws IOException { + super(fc, capacity); + } + + public SlowBufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException { + super(fc, writeCapacity, readCapacity); + } + + public void setAddDelay(long delay) { + addDelay = delay; + } + + public void setGetDelay(long delay) { + getDelay = delay; + } + + public void setFlushDelay(long delay) { + flushDelay = delay; + } + + @Override + public synchronized void write(ByteBuf src) throws IOException { + delayMs(addDelay); + super.write(src); + } + + @Override + public void flush() throws IOException { + delayMs(flushDelay); + super.flush(); + } + + @Override + public long forceWrite(boolean forceMetadata) throws IOException { + delayMs(flushDelay); + return super.forceWrite(forceMetadata); + } + + @Override + public synchronized int read(ByteBuf dest, long pos) throws IOException { + delayMs(getDelay); + return super.read(dest, pos); + } + + private static void delayMs(long delay) { + if (delay < 1) { + return; + } + try { + TimeUnit.MILLISECONDS.sleep(delay); + } catch (InterruptedException e) { + //noop + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index 815c65e18e7..5c4f75a22e2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.BookieProtocol; @@ -41,16 +42,23 @@ * entries will be first added into a {@code MemTable}, and then be flushed back to the * {@code InterleavedLedgerStorage} when the {@code MemTable} becomes full. */ -public class SortedLedgerStorage extends InterleavedLedgerStorage - implements LedgerStorage, CacheCallback, SkipListFlusher { +public class SortedLedgerStorage + implements LedgerStorage, CacheCallback, SkipListFlusher, + CompactableLedgerStorage, EntryLogger.EntryLogListener { private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class); EntryMemTable memTable; private ScheduledExecutorService scheduler; private StateManager stateManager; + private final InterleavedLedgerStorage interleavedLedgerStorage; public SortedLedgerStorage() { - super(); + this(new InterleavedLedgerStorage()); + } + + @VisibleForTesting + protected SortedLedgerStorage(InterleavedLedgerStorage ils) { + interleavedLedgerStorage = ils; } @Override @@ -63,7 +71,8 @@ public void initialize(ServerConfiguration conf, Checkpointer checkpointer, StatsLogger statsLogger) throws IOException { - super.initialize( + + interleavedLedgerStorage.initialize( conf, ledgerManager, ledgerDirsManager, @@ -72,6 +81,7 @@ public void initialize(ServerConfiguration conf, checkpointSource, checkpointer, statsLogger); + if (conf.isEntryLogPerLedgerEnabled()) { this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger); } else { @@ -96,7 +106,7 @@ public void start() { } catch (IOException e) { LOG.error("Exception thrown while flushing ledger cache.", e); } - super.start(); + interleavedLedgerStorage.start(); } @Override @@ -111,22 +121,42 @@ public void shutdown() throws InterruptedException { } catch (Exception e) { LOG.error("Error while closing the memtable", e); } - super.shutdown(); + interleavedLedgerStorage.shutdown(); } @Override public boolean ledgerExists(long ledgerId) throws IOException { // Done this way because checking the skip list is an O(logN) operation compared to // the O(1) for the ledgerCache. - if (!super.ledgerExists(ledgerId)) { + if (!interleavedLedgerStorage.ledgerExists(ledgerId)) { EntryKeyValue kv = memTable.getLastEntry(ledgerId); if (null == kv) { - return super.ledgerExists(ledgerId); + return interleavedLedgerStorage.ledgerExists(ledgerId); } } return true; } + @Override + public boolean setFenced(long ledgerId) throws IOException { + return interleavedLedgerStorage.setFenced(ledgerId); + } + + @Override + public boolean isFenced(long ledgerId) throws IOException { + return interleavedLedgerStorage.isFenced(ledgerId); + } + + @Override + public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { + interleavedLedgerStorage.setMasterKey(ledgerId, masterKey); + } + + @Override + public byte[] readMasterKey(long ledgerId) throws IOException, BookieException { + return interleavedLedgerStorage.readMasterKey(ledgerId); + } + @Override public long addEntry(ByteBuf entry) throws IOException { long ledgerId = entry.getLong(entry.readerIndex() + 0); @@ -134,7 +164,7 @@ public long addEntry(ByteBuf entry) throws IOException { long lac = entry.getLong(entry.readerIndex() + 16); memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this); - ledgerCache.updateLastAddConfirmed(ledgerId, lac); + interleavedLedgerStorage.ledgerCache.updateLastAddConfirmed(ledgerId, lac); return entryId; } @@ -149,7 +179,7 @@ private ByteBuf getLastEntryId(long ledgerId) throws IOException { return kv.getValueAsByteBuffer(); } // If it doesn't exist in the skip list, then fallback to the ledger cache+index. - return super.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); + return interleavedLedgerStorage.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); } @Override @@ -159,13 +189,13 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { } ByteBuf buffToRet; try { - buffToRet = super.getEntry(ledgerId, entryId); + buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId); } catch (Bookie.NoEntryException nee) { EntryKeyValue kv = memTable.getEntry(ledgerId, entryId); if (null == kv) { // The entry might have been flushed since we last checked, so query the ledger cache again. // If the entry truly doesn't exist, then this will throw a NoEntryException - buffToRet = super.getEntry(ledgerId, entryId); + buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId); } else { buffToRet = kv.getValueAsByteBuffer(); } @@ -174,23 +204,56 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { return buffToRet; } + @Override + public long getLastAddConfirmed(long ledgerId) throws IOException { + return interleavedLedgerStorage.getLastAddConfirmed(ledgerId); + } + + @Override + public boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher watcher) + throws IOException { + return interleavedLedgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher); + } + @Override public void checkpoint(final Checkpoint checkpoint) throws IOException { long numBytesFlushed = memTable.flush(this, checkpoint); - entryLogger.prepareSortedLedgerStorageCheckpoint(numBytesFlushed); - super.checkpoint(checkpoint); + interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed); + interleavedLedgerStorage.checkpoint(checkpoint); + } + + @Override + public void deleteLedger(long ledgerId) throws IOException { + interleavedLedgerStorage.deleteLedger(ledgerId); + } + + @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + interleavedLedgerStorage.registerLedgerDeletionListener(listener); + } + + @Override + public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException { + interleavedLedgerStorage.setExplicitlac(ledgerId, lac); + } + + @Override + public ByteBuf getExplicitLac(long ledgerId) { + return interleavedLedgerStorage.getExplicitLac(ledgerId); } @Override public void process(long ledgerId, long entryId, ByteBuf buffer) throws IOException { - processEntry(ledgerId, entryId, buffer, false); + interleavedLedgerStorage.processEntry(ledgerId, entryId, buffer, false); } @Override public void flush() throws IOException { memTable.flush(this, Checkpoint.MAX); - super.flush(); + interleavedLedgerStorage.flush(); } // CacheCallback functions. @@ -212,10 +275,10 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); - entryLogger.prepareEntryMemTableFlush(); + interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush(); memTable.flush(SortedLedgerStorage.this); - if (entryLogger.commitEntryMemTableFlush()) { - checkpointer.startCheckpoint(cp); + if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) { + interleavedLedgerStorage.checkpointer.startCheckpoint(cp); } } catch (Exception e) { stateManager.transitionToReadOnlyMode(); @@ -237,4 +300,28 @@ BookieStateManager getStateManager(){ return (BookieStateManager) stateManager; } + @Override + public EntryLogger getEntryLogger() { + return interleavedLedgerStorage.getEntryLogger(); + } + + @Override + public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException { + return interleavedLedgerStorage.getActiveLedgersInRange(firstLedgerId, lastLedgerId); + } + + @Override + public void updateEntriesLocations(Iterable locations) throws IOException { + interleavedLedgerStorage.updateEntriesLocations(locations); + } + + @Override + public void flushEntriesLocationsIndex() throws IOException { + interleavedLedgerStorage.flushEntriesLocationsIndex(); + } + + @Override + public LedgerStorage getUnderlyingLedgerStorage() { + return interleavedLedgerStorage; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index d51e93da3a1..fb1b6355e1a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -81,6 +81,9 @@ public class ServerConfiguration extends AbstractConfiguration (Integer.MAX_VALUE - 1) / 2) { + // gives max of 2*1023MB for mem table (one being checkpointed and still writable). + throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2)); + } setProperty(SKIP_LIST_SIZE_LIMIT, size); return this; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java index 2c8cf7af36d..8b328ef7db0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java @@ -41,5 +41,4 @@ public interface RequestProcessor extends AutoCloseable { * channel received the given request r */ void processRequest(Object r, Channel channel); - } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 2aebbb95535..078d6560d86 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -22,6 +22,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER; @@ -29,9 +32,12 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST; @@ -45,6 +51,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST; import static org.apache.bookkeeper.proto.RequestUtils.hasFlag; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; @@ -56,17 +63,21 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.AccessLevel; import lombok.Getter; import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.auth.AuthToken; import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.common.util.MathUtils; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.tls.SecurityException; @@ -150,6 +161,18 @@ public class BookieRequestProcessor implements RequestProcessor { final OpStatsLogger getBookieInfoRequestStats; final OpStatsLogger getBookieInfoStats; final OpStatsLogger channelWriteStats; + final OpStatsLogger addEntryBlockedStats; + final OpStatsLogger readEntryBlockedStats; + + final AtomicInteger addsInProgress = new AtomicInteger(0); + final AtomicInteger maxAddsInProgress = new AtomicInteger(0); + final AtomicInteger addsBlocked = new AtomicInteger(0); + final AtomicInteger readsInProgress = new AtomicInteger(0); + final AtomicInteger readsBlocked = new AtomicInteger(0); + final AtomicInteger maxReadsInProgress = new AtomicInteger(0); + + final Semaphore addsSemaphore; + final Semaphore readsSemaphore; public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException { @@ -214,6 +237,122 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO); this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST); this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE); + + this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT); + this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT); + + int maxAdds = serverCfg.getMaxAddsInProgress(); + addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null; + + int maxReads = serverCfg.getMaxReadsInProgress(); + readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null; + + statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return addsInProgress; + } + }); + + statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return addsBlocked; + } + }); + + statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return readsInProgress; + } + }); + + statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return readsBlocked; + } + }); + + } + + protected void onAddRequestStart(Channel channel) { + if (addsSemaphore != null) { + if (!addsSemaphore.tryAcquire()) { + final long throttlingStartTimeNanos = MathUtils.nowInNano(); + channel.config().setAutoRead(false); + addsBlocked.incrementAndGet(); + addsSemaphore.acquireUninterruptibly(); + channel.config().setAutoRead(true); + addEntryBlockedStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos), + TimeUnit.NANOSECONDS); + addsBlocked.decrementAndGet(); + } + } + final int curr = addsInProgress.incrementAndGet(); + maxAddsInProgress.accumulateAndGet(curr, Integer::max); + } + + protected void onAddRequestFinish() { + addsInProgress.decrementAndGet(); + if (addsSemaphore != null) { + addsSemaphore.release(); + } + } + + protected void onReadRequestStart(Channel channel) { + if (readsSemaphore != null) { + if (!readsSemaphore.tryAcquire()) { + final long throttlingStartTimeNanos = MathUtils.nowInNano(); + channel.config().setAutoRead(false); + readsBlocked.incrementAndGet(); + readsSemaphore.acquireUninterruptibly(); + channel.config().setAutoRead(true); + readEntryBlockedStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos), + TimeUnit.NANOSECONDS); + readsBlocked.decrementAndGet(); + } + } + final int curr = readsInProgress.incrementAndGet(); + maxReadsInProgress.accumulateAndGet(curr, Integer::max); + } + + protected void onReadRequestFinish() { + readsInProgress.decrementAndGet(); + if (readsSemaphore != null) { + readsSemaphore.release(); + } + } + + @VisibleForTesting + int maxAddsInProgressCount() { + return maxAddsInProgress.get(); + } + + @VisibleForTesting + int maxReadsInProgressCount() { + return maxReadsInProgress.get(); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java index c0214d338b2..a4685bde913 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java @@ -29,6 +29,8 @@ import java.net.UnknownHostException; import java.security.AccessControlException; import java.util.Arrays; +import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieCriticalThread; import org.apache.bookkeeper.bookie.BookieException; @@ -127,6 +129,10 @@ public void start() throws IOException, UnavailableException, InterruptedExcepti running = true; deathWatcher = new DeathWatcher(conf); deathWatcher.start(); + + // fixes test flappers at random places until ISSUE#1400 is resolved + // https://github.com/apache/bookkeeper/issues/1400 + TimeUnit.MILLISECONDS.sleep(100); } @VisibleForTesting @@ -139,6 +145,11 @@ public Bookie getBookie() { return bookie; } + @VisibleForTesting + public BookieRequestProcessor getBookieRequestProcessor() { + return (BookieRequestProcessor) requestProcessor; + } + /** * Suspend processing of requests in the bookie (for testing). */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java index fc55e4a77d6..d964957488b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java @@ -54,7 +54,7 @@ private GetBookieInfoResponse getGetBookieInfoResponse() { if (!isVersionCompatible()) { getBookieInfoResponse.setStatus(StatusCode.EBADVERSION); - requestProcessor.getBookieInfoStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), + requestProcessor.getGetBookieInfoStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); return getBookieInfoResponse.build(); } @@ -66,11 +66,11 @@ private GetBookieInfoResponse getGetBookieInfoResponse() { long freeDiskSpace = 0L, totalDiskSpace = 0L; try { if ((requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) { - freeDiskSpace = requestProcessor.bookie.getTotalFreeSpace(); + freeDiskSpace = requestProcessor.getBookie().getTotalFreeSpace(); getBookieInfoResponse.setFreeDiskSpace(freeDiskSpace); } if ((requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) { - totalDiskSpace = requestProcessor.bookie.getTotalDiskSpace(); + totalDiskSpace = requestProcessor.getBookie().getTotalDiskSpace(); getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace); } LOG.debug("FreeDiskSpace info is " + freeDiskSpace + " totalDiskSpace is: " + totalDiskSpace); @@ -80,7 +80,7 @@ private GetBookieInfoResponse getGetBookieInfoResponse() { } getBookieInfoResponse.setStatus(status); - requestProcessor.getBookieInfoStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), + requestProcessor.getGetBookieInfoStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); return getBookieInfoResponse.build(); } @@ -98,6 +98,6 @@ private void sendResponse(GetBookieInfoResponse getBookieInfoResponse) { .setGetBookieInfoResponse(getBookieInfoResponse); sendResponse(response.getStatus(), response.build(), - requestProcessor.getBookieInfoRequestStats); + requestProcessor.getGetBookieInfoRequestStats()); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index 3d2a1eafea9..12c02016d91 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -65,6 +65,8 @@ public ReadEntryProcessorV3(Request request, BookieRequestProcessor requestProcessor, ExecutorService fenceThreadPool) { super(request, channel, requestProcessor); + requestProcessor.onReadRequestStart(channel); + this.readRequest = request.getReadRequest(); this.ledgerId = readRequest.getLedgerId(); this.entryId = readRequest.getEntryId(); @@ -313,6 +315,7 @@ protected void sendResponse(ReadResponse readResponse) { sendResponse(response.getStatus(), response.build(), reqStats); + requestProcessor.onReadRequestFinish(); } // diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index fe68da42a19..7747e5c0e1a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -37,6 +37,7 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; +import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 { public WriteEntryProcessorV3(Request request, Channel channel, BookieRequestProcessor requestProcessor) { super(request, channel, requestProcessor); + requestProcessor.onAddRequestStart(channel); } // Returns null if there is no exception thrown @@ -173,6 +175,12 @@ public void safeRun() { } } + @Override + protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) { + super.sendResponse(code, response, statsLogger); + requestProcessor.onAddRequestFinish(); + } + /** * this toString method filters out body and masterKey from the output. * masterKey contains the password of the ledger and body is customer data, diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java index f5e73a1679d..fbfe7c96264 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java @@ -303,7 +303,8 @@ public void testAddEntryFailureOnDiskFull() throws Exception { Bookie bookie = new Bookie(conf); EntryLogger entryLogger = new EntryLogger(conf, bookie.getLedgerDirsManager()); - InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); + InterleavedLedgerStorage ledgerStorage = + ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()); ledgerStorage.entryLogger = entryLogger; // Create ledgers ledgerStorage.setMasterKey(1, "key".getBytes()); @@ -675,7 +676,7 @@ public void testConcurrentWriteAndReadCalls(String ledgerStorageClass, boolean e conf.setLedgerStorageClass(ledgerStorageClass); conf.setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled); Bookie bookie = new Bookie(conf); - InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); + CompactableLedgerStorage ledgerStorage = (CompactableLedgerStorage) bookie.ledgerStorage; Random rand = new Random(0); if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName())) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index ae3b4cdccab..1a01299a6cd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -89,7 +89,7 @@ public void setUp() throws Exception { bookie = new Bookie(conf); activeLedgers = new SnapshotMap(); - ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache; + ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()).ledgerCache; } @After @@ -116,8 +116,8 @@ private void newLedgerCache() throws IOException { if (ledgerCache != null) { ledgerCache.close(); } - ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache = new LedgerCacheImpl( - conf, activeLedgers, bookie.getIndexDirsManager()); + ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()) + .ledgerCache = new LedgerCacheImpl(conf, activeLedgers, bookie.getIndexDirsManager()); flushThread = new Thread() { public void run() { while (true) { @@ -275,7 +275,8 @@ public void testLedgerCacheFlushFailureOnDiskFull() throws Exception { conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() }); Bookie bookie = new Bookie(conf); - InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage); + InterleavedLedgerStorage ledgerStorage = + ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()); LedgerCacheImpl ledgerCache = (LedgerCacheImpl) ledgerStorage.ledgerCache; // Create ledger index file ledgerStorage.setMasterKey(1, "key".getBytes()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java new file mode 100644 index 00000000000..c3a8f6359eb --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java @@ -0,0 +1,140 @@ +package org.apache.bookkeeper.bookie; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import io.netty.buffer.ByteBuf; +import java.io.IOException; + +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * Strictly for testing. + * have to be in org.apache.bookkeeper.bookie to not introduce changes to InterleavedLedgerStorage + */ +public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage { + + public static final String PROP_SLOW_STORAGE_FLUSH_DELAY = "test.slowStorage.flushDelay"; + public static final String PROP_SLOW_STORAGE_ADD_DELAY = "test.slowStorage.addDelay"; + public static final String PROP_SLOW_STORAGE_GET_DELAY = "test.slowStorage.getDelay"; + + /** + * Strictly for testing. + */ + public static class SlowEntryLogger extends EntryLogger { + public volatile long getDelay = 0; + public volatile long addDelay = 0; + public volatile long flushDelay = 0; + + public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener) + throws IOException { + super(conf, ledgerDirsManager, listener); + } + + public SlowEntryLogger setAddDelay(long delay) { + addDelay = delay; + return this; + } + + public SlowEntryLogger setGetDelay(long delay) { + getDelay = delay; + return this; + } + + public SlowEntryLogger setFlushDelay(long delay) { + flushDelay = delay; + return this; + } + + @Override + public void flush() throws IOException { + delayMs(flushDelay); + super.flush(); + } + + @Override + public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { + delayMs(addDelay); + return super.addEntry(ledger, entry, rollLog); + } + + @Override + public ByteBuf readEntry(long ledgerId, long entryId, long location) + throws IOException, Bookie.NoEntryException { + delayMs(getDelay); + return super.readEntry(ledgerId, entryId, location); + } + + private static void delayMs(long delay) { + if (delay < 1) { + return; + } + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + //noop + } + } + + } + + public SlowInterleavedLedgerStorage() { + super(); + } + + @Override + public void initialize(ServerConfiguration conf, + LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, + LedgerDirsManager indexDirsManager, + StateManager stateManager, + CheckpointSource checkpointSource, + Checkpointer checkpointer, + StatsLogger statsLogger) + throws IOException { + super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, + stateManager, checkpointSource, checkpointer, statsLogger); + // do not want to add these to config class, reading throw "raw" interface + long getDelay = conf.getLong(PROP_SLOW_STORAGE_GET_DELAY, 0); + long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0); + long flushDelay = conf.getLong(PROP_SLOW_STORAGE_FLUSH_DELAY, 0); + + entryLogger = new SlowEntryLogger(conf, ledgerDirsManager, this) + .setAddDelay(addDelay) + .setGetDelay(getDelay) + .setFlushDelay(flushDelay); + } + + public void setAddDelay(long delay) { + ((SlowEntryLogger) entryLogger).setAddDelay(delay); + } + + public void setGetDelay(long delay) { + ((SlowEntryLogger) entryLogger).setGetDelay(delay); + } + + public void setFlushDelay(long delay) { + ((SlowEntryLogger) entryLogger).setFlushDelay(delay); + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java new file mode 100644 index 00000000000..e12976d04e3 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java @@ -0,0 +1,36 @@ +package org.apache.bookkeeper.bookie; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * Strictly for unit testing. + */ +public class SlowSortedLedgerStorage extends SortedLedgerStorage { + + public SlowSortedLedgerStorage() { + this(new SlowInterleavedLedgerStorage()); + } + + SlowSortedLedgerStorage(InterleavedLedgerStorage ils) { + super(ils); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java index f3ef1086ca6..322cdd08990 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java @@ -225,7 +225,7 @@ public void testCheckpointAfterEntryLogRotated() throws Exception { EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger() .getEntryLogManager(); entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID); - long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId(); + long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId(); long currentLogId = entryLogManager.getCurrentLogId(); log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId); @@ -244,8 +244,8 @@ public void testCheckpointAfterEntryLogRotated() throws Exception { assertEquals(0, storage.memTable.kvmap.size()); assertTrue( "current log " + currentLogId + " contains entries added from memtable should be forced to disk" - + " but least unflushed log is " + storage.entryLogger.getLeastUnflushedLogId(), - storage.entryLogger.getLeastUnflushedLogId() > currentLogId); + + " but least unflushed log is " + storage.getEntryLogger().getLeastUnflushedLogId(), + storage.getEntryLogger().getLeastUnflushedLogId() > currentLogId); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java new file mode 100644 index 00000000000..a971ae88032 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java @@ -0,0 +1,410 @@ +package org.apache.bookkeeper.proto; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import static org.junit.Assert.assertTrue; + +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.SlowInterleavedLedgerStorage; +import org.apache.bookkeeper.bookie.SlowSortedLedgerStorage; +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; + +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Tests for backpressure handling on the server side. + */ +public class BookieBackpressureTest extends BookKeeperClusterTestCase + implements AddCallback, ReadCallback, ReadLastConfirmedCallback { + + private static final Logger LOG = LoggerFactory.getLogger(BookieBackpressureTest.class); + + byte[] ledgerPassword = "aaa".getBytes(); + + final byte[] data = new byte[8 * 1024]; + + // test related variables + static final int NUM_ENTRIES_TO_WRITE = 500; + static final int MAX_PENDING = 10; + static final int NUM_OF_LEDGERS = 25; + + DigestType digestType; + + public BookieBackpressureTest() { + super(1); + this.digestType = DigestType.CRC32; + + baseClientConf.setAddEntryTimeout(100); + } + + class SyncObj { + long lastConfirmed; + volatile int counter; + boolean value; + AtomicInteger rc = new AtomicInteger(BKException.Code.OK); + Enumeration ls = null; + + public SyncObj() { + counter = 0; + lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; + value = false; + } + + void setReturnCode(int rc) { + this.rc.compareAndSet(BKException.Code.OK, rc); + } + + void setLedgerEntries(Enumeration ls) { + this.ls = ls; + } + } + + @Test + public void testWriteNoBackpressureSlowJournal() throws Exception { + //disable backpressure for writes + bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setProperty(Bookie.PROP_SLOW_JOURNAL_ADD_DELAY, "1"); + + doWritesNoBackpressure(0); + } + + @Test + public void testWriteNoBackpressureSlowJournalFlush() throws Exception { + //disable backpressure for writes + bsConfs.get(0).setMaxAddsInProgress(0); + // to increase frequency of flushes + bsConfs.get(0).setJournalAdaptiveGroupWrites(false); + bsConfs.get(0).setProperty(Bookie.PROP_SLOW_JOURNAL_FLUSH_DELAY, "1"); + + doWritesNoBackpressure(0); + } + + @Test + public void testWriteWithBackpressureSlowJournal() throws Exception { + //enable backpressure with MAX_PENDING writes in progress + bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setProperty(Bookie.PROP_SLOW_JOURNAL_FLUSH_DELAY, "1"); + + doWritesWithBackpressure(0); + } + + + @Test + public void testWriteWithBackpressureSlowJournalFlush() throws Exception { + //enable backpressure with MAX_PENDING writes in progress + bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + // to increase frequency of flushes + bsConfs.get(0).setJournalAdaptiveGroupWrites(false); + bsConfs.get(0).setProperty(Bookie.PROP_SLOW_JOURNAL_FLUSH_DELAY, "1"); + + doWritesWithBackpressure(0); + } + + @Test + public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception { + //disable backpressure for writes + bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); + bsConfs.get(0).setWriteBufferBytes(data.length); + + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1"); + + doWritesNoBackpressure(0); + } + + @Test + public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception { + //enable backpressure with MAX_PENDING writes in progress + bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); + bsConfs.get(0).setWriteBufferBytes(data.length); + + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1"); + + doWritesWithBackpressure(0); + } + + @Test + public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exception { + //disable backpressure for writes + bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); + bsConfs.get(0).setWriteBufferBytes(data.length); + + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10"); + + doWritesNoBackpressure(0); + } + + @Test + public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Exception { + //enable backpressure with MAX_PENDING writes in progress + bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); + bsConfs.get(0).setWriteBufferBytes(data.length); + + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10"); + + doWritesWithBackpressure(0); + } + + @Test + public void testWriteNoBackpressureSortedStorage() throws Exception { + //disable backpressure for writes + bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName()); + bsConfs.get(0).setWriteBufferBytes(data.length); + + final int entriesInMemtable = 3; + // one for memtable being flushed, one for the part accepting the data + assertTrue("for the test, memtable should not keep more entries than allowed", + entriesInMemtable * 2 <= MAX_PENDING); + bsConfs.get(0).setSkipListSizeLimit(data.length * entriesInMemtable); + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1"); + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10"); + + doWritesNoBackpressure(0); + } + + @Test + public void testWriteWithBackpressureSortedStorage() throws Exception { + //enable backpressure with MAX_PENDING writes in progress + bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName()); + bsConfs.get(0).setWriteBufferBytes(data.length); + + final int entriesInMemtable = 3; + // one for memtable being flushed, one for the part accepting the data + assertTrue("for the test, memtable should not keep more entries than allowed", + entriesInMemtable * 2 <= MAX_PENDING); + bsConfs.get(0).setSkipListSizeLimit(data.length * entriesInMemtable); + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1"); + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10"); + + doWritesWithBackpressure(0); + } + + @Test + public void testReadsNoBackpressure() throws Exception { + //disable backpressure for reads + bsConfs.get(0).setMaxReadsInProgress(0); + bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); + bsConfs.get(0).setWriteBufferBytes(data.length); + + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1"); + + final BookieRequestProcessor brp = generateDataAndDoReads(0); + + Assert.assertThat("reads in progress should exceed MAX_PENDING", + brp.maxReadsInProgressCount(), Matchers.greaterThan(MAX_PENDING)); + } + + @Test + public void testReadsWithBackpressure() throws Exception { + //enable backpressure for reads + bsConfs.get(0).setMaxReadsInProgress(MAX_PENDING); + bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); + bsConfs.get(0).setWriteBufferBytes(data.length); + + bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1"); + + final BookieRequestProcessor brp = generateDataAndDoReads(0); + + Assert.assertThat("reads in progress should NOT exceed MAX_PENDING ", + brp.maxReadsInProgressCount(), Matchers.lessThanOrEqualTo(MAX_PENDING)); + } + + private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exception { + BookieServer bks = bs.get(bkId); + bks.shutdown(); + bks = new BookieServer(bsConfs.get(bkId)); + bks.start(); + bs.set(bkId, bks); + + // Create ledgers + final int numEntriesForReads = 10; + LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS]; + for (int i = 0; i < NUM_OF_LEDGERS; i++) { + lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword); + LOG.info("created ledger ID: {}", lhs[i].getId()); + } + + // generate data for reads + final CountDownLatch writesCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS); + for (int i = 0; i < numEntriesForReads; i++) { + for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) { + lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> writesCompleteLatch.countDown(), null); + } + } + writesCompleteLatch.await(); + + // issue bunch of async reads + // generate data for reads + final CountDownLatch readsCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS); + for (int i = 0; i < numEntriesForReads; i++) { + for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) { + lhs[ledger].asyncReadEntries(i, i, (rc, lh, seq, ctx) -> readsCompleteLatch.countDown(), null); + } + } + readsCompleteLatch.await(); + + return bks.getBookieRequestProcessor(); + } + + // here we expect that backpressure is disabled and number of writes in progress + // will exceed the limit + private void doWritesNoBackpressure(final int bkId) throws Exception { + BookieServer bks = bs.get(bkId); + bks.shutdown(); + bks = new BookieServer(bsConfs.get(bkId)); + bks.start(); + bs.set(bkId, bks); + + // Create ledgers + LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS]; + for (int i = 0; i < NUM_OF_LEDGERS; i++) { + lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword); + LOG.info("created ledger ID: {}", lhs[i].getId()); + } + + final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS); + + for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) { + for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) { + lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> completeLatch.countDown(), null); + } + } + + boolean exceededLimit = false; + BookieRequestProcessor brp = bks.getBookieRequestProcessor(); + while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) { + int val = brp.maxAddsInProgressCount(); + if (val > MAX_PENDING) { + exceededLimit = true; + break; + } + } + + assertTrue("expected to exceed number of pending writes", exceededLimit); + + for (int i = 0; i < NUM_OF_LEDGERS; i++) { + lhs[i].close(); + } + } + + // here we expect that backpressure is enabled and number of writes in progress + // will never exceed the limit + private void doWritesWithBackpressure(final int bkId) throws Exception { + BookieServer bks = bs.get(bkId); + bks.shutdown(); + bks = new BookieServer(bsConfs.get(bkId)); + bks.start(); + bs.set(bkId, bks); + + LOG.info("test restarted bookie; starting writes"); + + // Create ledgers + LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS]; + for (int i = 0; i < NUM_OF_LEDGERS; i++) { + lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword); + LOG.info("created ledger ID: {}", lhs[i].getId()); + } + + final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS); + final AtomicInteger rc = new AtomicInteger(BKException.Code.OK); + + for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) { + for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) { + lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> { + rc.compareAndSet(BKException.Code.OK, rc2); + completeLatch.countDown(); + }, null); + } + } + + LOG.info("test submitted all writes"); + BookieRequestProcessor brp = bks.getBookieRequestProcessor(); + while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) { + int val = brp.maxAddsInProgressCount(); + assertTrue("writes in progress should not exceed limit, got " + val, val <= MAX_PENDING); + } + + if (rc.get() != BKException.Code.OK) { + throw BKException.create(rc.get()); + } + + for (int i = 0; i < NUM_OF_LEDGERS; i++) { + lhs[i].close(); + } + } + + + @Override + public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + SyncObj sync = (SyncObj) ctx; + sync.setReturnCode(rc); + synchronized (sync) { + sync.counter++; + sync.notify(); + } + } + + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object ctx) { + SyncObj sync = (SyncObj) ctx; + sync.setLedgerEntries(seq); + sync.setReturnCode(rc); + synchronized (sync) { + sync.value = true; + sync.notify(); + } + } + + @Override + public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) { + SyncObj sync = (SyncObj) ctx; + sync.setReturnCode(rc); + synchronized (sync) { + sync.lastConfirmed = lastConfirmed; + sync.notify(); + } + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java index 354fcaf4b4b..4ebe01cfccb 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java @@ -47,6 +47,8 @@ */ public class TestBookieRequestProcessor { + final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class); + @Test public void testConstructLongPollThreads() throws Exception { // long poll threads == read threads @@ -134,7 +136,7 @@ public void testToString() { .setBody(ByteString.copyFrom("entrydata".getBytes())).build(); Request request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build(); - WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, null); + WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor); String toString = writeEntryProcessorV3.toString(); assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body")); assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey", @@ -152,7 +154,7 @@ public void testToString() { .setBody(ByteString.copyFrom("entrydata".getBytes())).setFlag(Flag.RECOVERY_ADD).setWriteFlags(0) .build(); request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build(); - writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, null); + writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor); toString = writeEntryProcessorV3.toString(); assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body")); assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey", @@ -188,7 +190,7 @@ public void testToString() { .setMasterKey(ByteString.copyFrom("masterKey".getBytes())) .setBody(ByteString.copyFrom("entrydata".getBytes())).build(); request = Request.newBuilder().setHeader(header).setWriteLacRequest(writeLacRequest).build(); - WriteLacProcessorV3 writeLacProcessorV3 = new WriteLacProcessorV3(request, null, null); + WriteLacProcessorV3 writeLacProcessorV3 = new WriteLacProcessorV3(request, null, requestProcessor); toString = writeLacProcessorV3.toString(); assertFalse("writeLacProcessorV3's toString should have filtered out body", toString.contains("body")); assertFalse("writeLacProcessorV3's toString should have filtered out masterKey", diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml index 7bed0b74335..83a39f46ca9 100644 --- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml +++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml @@ -61,6 +61,12 @@ + + + + + + From fbca2ebf35f5e8b579e153102e26b31a3de0ba45 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 15 May 2018 17:40:20 -0700 Subject: [PATCH 2/7] (@bug W-3651831@) backpressure: server-side backpressure, added handling of non-responsive client --- .../conf/AbstractConfiguration.java | 31 ++++++++ .../bookkeeper/conf/ClientConfiguration.java | 29 -------- .../bookkeeper/conf/ServerConfiguration.java | 71 ++++++++++++++++++- .../bookkeeper/proto/BookieNettyServer.java | 5 ++ .../proto/BookieRequestProcessor.java | 53 ++++++++++++++ .../proto/PacketProcessorBaseV3.java | 36 +++++++++- .../proto/PerChannelBookieClient.java | 18 ++--- .../apache/bookkeeper/util/StringUtils.java | 18 +++++ .../proto/ForceLedgerProcessorV3Test.java | 1 + .../proto/WriteEntryProcessorV3Test.java | 40 +++++++++++ 10 files changed, 257 insertions(+), 45 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java index 9d6cb50e71d..b44fbc976fe 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java @@ -132,6 +132,9 @@ public abstract class AbstractConfiguration protected static final String NETTY_MAX_FRAME_SIZE = "nettyMaxFrameSizeBytes"; protected static final int DEFAULT_NETTY_MAX_FRAME_SIZE = 5 * 1024 * 1024; // 5MB + // backpressure configuration + protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs"; + // Zookeeper ACL settings protected static final String ZK_ENABLE_SECURITY = "zkEnableSecurity"; @@ -668,6 +671,34 @@ public T setNettyMaxFrameSizeBytes(int maxSize) { return getThis(); } + /** + * Timeout controlling wait on request send in case of unresponsive bookie(s) + * (i.e. bookie in long GC etc.) + * + * @return timeout value + * negative value disables the feature + * 0 to allow request to fail immediately + * Default is -1 (disabled) + */ + public long getWaitTimeoutOnBackpressureMillis() { + return getLong(WAIT_TIMEOUT_ON_BACKPRESSURE, -1); + } + + /** + * Timeout controlling wait on request send in case of unresponsive bookie(s) + * (i.e. bookie in long GC etc.) + * + * @param value + * negative value disables the feature + * 0 to allow request to fail immediately + * Default is -1 (disabled) + * @return client configuration. + */ + public T setWaitTimeoutOnBackpressureMillis(long value) { + setProperty(WAIT_TIMEOUT_ON_BACKPRESSURE, value); + return getThis(); + } + /** * Get the security provider factory class name. If this returns null, no security will be enforced on the channel. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 7b64d1b3901..d6ece359a6c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -128,7 +128,6 @@ public class ClientConfiguration extends AbstractConfiguration= 0 otherwise ignored. + * + * @return value indicating if channel should be closed. + */ + public boolean getCloseChannelOnResponseTimeout(){ + return this.getBoolean(CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT, false); + } + + /** + * Configures action in case if server timed out sending response to the client. + * true == close the channel and drop response + * false == drop response + * Requires waitTimeoutOnBackpressureMs >= 0 otherwise ignored. + * + * @param value + * @return server configuration. + */ + public ServerConfiguration setCloseChannelOnResponseTimeout(boolean value) { + this.setProperty(CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT, value); + return this; + } /** * Get bookie port that bookie server listen on. * @@ -2799,6 +2827,47 @@ public ServerConfiguration setIgnoreExtraServerComponentsStartupFailures(boolean return this; } + /** + * Get server netty channel write buffer low water mark. + * + * @return netty channel write buffer low water mark. + */ + public int getServerWriteBufferLowWaterMark() { + return getInt(SERVER_WRITEBUFFER_LOW_WATER_MARK, 384 * 1024); + } + + /** + * Set server netty channel write buffer low water mark. + * + * @param waterMark + * netty channel write buffer low water mark. + * @return client configuration. + */ + public ServerConfiguration setServerWriteBufferLowWaterMark(int waterMark) { + setProperty(SERVER_WRITEBUFFER_LOW_WATER_MARK, waterMark); + return this; + } + + /** + * Get server netty channel write buffer high water mark. + * + * @return netty channel write buffer high water mark. + */ + public int getServerWriteBufferHighWaterMark() { + return getInt(SERVER_WRITEBUFFER_HIGH_WATER_MARK, 512 * 1024); + } + + /** + * Set server netty channel write buffer high water mark. + * + * @param waterMark + * netty channel write buffer high water mark. + * @return client configuration. + */ + public ServerConfiguration setServerWriteBufferHighWaterMark(int waterMark) { + setProperty(SERVER_WRITEBUFFER_HIGH_WATER_MARK, waterMark); + return this; + } /** * Set registration manager class. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java index bc303b92dc9..d687a5c98af 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java @@ -35,6 +35,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; @@ -284,6 +285,8 @@ private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddre bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(), conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax())); + bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( + conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark())); if (eventLoopGroup instanceof EpollEventLoopGroup) { bootstrap.channel(EpollServerSocketChannel.class); @@ -343,6 +346,8 @@ protected void initChannel(SocketChannel ch) throws Exception { jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(), conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax())); + jvmBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( + conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark())); if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) { jvmBootstrap.channel(LocalServerChannel.class); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 078d6560d86..dd762b8224c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -52,6 +52,8 @@ import static org.apache.bookkeeper.proto.RequestUtils.hasFlag; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; @@ -61,11 +63,13 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import lombok.AccessLevel; import lombok.Getter; @@ -99,6 +103,7 @@ public class BookieRequestProcessor implements RequestProcessor { * worker threads. */ private final ServerConfiguration serverCfg; + private final long waitTimeoutOnBackpressureMillis; /** * This is the Bookie instance that is used to handle all read and write requests. @@ -174,9 +179,14 @@ public class BookieRequestProcessor implements RequestProcessor { final Semaphore addsSemaphore; final Semaphore readsSemaphore; + // to temporary blacklist channels + final Optional> blacklistedChannels; + final Consumer onResponseTimeout; + public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException { this.serverCfg = serverCfg; + this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnBackpressureMillis(); this.bookie = bookie; this.readThreadPool = createExecutor( this.serverCfg.getNumReadWorkerThreads(), @@ -213,6 +223,25 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, shFactory.init(NodeType.Server, serverCfg); } + if (waitTimeoutOnBackpressureMillis > 0) { + blacklistedChannels = Optional.of(CacheBuilder.newBuilder() + .expireAfterWrite(waitTimeoutOnBackpressureMillis, TimeUnit.MILLISECONDS) + .build()); + } else { + blacklistedChannels = Optional.empty(); + } + + if (serverCfg.getCloseChannelOnResponseTimeout()) { + onResponseTimeout = (ch) -> { + LOG.warn("closing channel {} because it was non-writable for longer than {} ms", + ch, waitTimeoutOnBackpressureMillis); + ch.close(); + }; + } else { + // noop + onResponseTimeout = (ch) -> {}; + } + // Expose Stats this.statsEnabled = serverCfg.isStatisticsEnabled(); this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY); @@ -711,4 +740,28 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Channe } } } + + public long getWaitTimeoutOnBackpressureMillis() { + return waitTimeoutOnBackpressureMillis; + } + + public void blacklistChannel(Channel channel) { + blacklistedChannels + .ifPresent(x -> x.put(channel, true)); + } + + public void invalidateBlacklist(Channel channel) { + blacklistedChannels + .ifPresent(x -> x.invalidate(channel)); + } + + public boolean isBlacklisted(Channel channel) { + return blacklistedChannels + .map(x -> x.getIfPresent(channel)) + .orElse(false); + } + + public void handleNonWritableChannel(Channel channel) { + onResponseTimeout.accept(channel); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java index 9cd9fd5e1b1..7dc29a38bf1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java @@ -32,6 +32,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.bookkeeper.util.StringUtils; /** * A base class for bookkeeper protocol v3 packet processors. @@ -53,6 +54,40 @@ public PacketProcessorBaseV3(Request request, Channel channel, protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) { final long writeNanos = MathUtils.nowInNano(); + + final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis(); + if (timeOut >= 0 && !channel.isWritable()) { + if (!requestProcessor.isBlacklisted(channel)) { + synchronized (channel) { + if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) { + final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut); + while (!channel.isWritable() && MathUtils.nowInNano() < waitUntilNanos) { + try { + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + if (!channel.isWritable()) { + requestProcessor.blacklistChannel(channel); + requestProcessor.handleNonWritableChannel(channel); + } + } + } + } + + if (!channel.isWritable()) { + LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel, + StringUtils.requestToString(request)); + requestProcessor.getChannelWriteStats() + .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS); + statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); + return; + } else { + requestProcessor.invalidateBlacklist(channel); + } + } + channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -71,7 +106,6 @@ public void operationComplete(ChannelFuture future) throws Exception { } } }); - } protected boolean isVersionCompatible() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 4d69acf850c..39436d43870 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -126,6 +126,7 @@ import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.bookkeeper.util.StringUtils; import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -939,7 +940,7 @@ private void writeAndFlush(final Channel channel, final Object request, final boolean allowFastFail) { if (channel == null) { - LOG.warn("Operation {} failed: channel == null", requestToString(request)); + LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request)); errorOut(key); return; } @@ -952,7 +953,7 @@ private void writeAndFlush(final Channel channel, if (allowFastFail && !isWritable) { LOG.warn("Operation {} failed: TooManyRequestsException", - requestToString(request)); + StringUtils.requestToString(request)); errorOut(key, BKException.Code.TooManyRequestsException); return; @@ -975,22 +976,11 @@ private void writeAndFlush(final Channel channel, channel.writeAndFlush(request, promise); } catch (Throwable e) { - LOG.warn("Operation {} failed", requestToString(request), e); + LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); errorOut(key); } } - private static String requestToString(Object request) { - if (request instanceof BookkeeperProtocol.Request) { - BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader(); - return String.format("Req(txnId=%d,op=%s,version=%s)", - header.getTxnId(), header.getOperation(), - header.getVersion()); - } else { - return request.toString(); - } - } - void errorOut(final CompletionKey key) { if (LOG.isDebugEnabled()) { LOG.debug("Removing completion key: {}", key); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java index f55edd155cd..73bf0187c30 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java @@ -20,6 +20,8 @@ import java.io.IOException; +import org.apache.bookkeeper.proto.BookkeeperProtocol; + /** * Provided utilites for parsing network addresses, ledger-id from node paths * etc. @@ -161,4 +163,20 @@ public static long stringToHierarchicalLedgerId(String...levelNodes) throws IOEx } } + /** + * Builds string representation of teh request without extra (i.e. binary) data + * + * @param request + * @return string representation of request + */ + public static String requestToString(Object request) { + if (request instanceof BookkeeperProtocol.Request) { + BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader(); + return String.format("Req(txnId=%d,op=%s,version=%s)", + header.getTxnId(), header.getOperation(), + header.getVersion()); + } else { + return request.toString(); + } + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java index 90a51c89e54..37d4647343d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java @@ -74,6 +74,7 @@ public void setup() { bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); + when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L); when(requestProcessor.getForceLedgerStats()) .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger")); when(requestProcessor.getForceLedgerRequestStats()) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java index 8f54ddbc413..df7b1532b63 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java @@ -79,10 +79,13 @@ public void setup() { bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); + when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L); when(requestProcessor.getAddEntryStats()) .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry")); when(requestProcessor.getAddRequestStats()) .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests")); + when(requestProcessor.getChannelWriteStats()) + .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("CHANNEL_WRITE")); processor = new WriteEntryProcessorV3( request, channel, @@ -242,4 +245,41 @@ public void testNormalWritesOnWritableBookie() throws Exception { assertEquals(StatusCode.EOK, response.getStatus()); } + @Test + public void testWritesWithClientNotAcceptingReponses() throws Exception { + when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(5L); + + doAnswer(invocationOnMock -> { + Channel ch = invocationOnMock.getArgument(0); + ch.close(); + return null; + }).when(requestProcessor).handleNonWritableChannel(any()); + + when(channel.isWritable()).thenReturn(false); + + when(bookie.isReadOnly()).thenReturn(false); + when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class)); + doAnswer(invocationOnMock -> { + WriteCallback wc = invocationOnMock.getArgument(2); + + wc.writeComplete( + 0, + request.getAddRequest().getLedgerId(), + request.getAddRequest().getEntryId(), + null, + null); + return null; + }).when(bookie).addEntry( + any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0])); + + processor.run(); + + verify(bookie, times(1)) + .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0])); + verify(requestProcessor, times(1)).handleNonWritableChannel(channel); + verify(channel, times(0)).writeAndFlush(any(Response.class)); + verify(channel, times(1)).close(); + } + } From 01d048dafb42265dd38e69f381c9bf607e1cff2b Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Thu, 24 May 2018 11:59:20 -0700 Subject: [PATCH 3/7] CR feedback, better mocking in unit tests --- .../org/apache/bookkeeper/bookie/Bookie.java | 4 -- .../org/apache/bookkeeper/bookie/Journal.java | 36 +++++------ .../bookkeeper/bookie/JournalChannel.java | 15 ++--- .../proto/BookieBackpressureTest.java | 63 +++++++++++++++++-- 4 files changed, 81 insertions(+), 37 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index c017a476bb2..2b893876e5e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -106,10 +106,6 @@ public class Bookie extends BookieCriticalThread { private static final Logger LOG = LoggerFactory.getLogger(Bookie.class); - public static final String PROP_SLOW_JOURNAL_FLUSH_DELAY = "test.slowJournal.flushDelay"; - public static final String PROP_SLOW_JOURNAL_ADD_DELAY = "test.slowJournal.addDelay"; - public static final String PROP_SLOW_JOURNAL_GET_DELAY = "test.slowJournal.getDelay"; - final List journalDirectories; final ServerConfiguration conf; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index aba4a6d3a7a..1da0435019e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -77,6 +77,19 @@ private interface JournalIdFilter { boolean accept(long journalId); } + /** + * For testability. + */ + @FunctionalInterface + public interface BufferedChannelBuilder { + BufferedChannelBuilder DEFAULT_BCBUILDER = + (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity); + + BufferedChannel create(FileChannel fc, int capacity) throws IOException; + } + + + /** * List all journal ids by a specified journal id filer. * @@ -908,10 +921,6 @@ public int getJournalQueueLength() { public void run() { LOG.info("Starting journal on {}", journalDirectory); - long getDelay = conf.getLong(Bookie.PROP_SLOW_JOURNAL_GET_DELAY, 0); - long addDelay = conf.getLong(Bookie.PROP_SLOW_JOURNAL_ADD_DELAY, 0); - long flushDelay = conf.getLong(Bookie.PROP_SLOW_JOURNAL_FLUSH_DELAY, 0); - RecyclableArrayList toFlush = entryListRecycler.newInstance(); int numEntriesToFlush = 0; ByteBuf lenBuff = Unpooled.buffer(4); @@ -942,24 +951,11 @@ public void run() { if (null == logFile) { logId = logId + 1; - final JournalChannel.BufferedChannelBuilder bcBuilder; - if (getDelay > 0 || addDelay > 0 || flushDelay > 0) { - LOG.warn("delay of journal writes enabled, please make sure it is only used in test env."); - bcBuilder = (FileChannel fc, int capacity) -> { - SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity); - sbc.setAddDelay(addDelay); - sbc.setGetDelay(getDelay); - sbc.setFlushDelay(flushDelay); - return sbc; - }; - } else { - bcBuilder = JournalChannel.DEFAULT_BCBUILDER; - } journalCreationWatcher.reset().start(); logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize, journalAlignmentSize, removePagesFromCache, - journalFormatVersionToWrite, bcBuilder); + journalFormatVersionToWrite, getBufferedChannelBuilder()); journalCreationStats.registerSuccessfulEvent( journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); @@ -1142,6 +1138,10 @@ public void run() { LOG.info("Journal exited loop!"); } + public BufferedChannelBuilder getBufferedChannelBuilder() { + return BufferedChannelBuilder.DEFAULT_BCBUILDER; + } + /** * Shuts down the journal. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java index 6246117f3b1..507c933b104 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java @@ -42,13 +42,6 @@ class JournalChannel implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(JournalChannel.class); - @FunctionalInterface - public interface BufferedChannelBuilder { - BufferedChannel create(FileChannel fc, int capacity) throws IOException; - } - public static final BufferedChannelBuilder DEFAULT_BCBUILDER = - (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity); - final RandomAccessFile randomAccessFile; final int fd; final FileChannel fc; @@ -101,7 +94,7 @@ public interface BufferedChannelBuilder { JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, long position) throws IOException { this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, - position, false, V5, DEFAULT_BCBUILDER); + position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER); } // Open journal to write @@ -109,13 +102,13 @@ public interface BufferedChannelBuilder { long preAllocSize, int writeBufferSize, int journalAlignSize, boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException { this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize, - fRemoveFromPageCache, formatVersionToWrite, DEFAULT_BCBUILDER); + fRemoveFromPageCache, formatVersionToWrite, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER); } JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, int journalAlignSize, boolean fRemoveFromPageCache, int formatVersionToWrite, - BufferedChannelBuilder bcBuilder) throws IOException { + Journal.BufferedChannelBuilder bcBuilder) throws IOException { this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize, START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder); } @@ -145,7 +138,7 @@ public interface BufferedChannelBuilder { private JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, int journalAlignSize, long position, boolean fRemoveFromPageCache, - int formatVersionToWrite, BufferedChannelBuilder bcBuilder) throws IOException { + int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException { this.journalAlignSize = journalAlignSize; this.zeros = ByteBuffer.allocate(journalAlignSize); this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java index a971ae88032..50ff0aaef7b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java @@ -22,13 +22,20 @@ */ import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import java.lang.reflect.Field; +import java.nio.channels.FileChannel; import java.util.Enumeration; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.Journal; +import org.apache.bookkeeper.bookie.SlowBufferedChannel; import org.apache.bookkeeper.bookie.SlowInterleavedLedgerStorage; import org.apache.bookkeeper.bookie.SlowSortedLedgerStorage; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; @@ -42,6 +49,7 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -51,6 +59,7 @@ /** * Tests for backpressure handling on the server side. */ +// PowerMock usage is problematic here due to https://github.com/powermock/powermock/issues/822 public class BookieBackpressureTest extends BookKeeperClusterTestCase implements AddCallback, ReadCallback, ReadLastConfirmedCallback { @@ -67,11 +76,26 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase DigestType digestType; + long getDelay; + long addDelay; + long flushDelay; + public BookieBackpressureTest() { super(1); this.digestType = DigestType.CRC32; baseClientConf.setAddEntryTimeout(100); + baseClientConf.setAddEntryQuorumTimeout(100); + baseClientConf.setReadEntryTimeout(100); + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + getDelay = 0; + addDelay = 0; + flushDelay = 0; } class SyncObj { @@ -96,11 +120,39 @@ void setLedgerEntries(Enumeration ls) { } } + private void mockJournal(Bookie bookie, long getDelay, long addDelay, long flushDelay) throws Exception { + if (getDelay <= 0 && addDelay <= 0 && flushDelay <= 0) { + return; + } + + List journals = getJournals(bookie); + for (int i = 0; i < journals.size(); i++) { + Journal mock = spy(journals.get(i)); + when(mock.getBufferedChannelBuilder()).thenReturn((FileChannel fc, int capacity) -> { + SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity); + sbc.setAddDelay(addDelay); + sbc.setGetDelay(getDelay); + sbc.setFlushDelay(flushDelay); + return sbc; + }); + + journals.set(i, mock); + } + } + + @SuppressWarnings("unchecked") + private List getJournals(Bookie bookie) throws NoSuchFieldException, IllegalAccessException { + Field f = bookie.getClass().getDeclaredField("journals"); + f.setAccessible(true); + + return (List) f.get(bookie); + } + @Test public void testWriteNoBackpressureSlowJournal() throws Exception { //disable backpressure for writes bsConfs.get(0).setMaxAddsInProgress(0); - bsConfs.get(0).setProperty(Bookie.PROP_SLOW_JOURNAL_ADD_DELAY, "1"); + addDelay = 1; doWritesNoBackpressure(0); } @@ -111,7 +163,7 @@ public void testWriteNoBackpressureSlowJournalFlush() throws Exception { bsConfs.get(0).setMaxAddsInProgress(0); // to increase frequency of flushes bsConfs.get(0).setJournalAdaptiveGroupWrites(false); - bsConfs.get(0).setProperty(Bookie.PROP_SLOW_JOURNAL_FLUSH_DELAY, "1"); + flushDelay = 1; doWritesNoBackpressure(0); } @@ -120,7 +172,7 @@ public void testWriteNoBackpressureSlowJournalFlush() throws Exception { public void testWriteWithBackpressureSlowJournal() throws Exception { //enable backpressure with MAX_PENDING writes in progress bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); - bsConfs.get(0).setProperty(Bookie.PROP_SLOW_JOURNAL_FLUSH_DELAY, "1"); + flushDelay = 1; doWritesWithBackpressure(0); } @@ -132,7 +184,7 @@ public void testWriteWithBackpressureSlowJournalFlush() throws Exception { bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); // to increase frequency of flushes bsConfs.get(0).setJournalAdaptiveGroupWrites(false); - bsConfs.get(0).setProperty(Bookie.PROP_SLOW_JOURNAL_FLUSH_DELAY, "1"); + flushDelay = 1; doWritesWithBackpressure(0); } @@ -255,6 +307,7 @@ private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exc BookieServer bks = bs.get(bkId); bks.shutdown(); bks = new BookieServer(bsConfs.get(bkId)); + mockJournal(bks.bookie, getDelay, addDelay, flushDelay); bks.start(); bs.set(bkId, bks); @@ -294,6 +347,7 @@ private void doWritesNoBackpressure(final int bkId) throws Exception { BookieServer bks = bs.get(bkId); bks.shutdown(); bks = new BookieServer(bsConfs.get(bkId)); + mockJournal(bks.bookie, getDelay, addDelay, flushDelay); bks.start(); bs.set(bkId, bks); @@ -335,6 +389,7 @@ private void doWritesWithBackpressure(final int bkId) throws Exception { BookieServer bks = bs.get(bkId); bks.shutdown(); bks = new BookieServer(bsConfs.get(bkId)); + mockJournal(bks.bookie, getDelay, addDelay, flushDelay); bks.start(); bs.set(bkId, bks); From 5499814566ab2177f95bcc6c5d6a399f9f43593e Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Thu, 24 May 2018 15:52:40 -0700 Subject: [PATCH 4/7] CR feedback, renamed configuration props --- .../bookkeeper/conf/ServerConfiguration.java | 20 ++++++++-------- .../proto/BookieRequestProcessor.java | 4 ++-- .../proto/BookieBackpressureTest.java | 24 +++++++++---------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index d743bf6dbd9..582fc3088cf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -82,8 +82,8 @@ public class ServerConfiguration extends AbstractConfiguration 0 ? new Semaphore(maxAdds, true) : null; - int maxReads = serverCfg.getMaxReadsInProgress(); + int maxReads = serverCfg.getMaxReadsInProgressLimit(); readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null; statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge() { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java index 50ff0aaef7b..433384f64fd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java @@ -151,7 +151,7 @@ private List getJournals(Bookie bookie) throws NoSuchFieldException, Il @Test public void testWriteNoBackpressureSlowJournal() throws Exception { //disable backpressure for writes - bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setMaxAddsInProgressLimit(0); addDelay = 1; doWritesNoBackpressure(0); @@ -160,7 +160,7 @@ public void testWriteNoBackpressureSlowJournal() throws Exception { @Test public void testWriteNoBackpressureSlowJournalFlush() throws Exception { //disable backpressure for writes - bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setMaxAddsInProgressLimit(0); // to increase frequency of flushes bsConfs.get(0).setJournalAdaptiveGroupWrites(false); flushDelay = 1; @@ -171,7 +171,7 @@ public void testWriteNoBackpressureSlowJournalFlush() throws Exception { @Test public void testWriteWithBackpressureSlowJournal() throws Exception { //enable backpressure with MAX_PENDING writes in progress - bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING); flushDelay = 1; doWritesWithBackpressure(0); @@ -181,7 +181,7 @@ public void testWriteWithBackpressureSlowJournal() throws Exception { @Test public void testWriteWithBackpressureSlowJournalFlush() throws Exception { //enable backpressure with MAX_PENDING writes in progress - bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING); // to increase frequency of flushes bsConfs.get(0).setJournalAdaptiveGroupWrites(false); flushDelay = 1; @@ -192,7 +192,7 @@ public void testWriteWithBackpressureSlowJournalFlush() throws Exception { @Test public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception { //disable backpressure for writes - bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setMaxAddsInProgressLimit(0); bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); @@ -204,7 +204,7 @@ public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception { @Test public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception { //enable backpressure with MAX_PENDING writes in progress - bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING); bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); @@ -216,7 +216,7 @@ public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception { @Test public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exception { //disable backpressure for writes - bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setMaxAddsInProgressLimit(0); bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); @@ -228,7 +228,7 @@ public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exceptio @Test public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Exception { //enable backpressure with MAX_PENDING writes in progress - bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING); bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); @@ -240,7 +240,7 @@ public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Except @Test public void testWriteNoBackpressureSortedStorage() throws Exception { //disable backpressure for writes - bsConfs.get(0).setMaxAddsInProgress(0); + bsConfs.get(0).setMaxAddsInProgressLimit(0); bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); @@ -258,7 +258,7 @@ public void testWriteNoBackpressureSortedStorage() throws Exception { @Test public void testWriteWithBackpressureSortedStorage() throws Exception { //enable backpressure with MAX_PENDING writes in progress - bsConfs.get(0).setMaxAddsInProgress(MAX_PENDING); + bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING); bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); @@ -276,7 +276,7 @@ public void testWriteWithBackpressureSortedStorage() throws Exception { @Test public void testReadsNoBackpressure() throws Exception { //disable backpressure for reads - bsConfs.get(0).setMaxReadsInProgress(0); + bsConfs.get(0).setMaxReadsInProgressLimit(0); bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); @@ -291,7 +291,7 @@ public void testReadsNoBackpressure() throws Exception { @Test public void testReadsWithBackpressure() throws Exception { //enable backpressure for reads - bsConfs.get(0).setMaxReadsInProgress(MAX_PENDING); + bsConfs.get(0).setMaxReadsInProgressLimit(MAX_PENDING); bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); From 1b92d2bb4d51e0b1a9ca3bb1fefe8b5086b1379f Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Thu, 24 May 2018 16:04:46 -0700 Subject: [PATCH 5/7] CR feedback, extra logging --- .../bookkeeper/proto/BookieRequestProcessor.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index a01dec1656e..895e31d3874 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -331,11 +331,13 @@ protected void onAddRequestStart(Channel channel) { if (!addsSemaphore.tryAcquire()) { final long throttlingStartTimeNanos = MathUtils.nowInNano(); channel.config().setAutoRead(false); + LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel); addsBlocked.incrementAndGet(); addsSemaphore.acquireUninterruptibly(); channel.config().setAutoRead(true); - addEntryBlockedStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos), - TimeUnit.NANOSECONDS); + final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos); + LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos); + addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS); addsBlocked.decrementAndGet(); } } @@ -355,11 +357,13 @@ protected void onReadRequestStart(Channel channel) { if (!readsSemaphore.tryAcquire()) { final long throttlingStartTimeNanos = MathUtils.nowInNano(); channel.config().setAutoRead(false); + LOG.info("Too many read requests in progress, disabling autoread on channel {}", channel); readsBlocked.incrementAndGet(); readsSemaphore.acquireUninterruptibly(); channel.config().setAutoRead(true); - readEntryBlockedStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos), - TimeUnit.NANOSECONDS); + final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos); + LOG.info("Re-enabled autoread on channel {} after ReadRequest delay of {} nanos", channel, delayNanos); + readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS); readsBlocked.decrementAndGet(); } } From 2a7a9c489a47d75c0fdac4381034ce0e084ee9dc Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Thu, 24 May 2018 16:32:38 -0700 Subject: [PATCH 6/7] CR feedback, split client and server props to have different names --- .../conf/AbstractConfiguration.java | 31 ------------------- .../bookkeeper/conf/ClientConfiguration.java | 30 ++++++++++++++++++ .../bookkeeper/conf/ServerConfiguration.java | 31 +++++++++++++++++++ .../proto/BookieRequestProcessor.java | 2 +- 4 files changed, 62 insertions(+), 32 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java index b44fbc976fe..9d6cb50e71d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java @@ -132,9 +132,6 @@ public abstract class AbstractConfiguration protected static final String NETTY_MAX_FRAME_SIZE = "nettyMaxFrameSizeBytes"; protected static final int DEFAULT_NETTY_MAX_FRAME_SIZE = 5 * 1024 * 1024; // 5MB - // backpressure configuration - protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs"; - // Zookeeper ACL settings protected static final String ZK_ENABLE_SECURITY = "zkEnableSecurity"; @@ -671,34 +668,6 @@ public T setNettyMaxFrameSizeBytes(int maxSize) { return getThis(); } - /** - * Timeout controlling wait on request send in case of unresponsive bookie(s) - * (i.e. bookie in long GC etc.) - * - * @return timeout value - * negative value disables the feature - * 0 to allow request to fail immediately - * Default is -1 (disabled) - */ - public long getWaitTimeoutOnBackpressureMillis() { - return getLong(WAIT_TIMEOUT_ON_BACKPRESSURE, -1); - } - - /** - * Timeout controlling wait on request send in case of unresponsive bookie(s) - * (i.e. bookie in long GC etc.) - * - * @param value - * negative value disables the feature - * 0 to allow request to fail immediately - * Default is -1 (disabled) - * @return client configuration. - */ - public T setWaitTimeoutOnBackpressureMillis(long value) { - setProperty(WAIT_TIMEOUT_ON_BACKPRESSURE, value); - return getThis(); - } - /** * Get the security provider factory class name. If this returns null, no security will be enforced on the channel. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index d6ece359a6c..0dadc6274e1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -128,6 +128,8 @@ public class ClientConfiguration extends AbstractConfiguration Date: Thu, 24 May 2018 21:21:42 -0700 Subject: [PATCH 7/7] Java9 jenkins run failed one test with bookie being readonly at the time when test started. Trying to fix + add more logging --- .../apache/bookkeeper/proto/BookieServer.java | 2 +- .../proto/BookieBackpressureTest.java | 38 ++++++++++--------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java index a4685bde913..d386b915c3f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java @@ -132,7 +132,7 @@ public void start() throws IOException, UnavailableException, InterruptedExcepti // fixes test flappers at random places until ISSUE#1400 is resolved // https://github.com/apache/bookkeeper/issues/1400 - TimeUnit.MILLISECONDS.sleep(100); + TimeUnit.MILLISECONDS.sleep(250); } @VisibleForTesting diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java index 433384f64fd..5454a702dfe 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java @@ -69,10 +69,11 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase final byte[] data = new byte[8 * 1024]; - // test related variables - static final int NUM_ENTRIES_TO_WRITE = 500; - static final int MAX_PENDING = 10; - static final int NUM_OF_LEDGERS = 25; + // test related constants + static final int NUM_ENTRIES_TO_WRITE = 200; + static final int ENTRIES_IN_MEMTABLE = 2; + static final int MAX_PENDING = 2 * ENTRIES_IN_MEMTABLE + 1; + static final int NUM_OF_LEDGERS = 2 * MAX_PENDING; DigestType digestType; @@ -244,11 +245,10 @@ public void testWriteNoBackpressureSortedStorage() throws Exception { bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); - final int entriesInMemtable = 3; // one for memtable being flushed, one for the part accepting the data assertTrue("for the test, memtable should not keep more entries than allowed", - entriesInMemtable * 2 <= MAX_PENDING); - bsConfs.get(0).setSkipListSizeLimit(data.length * entriesInMemtable); + ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING); + bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1); bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1"); bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10"); @@ -262,11 +262,10 @@ public void testWriteWithBackpressureSortedStorage() throws Exception { bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName()); bsConfs.get(0).setWriteBufferBytes(data.length); - final int entriesInMemtable = 3; // one for memtable being flushed, one for the part accepting the data assertTrue("for the test, memtable should not keep more entries than allowed", - entriesInMemtable * 2 <= MAX_PENDING); - bsConfs.get(0).setSkipListSizeLimit(data.length * entriesInMemtable); + ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING); + bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1); bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1"); bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10"); @@ -311,6 +310,7 @@ private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exc bks.start(); bs.set(bkId, bks); + LOG.info("creating ledgers"); // Create ledgers final int numEntriesForReads = 10; LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS]; @@ -319,7 +319,7 @@ private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exc LOG.info("created ledger ID: {}", lhs[i].getId()); } - // generate data for reads + LOG.info("generating data for reads"); final CountDownLatch writesCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS); for (int i = 0; i < numEntriesForReads; i++) { for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) { @@ -328,8 +328,7 @@ private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exc } writesCompleteLatch.await(); - // issue bunch of async reads - // generate data for reads + LOG.info("issue bunch of async reads"); final CountDownLatch readsCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS); for (int i = 0; i < numEntriesForReads; i++) { for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) { @@ -337,6 +336,7 @@ private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exc } } readsCompleteLatch.await(); + LOG.info("reads finished"); return bks.getBookieRequestProcessor(); } @@ -351,7 +351,7 @@ private void doWritesNoBackpressure(final int bkId) throws Exception { bks.start(); bs.set(bkId, bks); - // Create ledgers + LOG.info("Creating ledgers"); LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS]; for (int i = 0; i < NUM_OF_LEDGERS; i++) { lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword); @@ -360,6 +360,7 @@ private void doWritesNoBackpressure(final int bkId) throws Exception { final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS); + LOG.info("submitting writes"); for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) { for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) { lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> completeLatch.countDown(), null); @@ -374,6 +375,8 @@ private void doWritesNoBackpressure(final int bkId) throws Exception { exceededLimit = true; break; } + LOG.info("Waiting until all writes succeeded or maxAddsInProgressCount {} > MAX_PENDING {}", + val, MAX_PENDING); } assertTrue("expected to exceed number of pending writes", exceededLimit); @@ -393,9 +396,7 @@ private void doWritesWithBackpressure(final int bkId) throws Exception { bks.start(); bs.set(bkId, bks); - LOG.info("test restarted bookie; starting writes"); - - // Create ledgers + LOG.info("Creating ledgers"); LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS]; for (int i = 0; i < NUM_OF_LEDGERS; i++) { lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword); @@ -405,6 +406,7 @@ private void doWritesWithBackpressure(final int bkId) throws Exception { final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS); final AtomicInteger rc = new AtomicInteger(BKException.Code.OK); + LOG.info("submitting writes"); for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) { for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) { lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> { @@ -419,6 +421,8 @@ private void doWritesWithBackpressure(final int bkId) throws Exception { while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) { int val = brp.maxAddsInProgressCount(); assertTrue("writes in progress should not exceed limit, got " + val, val <= MAX_PENDING); + LOG.info("Waiting for all writes to succeed, left {} of {}", + completeLatch.getCount(), NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS); } if (rc.get() != BKException.Code.OK) {