From 39f2ff43c76f097b477aeab8c1d4ec4a8c9ea696 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Mon, 12 Nov 2018 11:34:25 -0800 Subject: [PATCH 1/3] [bookie] Fix sorted ledger storage rotating entry log files too frequent *Motivation* A strong behavior was observed when using sorted ledger storage with single entry log manager on production: "the entry log files are rotated very frequently and small entry log files are produced". The problem was introduced due to #1410. At current entry logger, when a new entry log file is created, EntryLogger will notify its listeners that a new entry log file is rotated via [`onRotateEntryLog`](https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java#L152). Before the change in #1410, `SortedLedgerStorage` inherits from `InterleavedLedgerStorage`. So when a new entry log file is rotated, `SortedLedgerStorage` is notified. However after the change in #1410, `SortedLedgerStorage` doesn't inherits `InterleavedLedgerStorage` anymore. Instead, the relationship is changed to composition. `SortedLedgerStorage` is composed using an interleaved ledger storage. So the entrylog listener contract was broken. `SortedLedgerStorage` will not receive any `onRotateEntryLog` notification any more. *Changes* When `SortedLedgerStorage` initializes, it passes its own entry log listener down to the interleaved ledger storage. So entry logger can notify the right person for entry log rotations. *Tests* Existing tests should cover most of the case. Looking for how to add new test cases. --- .../bookie/InterleavedLedgerStorage.java | 23 ++++++++++++++++++- .../bookie/SortedLedgerStorage.java | 5 +++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 08e7f4ef914..8ab6517b489 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -98,11 +98,32 @@ public void initialize(ServerConfiguration conf, Checkpointer checkpointer, StatsLogger statsLogger) throws IOException { + initializeWithEntryLogListener( + conf, + ledgerManager, + ledgerDirsManager, + indexDirsManager, + stateManager, + checkpointSource, + checkpointer, + this, + statsLogger); + } + + void initializeWithEntryLogListener(ServerConfiguration conf, + LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, + LedgerDirsManager indexDirsManager, + StateManager stateManager, + CheckpointSource checkpointSource, + Checkpointer checkpointer, + EntryLogListener entryLogListener, + StatsLogger statsLogger) throws IOException { checkNotNull(checkpointSource, "invalid null checkpoint source"); checkNotNull(checkpointer, "invalid null checkpointer"); this.checkpointSource = checkpointSource; this.checkpointer = checkpointer; - entryLogger = new EntryLogger(conf, ledgerDirsManager, this, statsLogger.scope(ENTRYLOGGER_SCOPE)); + entryLogger = new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE)); ledgerCache = new LedgerCacheImpl(conf, activeLedgers, null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger); gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc")); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index 5c4f75a22e2..e4b137ffbb5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -72,7 +72,7 @@ public void initialize(ServerConfiguration conf, StatsLogger statsLogger) throws IOException { - interleavedLedgerStorage.initialize( + interleavedLedgerStorage.initializeWithEntryLogListener( conf, ledgerManager, ledgerDirsManager, @@ -80,6 +80,9 @@ public void initialize(ServerConfiguration conf, stateManager, checkpointSource, checkpointer, + // uses sorted ledger storage's own entry log listener + // since it manages entry log rotations and checkpoints. + this, statsLogger); if (conf.isEntryLogPerLedgerEnabled()) { From 5ecf449905cea727c5ea3ed7d86c0c99f7168e5d Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 13 Nov 2018 00:48:27 -0800 Subject: [PATCH 2/3] Add "reason" on creating new log --- .../bookkeeper/bookie/EntryLogManagerBase.java | 14 ++++++++++++++ .../EntryLogManagerForEntryLogPerLedger.java | 6 ++++-- .../bookie/EntryLogManagerForSingleEntryLog.java | 12 ++++++++---- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java index 701fb7b2e8e..f9c6d97cfd8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java @@ -21,6 +21,9 @@ package org.apache.bookkeeper.bookie; +import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID; + +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.concurrent.FastThreadLocal; @@ -131,7 +134,18 @@ void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) throw * Creates a new log file. This method should be guarded by a lock, * so callers of this method should be in right scope of the lock. */ + @VisibleForTesting void createNewLog(long ledgerId) throws IOException { + createNewLog(ledgerId, ""); + } + + void createNewLog(long ledgerId, String reason) throws IOException { + if (ledgerId != UNASSIGNED_LEDGERID) { + log.info("Creating a new entry log file for ledger '{}' {}", ledgerId, reason); + } else { + log.info("Creating a new entry log file {}", reason); + } + BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId); // first tried to create a new log channel. add current log channel to ToFlush list only when // there is a new log channel. it would prevent that a log channel is referenced by both diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java index 39ed60cea57..9b7b7b7e99d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java @@ -484,7 +484,7 @@ public boolean commitEntryMemTableFlush() throws IOException { try { if (reachEntryLogLimit(currentLog, 0L)) { log.info("Rolling entry logger since it reached size limitation for ledger: {}", ledgerId); - createNewLog(ledgerId); + createNewLog(ledgerId, "after entry log file is rotated"); } } finally { lock.unlock(); @@ -640,7 +640,9 @@ BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySiz if (logChannel != null) { logChannel.flushAndForceWriteIfRegularFlush(false); } - createNewLog(ledgerId); + createNewLog(ledgerId, + ": diskFull = " + diskFull + ", allDisksFull = " + allDisksFull + + ", reachEntryLogLimit = " + reachEntryLogLimit); } return getCurrentLogForLedger(ledgerId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java index 3e552d0fca9..72c818a30c7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java @@ -92,7 +92,7 @@ synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, boolean rollLog) throws IOException { if (null == activeLogChannel) { // log channel can be null because the file is deferred to be created - createNewLog(UNASSIGNED_LEDGERID); + createNewLog(UNASSIGNED_LEDGERID, "because current active log channel has not initialized yet"); } boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize) @@ -103,7 +103,8 @@ synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, if (activeLogChannel != null) { activeLogChannel.flushAndForceWriteIfRegularFlush(false); } - createNewLog(UNASSIGNED_LEDGERID); + createNewLog(UNASSIGNED_LEDGERID, + ": createNewLog = " + createNewLog + ", reachEntryLogLimit = " + reachEntryLogLimit); // Reset the flag if (createNewLog) { shouldCreateNewEntryLog.set(false); @@ -238,7 +239,9 @@ public boolean commitEntryMemTableFlush() throws IOException { */ if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != logIdBeforeFlush) { log.info("Rolling entry logger since it reached size limitation"); - createNewLog(UNASSIGNED_LEDGERID); + createNewLog(UNASSIGNED_LEDGERID, + "due to reaching log limit after flushing memtable : logIdBeforeFlush = " + + logIdBeforeFlush + ", logIdAfterFlush = " + logIdAfterFlush); return true; } return false; @@ -251,7 +254,8 @@ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IO // it means bytes might live at current active entry log, we need // roll current entry log and then issue checkpoint to underlying // interleaved ledger storage. - createNewLog(UNASSIGNED_LEDGERID); + createNewLog(UNASSIGNED_LEDGERID, + "due to preparing checkpoint : numBytesFlushed = " + numBytesFlushed); } } From a64ff8bc4d699d615f9ddb688d80149f1b9efb5f Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Wed, 14 Nov 2018 11:03:11 -0800 Subject: [PATCH 3/3] Addressed review comments --- .../bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java index 9b7b7b7e99d..8093b53b157 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java @@ -642,7 +642,7 @@ BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySiz } createNewLog(ledgerId, ": diskFull = " + diskFull + ", allDisksFull = " + allDisksFull - + ", reachEntryLogLimit = " + reachEntryLogLimit); + + ", reachEntryLogLimit = " + reachEntryLogLimit + ", logChannel = " + logChannel); } return getCurrentLogForLedger(ledgerId);