Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

package org.apache.bookkeeper.bookie;

import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.FastThreadLocal;
Expand Down Expand Up @@ -131,7 +134,18 @@ void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) throw
* Creates a new log file. This method should be guarded by a lock,
* so callers of this method should be in right scope of the lock.
*/
@VisibleForTesting
void createNewLog(long ledgerId) throws IOException {
createNewLog(ledgerId, "");
}

void createNewLog(long ledgerId, String reason) throws IOException {
if (ledgerId != UNASSIGNED_LEDGERID) {
log.info("Creating a new entry log file for ledger '{}' {}", ledgerId, reason);
} else {
log.info("Creating a new entry log file {}", reason);
}

BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
// first tried to create a new log channel. add current log channel to ToFlush list only when
// there is a new log channel. it would prevent that a log channel is referenced by both
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public boolean commitEntryMemTableFlush() throws IOException {
try {
if (reachEntryLogLimit(currentLog, 0L)) {
log.info("Rolling entry logger since it reached size limitation for ledger: {}", ledgerId);
createNewLog(ledgerId);
createNewLog(ledgerId, "after entry log file is rotated");
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -640,7 +640,9 @@ BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySiz
if (logChannel != null) {
logChannel.flushAndForceWriteIfRegularFlush(false);
}
createNewLog(ledgerId);
createNewLog(ledgerId,
": diskFull = " + diskFull + ", allDisksFull = " + allDisksFull
+ ", reachEntryLogLimit = " + reachEntryLogLimit + ", logChannel = " + logChannel);
}

return getCurrentLogForLedger(ledgerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId,
boolean rollLog) throws IOException {
if (null == activeLogChannel) {
// log channel can be null because the file is deferred to be created
createNewLog(UNASSIGNED_LEDGERID);
createNewLog(UNASSIGNED_LEDGERID, "because current active log channel has not initialized yet");
}

boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize)
Expand All @@ -103,7 +103,8 @@ synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId,
if (activeLogChannel != null) {
activeLogChannel.flushAndForceWriteIfRegularFlush(false);
}
createNewLog(UNASSIGNED_LEDGERID);
createNewLog(UNASSIGNED_LEDGERID,
": createNewLog = " + createNewLog + ", reachEntryLogLimit = " + reachEntryLogLimit);
// Reset the flag
if (createNewLog) {
shouldCreateNewEntryLog.set(false);
Expand Down Expand Up @@ -238,7 +239,9 @@ public boolean commitEntryMemTableFlush() throws IOException {
*/
if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != logIdBeforeFlush) {
log.info("Rolling entry logger since it reached size limitation");
createNewLog(UNASSIGNED_LEDGERID);
createNewLog(UNASSIGNED_LEDGERID,
"due to reaching log limit after flushing memtable : logIdBeforeFlush = "
+ logIdBeforeFlush + ", logIdAfterFlush = " + logIdAfterFlush);
return true;
}
return false;
Expand All @@ -251,7 +254,8 @@ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IO
// it means bytes might live at current active entry log, we need
// roll current entry log and then issue checkpoint to underlying
// interleaved ledger storage.
createNewLog(UNASSIGNED_LEDGERID);
createNewLog(UNASSIGNED_LEDGERID,
"due to preparing checkpoint : numBytesFlushed = " + numBytesFlushed);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,32 @@ public void initialize(ServerConfiguration conf,
Checkpointer checkpointer,
StatsLogger statsLogger)
throws IOException {
initializeWithEntryLogListener(
conf,
ledgerManager,
ledgerDirsManager,
indexDirsManager,
stateManager,
checkpointSource,
checkpointer,
this,
statsLogger);
}

void initializeWithEntryLogListener(ServerConfiguration conf,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of adding a new initialize method, you should consider changing the signature of the existing initialize method, which takes EntryLogListener argument. In ILS initialize method it will check if it receives the listener, if it is not null use it otherwise use ‘this’.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think currently entry log listener is only visible to interleaved and sorte ledger storages. but it is not visible to db ledger storage. so I think it is probably better to use my approach here, since the behavior is only applied to interleaved and sorted ledger storage.

we can improve it later if this listener is going to be applied to all ledger storage implementation.

LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
EntryLogListener entryLogListener,
StatsLogger statsLogger) throws IOException {
checkNotNull(checkpointSource, "invalid null checkpoint source");
checkNotNull(checkpointer, "invalid null checkpointer");
this.checkpointSource = checkpointSource;
this.checkpointer = checkpointer;
entryLogger = new EntryLogger(conf, ledgerDirsManager, this, statsLogger.scope(ENTRYLOGGER_SCOPE));
entryLogger = new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE));
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,17 @@ public void initialize(ServerConfiguration conf,
StatsLogger statsLogger)
throws IOException {

interleavedLedgerStorage.initialize(
interleavedLedgerStorage.initializeWithEntryLogListener(
conf,
ledgerManager,
ledgerDirsManager,
indexDirsManager,
stateManager,
checkpointSource,
checkpointer,
// uses sorted ledger storage's own entry log listener
// since it manages entry log rotations and checkpoints.
this,
statsLogger);

if (conf.isEntryLogPerLedgerEnabled()) {
Expand Down