diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java index 57ec8978cc0..f5ae90b3362 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java @@ -57,7 +57,10 @@ public AbstractLogCompactor(ServerConfiguration conf, LogRemovalListener logRemo */ public void cleanUpAndRecover() {} - static class Throttler { + /** + * class Throttler. + */ + public static class Throttler { private final RateLimiter rateLimiter; private final boolean isThrottleByBytes; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index edf9194f163..a7e15f67b63 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.common.annotation.InterfaceAudience.Private; @@ -2441,7 +2442,7 @@ public static void main(String[] argv) throws Exception { private synchronized void initEntryLogger() throws IOException { if (null == entryLogger) { // provide read only entry logger - entryLogger = new ReadOnlyEntryLogger(bkConf); + entryLogger = new ReadOnlyDefaultEntryLogger(bkConf); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java index 9a5a0abbcba..832cb57d1a6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java @@ -22,7 +22,7 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; - +import org.apache.bookkeeper.bookie.storage.EntryLogger; /** * Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java similarity index 82% rename from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java rename to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 859f3e588b0..dc90811e839 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -45,6 +45,7 @@ import java.nio.channels.AsynchronousCloseException; import java.nio.channels.FileChannel; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -54,11 +55,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.regex.Pattern; +import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.DiskChecker; +import org.apache.bookkeeper.util.HardLink; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong; @@ -72,11 +78,8 @@ * the actual ledger entry. The entry log files created by this class are * identified by a long. */ -public class EntryLogger { - private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class); - static final long UNASSIGNED_LEDGERID = -1L; - // log file suffix - static final String LOG_FILE_SUFFIX = ".log"; +public class DefaultEntryLogger implements EntryLogger { + private static final Logger LOG = LoggerFactory.getLogger(DefaultEntryLogger.class); @VisibleForTesting static final int UNINITIALIZED_LOG_ID = -0xDEAD; @@ -285,33 +288,6 @@ private static class Header { private final ByteBufAllocator allocator; final ServerConfiguration conf; - /** - * Scan entries in a entry log file. - */ - public interface EntryLogScanner { - /** - * Tests whether or not the entries belongs to the specified ledger - * should be processed. - * - * @param ledgerId - * Ledger ID. - * @return true if and only the entries of the ledger should be scanned. - */ - boolean accept(long ledgerId); - - /** - * Process an entry. - * - * @param ledgerId - * Ledger ID. - * @param offset - * File offset of this entry. - * @param entry - * Entry ByteBuf - * @throws IOException - */ - void process(long ledgerId, long offset, ByteBuf entry) throws IOException; - } /** * Entry Log Listener. @@ -323,7 +299,7 @@ interface EntryLogListener { void onRotateEntryLog(); } - public EntryLogger(ServerConfiguration conf) throws IOException { + public DefaultEntryLogger(ServerConfiguration conf) throws IOException { this(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))); } @@ -331,14 +307,14 @@ public EntryLogger(ServerConfiguration conf) throws IOException { /** * Create an EntryLogger that stores it's log files in the given directories. */ - public EntryLogger(ServerConfiguration conf, - LedgerDirsManager ledgerDirsManager) throws IOException { + public DefaultEntryLogger(ServerConfiguration conf, + LedgerDirsManager ledgerDirsManager) throws IOException { this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT); } - public EntryLogger(ServerConfiguration conf, - LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger, - ByteBufAllocator allocator) throws IOException { + public DefaultEntryLogger(ServerConfiguration conf, + LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger, + ByteBufAllocator allocator) throws IOException { //We reserve 500 bytes as overhead for the protocol. This is not 100% accurate // but the protocol varies so an exact value is difficult to determine this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500; @@ -478,7 +454,8 @@ public BufferedReadChannel getFromChannels(long logId) { * * @return least unflushed log id. */ - long getLeastUnflushedLogId() { + @Override + public long getLeastUnflushedLogId() { return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId(); } @@ -489,7 +466,8 @@ long getLeastUnflushedLogId() { * * @return last entry log id created. */ - long getLastLogId() { + @Override + public long getLastLogId() { return recentlyCreatedEntryLogsStatus.getLastLogId(); } @@ -499,7 +477,8 @@ long getLastLogId() { * @param entryLogId EntryLog id to check. * @return Whether the given entryLogId exists and has been rotated. */ - boolean isFlushedEntryLog(Long entryLogId) { + @Override + public boolean isFlushedEntryLog(Long entryLogId) { return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId); } @@ -510,7 +489,7 @@ long getPreviousAllocatedEntryLogId() { /** * Get the current log file for compaction. */ - File getCurCompactionLogFile() { + private File getCurCompactionLogFile() { synchronized (compactionLogLock) { if (compactionLogChannel == null) { return null; @@ -544,7 +523,8 @@ EntryLoggerAllocator getEntryLoggerAllocator() { * @param entryLogId * Entry Log File Id */ - protected boolean removeEntryLog(long entryLogId) { + @Override + public boolean removeEntryLog(long entryLogId) { removeFromChannelsAndClose(entryLogId); File entryLogFile; try { @@ -610,6 +590,7 @@ void checkpoint() throws IOException { entryLogManager.checkpoint(); } + @Override public void flush() throws IOException { entryLogManager.flush(); } @@ -618,12 +599,13 @@ long addEntry(long ledger, ByteBuffer entry) throws IOException { return entryLogManager.addEntry(ledger, Unpooled.wrappedBuffer(entry), true); } - long addEntry(long ledger, ByteBuf entry) throws IOException { - return entryLogManager.addEntry(ledger, entry, true); + long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { + return entryLogManager.addEntry(ledger, entry, rollLog); } - public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { - return entryLogManager.addEntry(ledger, entry, rollLog); + @Override + public long addEntry(long ledger, ByteBuf entry) throws IOException { + return entryLogManager.addEntry(ledger, entry, true); } private final FastThreadLocal sizeBuffer = new FastThreadLocal() { @@ -634,7 +616,7 @@ protected ByteBuf initialValue() throws Exception { } }; - long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException { + private long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException { synchronized (compactionLogLock) { int entrySize = entry.readableBytes() + 4; if (compactionLogChannel == null) { @@ -653,7 +635,7 @@ long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException { } } - void flushCompactionLog() throws IOException { + private void flushCompactionLog() throws IOException { synchronized (compactionLogLock) { if (compactionLogChannel != null) { compactionLogChannel.appendLedgersMap(); @@ -671,7 +653,7 @@ void flushCompactionLog() throws IOException { } } - void createNewCompactionLog() throws IOException { + private void createNewCompactionLog() throws IOException { synchronized (compactionLogLock) { if (compactionLogChannel == null) { compactionLogChannel = entryLogManager.createNewLogForCompaction(); @@ -683,7 +665,7 @@ void createNewCompactionLog() throws IOException { * Remove the current compaction log, usually invoked when compaction failed and * we need to do some clean up to remove the compaction log file. */ - void removeCurCompactionLog() { + private void removeCurCompactionLog() { synchronized (compactionLogLock) { if (compactionLogChannel != null) { if (!compactionLogChannel.getLogFile().delete()) { @@ -834,8 +816,20 @@ private void validateEntry(long ledgerId, long entryId, long entryLogId, long po } } - public ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry) - throws IOException { + @Override + public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) + throws IOException, Bookie.NoEntryException { + return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */); + } + + @Override + public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryException { + return internalReadEntry(location, -1L, -1L, false /* validateEntry */); + } + + + private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry) + throws IOException, Bookie.NoEntryException { long entryLogId = logIdForOffset(location); long pos = posForOffset(location); @@ -877,10 +871,6 @@ public ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boo return data; } - public ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException { - return internalReadEntry(ledgerId, entryId, location, true /* validateEntry */); - } - /** * Read the header of an entry log. */ @@ -932,7 +922,8 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti /** * Whether the log file exists or not. */ - boolean logExists(long logId) { + @Override + public boolean logExists(long logId) { for (File d : ledgerDirsManager.getAllLedgerDirs()) { File f = new File(d, Long.toHexString(logId) + ".log"); if (f.exists()) { @@ -988,6 +979,7 @@ private File findFile(long logId) throws FileNotFoundException { * @param scanner Entry Log Scanner * @throws IOException */ + @Override public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException { // Buffer where to read the entrySize (4 bytes) and the ledgerId (8 bytes) ByteBuf headerBuffer = Unpooled.buffer(4 + 8); @@ -1054,19 +1046,6 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce } } - public EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException { - // First try to extract the EntryLogMetadata from the index, if there's no index then fallback to scanning the - // entry log - try { - return extractEntryLogMetadataFromIndex(entryLogId); - } catch (Exception e) { - LOG.info("Failed to get ledgers map index from: {}.log : {}", entryLogId, e.getMessage()); - - // Fall-back to scanning - return extractEntryLogMetadataByScanning(entryLogId); - } - } - public EntryLogMetadata getEntryLogMetadata(long entryLogId, AbstractLogCompactor.Throttler throttler) throws IOException { // First try to extract the EntryLogMetadata from the index, if there's no index then fallback to scanning the @@ -1164,10 +1143,6 @@ EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOExce return meta; } - private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId) throws IOException { - return extractEntryLogMetadataByScanning(entryLogId, null); - } - private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId, AbstractLogCompactor.Throttler throttler) throws IOException { @@ -1199,7 +1174,8 @@ public boolean accept(long ledgerId) { /** * Shutdown method to gracefully stop entry logger. */ - public void shutdown() { + @Override + public void close() { // since logChannel is buffered channel, do flush when shutting down LOG.info("Stopping EntryLogger"); try { @@ -1303,4 +1279,177 @@ synchronized boolean isFlushedEntryLog(Long entryLogId) { || entryLogId < leastUnflushedLogId; } } + + @Override + public CompactionEntryLog newCompactionLog(long logToCompact) throws IOException { + createNewCompactionLog(); + + File compactingLogFile = getCurCompactionLogFile(); + long compactionLogId = fileName2LogId(compactingLogFile.getName()); + File compactedLogFile = compactedLogFileFromCompacting(compactingLogFile, logToCompact); + File finalLogFile = new File(compactingLogFile.getParentFile(), + compactingLogFile.getName().substring(0, + compactingLogFile.getName().indexOf(".log") + 4)); + return new EntryLoggerCompactionEntryLog( + compactionLogId, logToCompact, compactingLogFile, compactedLogFile, finalLogFile); + + } + + private class EntryLoggerCompactionEntryLog implements CompactionEntryLog { + private final long compactionLogId; + private final long logIdToCompact; + private final File compactingLogFile; + private final File compactedLogFile; + private final File finalLogFile; + + EntryLoggerCompactionEntryLog(long compactionLogId, long logIdToCompact, + File compactingLogFile, + File compactedLogFile, + File finalLogFile) { + this.compactionLogId = compactionLogId; + this.logIdToCompact = logIdToCompact; + this.compactingLogFile = compactingLogFile; + this.compactedLogFile = compactedLogFile; + this.finalLogFile = finalLogFile; + } + + @Override + public long addEntry(long ledgerId, ByteBuf entry) throws IOException { + return addEntryForCompaction(ledgerId, entry); + } + @Override + public void scan(EntryLogScanner scanner) throws IOException { + scanEntryLog(compactionLogId, scanner); + } + @Override + public void flush() throws IOException { + flushCompactionLog(); + } + @Override + public void abort() { + removeCurCompactionLog(); + if (compactedLogFile.exists()) { + if (!compactedLogFile.delete()) { + LOG.warn("Could not delete file: {}", compactedLogFile); + } + } + } + + @Override + public void markCompacted() throws IOException { + if (compactingLogFile.exists()) { + if (!compactedLogFile.exists()) { + HardLink.createHardLink(compactingLogFile, compactedLogFile); + } + } else { + throw new IOException("Compaction log doesn't exist any more after flush: " + compactingLogFile); + } + removeCurCompactionLog(); + } + + @Override + public void makeAvailable() throws IOException { + if (!finalLogFile.exists()) { + HardLink.createHardLink(compactedLogFile, finalLogFile); + } + } + @Override + public void cleanup() { + if (compactedLogFile.exists()) { + if (!compactedLogFile.delete()) { + LOG.warn("Could not delete file: {}", compactedLogFile); + } + } + if (compactingLogFile.exists()) { + if (!compactingLogFile.delete()) { + LOG.warn("Could not delete file: {}", compactingLogFile); + } + } + } + + @Override + public long getLogId() { + return compactionLogId; + } + @Override + public long getCompactedLogId() { + return logIdToCompact; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("logId", compactionLogId) + .add("compactedLogId", logIdToCompact) + .add("compactingLogFile", compactingLogFile) + .add("compactedLogFile", compactedLogFile) + .add("finalLogFile", finalLogFile) + .toString(); + } + } + + @Override + public Collection incompleteCompactionLogs() { + List ledgerDirs = ledgerDirsManager.getAllLedgerDirs(); + List compactionLogs = new ArrayList<>(); + + for (File dir : ledgerDirs) { + File[] compactingPhaseFiles = dir.listFiles( + file -> file.getName().endsWith(TransactionalEntryLogCompactor.COMPACTING_SUFFIX)); + if (compactingPhaseFiles != null) { + for (File file : compactingPhaseFiles) { + if (file.delete()) { + LOG.info("Deleted failed compaction file {}", file); + } + } + } + File[] compactedPhaseFiles = dir.listFiles( + file -> file.getName().endsWith(TransactionalEntryLogCompactor.COMPACTED_SUFFIX)); + if (compactedPhaseFiles != null) { + for (File compactedFile : compactedPhaseFiles) { + LOG.info("Found compacted log file {} has partially flushed index, recovering index.", + compactedFile); + + File compactingLogFile = new File(compactedFile.getParentFile(), "doesntexist"); + long compactionLogId = -1L; + long compactedLogId = -1L; + String[] parts = compactedFile.getName().split(Pattern.quote(".")); + boolean valid = true; + if (parts.length != 4) { + valid = false; + } else { + try { + compactionLogId = Long.parseLong(parts[0], 16); + compactedLogId = Long.parseLong(parts[2], 16); + } catch (NumberFormatException nfe) { + valid = false; + } + } + + if (!valid) { + LOG.info("Invalid compacted file found ({}), deleting", compactedFile); + if (!compactedFile.delete()) { + LOG.warn("Couldn't delete invalid compacted file ({})", compactedFile); + } + continue; + } + File finalLogFile = new File(compactedFile.getParentFile(), compactionLogId + ".log"); + + compactionLogs.add( + new EntryLoggerCompactionEntryLog(compactionLogId, compactedLogId, + compactingLogFile, compactedFile, finalLogFile)); + } + } + } + return compactionLogs; + } + + private static File compactedLogFileFromCompacting(File compactionLogFile, long compactingLogId) { + File dir = compactionLogFile.getParentFile(); + String filename = compactionLogFile.getName(); + String newSuffix = ".log." + DefaultEntryLogger.logId2HexString(compactingLogId) + + TransactionalEntryLogCompactor.COMPACTED_SUFFIX; + String hardLinkFilename = filename.replace(TransactionalEntryLogCompactor.COMPACTING_SUFFIX, newSuffix); + return new File(dir, hardLinkFilename); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java index 98f4960549f..743cc9392cd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java @@ -27,7 +27,10 @@ import java.util.ArrayList; import java.util.List; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,9 +85,9 @@ public boolean compact(EntryLogMetadata entryLogMeta) { class CompactionScannerFactory { List offsets = new ArrayList(); - EntryLogger.EntryLogScanner newScanner(final EntryLogMetadata meta) { + EntryLogScanner newScanner(final EntryLogMetadata meta) { - return new EntryLogger.EntryLogScanner() { + return new EntryLogScanner() { @Override public boolean accept(long ledgerId) { return meta.containsLedger(ledgerId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java index 340e9a18b60..106e6eedef4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.util.List; -import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel; interface EntryLogManager { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java index f9c6d97cfd8..363189a72e8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java @@ -21,7 +21,7 @@ package org.apache.bookkeeper.bookie; -import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID; +import static org.apache.bookkeeper.bookie.DefaultEntryLogger.UNASSIGNED_LEDGERID; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; @@ -31,8 +31,8 @@ import java.io.IOException; import java.util.List; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; -import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; +import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.DefaultEntryLogger.EntryLogListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.conf.ServerConfiguration; @@ -41,14 +41,14 @@ abstract class EntryLogManagerBase implements EntryLogManager { volatile List rotatedLogChannels; final EntryLoggerAllocator entryLoggerAllocator; final LedgerDirsManager ledgerDirsManager; - private final List listeners; + private final List listeners; /** * The maximum size of a entry logger file. */ final long logSizeLimit; EntryLogManagerBase(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, - EntryLoggerAllocator entryLoggerAllocator, List listeners) { + EntryLoggerAllocator entryLoggerAllocator, List listeners) { this.ledgerDirsManager = ledgerDirsManager; this.entryLoggerAllocator = entryLoggerAllocator; this.listeners = listeners; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java index c04ccbe8daa..c70bfde9bfc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java @@ -57,7 +57,7 @@ import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.Counter; @@ -259,7 +259,7 @@ ConcurrentMap getCounterMap() { */ private final ConcurrentLongHashMap replicaOfCurrentLogChannels; private final CacheLoader entryLogAndLockTupleCacheLoader; - private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; + private final DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; private final int entrylogMapAccessExpiryTimeInSeconds; private final int maximumNumberOfActiveEntryLogs; private final int entryLogPerLedgerCounterLimitsMultFactor; @@ -269,9 +269,10 @@ ConcurrentMap getCounterMap() { final EntryLogsPerLedgerCounter entryLogsPerLedgerCounter; EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, - EntryLoggerAllocator entryLoggerAllocator, List listeners, - EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, StatsLogger statsLogger) - throws IOException { + EntryLoggerAllocator entryLoggerAllocator, + List listeners, + DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, + StatsLogger statsLogger) throws IOException { super(conf, ledgerDirsManager, entryLoggerAllocator, listeners); this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus; this.rotatedLogChannels = new CopyOnWriteArrayList(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java index f5b88237112..f387eeedcd5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java @@ -21,8 +21,8 @@ package org.apache.bookkeeper.bookie; -import static org.apache.bookkeeper.bookie.EntryLogger.INVALID_LID; -import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID; +import static org.apache.bookkeeper.bookie.DefaultEntryLogger.INVALID_LID; +import static org.apache.bookkeeper.bookie.DefaultEntryLogger.UNASSIGNED_LEDGERID; import io.netty.buffer.ByteBuf; import java.io.File; @@ -33,7 +33,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.util.IOUtils; @@ -44,11 +44,11 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase { private volatile BufferedLogChannel activeLogChannel; private long logIdBeforeFlush = INVALID_LID; private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false); - private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; + private final DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; EntryLogManagerForSingleEntryLog(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, - EntryLoggerAllocator entryLoggerAllocator, List listeners, - EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) { + EntryLoggerAllocator entryLoggerAllocator, List listeners, + DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) { super(conf, ledgerDirsManager, entryLoggerAllocator, listeners); this.rotatedLogChannels = new LinkedList(); this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus; @@ -157,7 +157,7 @@ public long getCurrentLogId() { if (currentActiveLogChannel != null) { return currentActiveLogChannel.getLogId(); } else { - return EntryLogger.UNINITIALIZED_LOG_ID; + return DefaultEntryLogger.UNINITIALIZED_LOG_ID; } } @@ -260,7 +260,7 @@ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IO } @Override - public EntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException { + public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException { return entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog()); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java index 2066e6d4ef4..a2f61fd6ed5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java @@ -45,7 +45,7 @@ import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel; import org.apache.bookkeeper.conf.ServerConfiguration; /** @@ -61,14 +61,14 @@ class EntryLoggerAllocator { private final LedgerDirsManager ledgerDirsManager; private final Object createEntryLogLock = new Object(); private final Object createCompactionLogLock = new Object(); - private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; + private final DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus; private final boolean entryLogPreAllocationEnabled; private final ByteBufAllocator byteBufAllocator; - final ByteBuf logfileHeader = Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE); + final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE); EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, - EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId, - ByteBufAllocator byteBufAllocator) { + DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId, + ByteBufAllocator byteBufAllocator) { this.conf = conf; this.byteBufAllocator = byteBufAllocator; this.ledgerDirsManager = ledgerDirsManager; @@ -83,8 +83,8 @@ class EntryLoggerAllocator { // so there can be race conditions when entry logs are rolled over and // this header buffer is cleared before writing it into the new logChannel. logfileHeader.writeBytes("BKLO".getBytes(UTF_8)); - logfileHeader.writeInt(EntryLogger.HEADER_CURRENT_VERSION); - logfileHeader.writerIndex(EntryLogger.LOGFILE_HEADER_SIZE); + logfileHeader.writeInt(DefaultEntryLogger.HEADER_CURRENT_VERSION); + logfileHeader.writerIndex(DefaultEntryLogger.LOGFILE_HEADER_SIZE); } @@ -175,7 +175,7 @@ private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog, setLastLogId(f, preallocatedLogId); } - if (suffix.equals(EntryLogger.LOG_FILE_SUFFIX)) { + if (suffix.equals(DefaultEntryLogger.LOG_FILE_SUFFIX)) { recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 87a947731a8..fba0bec90fa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -42,6 +42,7 @@ import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException; import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner; import org.apache.bookkeeper.bookie.stats.GarbageCollectorStats; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.bookie.storage.ldb.PersistentEntryLogMetadataMap; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -213,7 +214,8 @@ public void removeEntryLog(long logToRemove) { } }; if (conf.getUseTransactionalCompaction()) { - this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover); + this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, + ledgerDirsManager, remover); } else { this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover); } @@ -755,4 +757,4 @@ public GarbageCollectionStatus getGarbageCollectionStatus() { .minorCompactionCounter(gcStats.getMinorCompactionCounter().get()) .build(); } -} \ No newline at end of file +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index a1dc14b2f61..a071fd5f40d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -57,8 +57,9 @@ import org.apache.bookkeeper.bookie.Bookie.NoLedgerException; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; -import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; +import org.apache.bookkeeper.bookie.DefaultEntryLogger.EntryLogListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -88,7 +89,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, EntryLogListener { private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class); - EntryLogger entryLogger; + DefaultEntryLogger entryLogger; @Getter LedgerCache ledgerCache; protected CheckpointSource checkpointSource = CheckpointSource.DEFAULT; @@ -158,7 +159,7 @@ void initializeWithEntryLogListener(ServerConfiguration conf, ledgerManager, ledgerDirsManager, indexDirsManager, - new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE), + new DefaultEntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE), allocator), statsLogger); } @@ -184,7 +185,7 @@ public void initializeWithEntryLogger(ServerConfiguration conf, StatsLogger statsLogger) throws IOException { checkNotNull(checkpointSource, "invalid null checkpoint source"); checkNotNull(checkpointer, "invalid null checkpointer"); - this.entryLogger = entryLogger; + this.entryLogger = (DefaultEntryLogger) entryLogger; this.entryLogger.addListener(this); ledgerCache = new LedgerCacheImpl(conf, activeLedgers, null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger); @@ -284,7 +285,7 @@ public void shutdown() throws InterruptedException { LOG.info("Shutting down GC thread"); gcThread.shutdown(); LOG.info("Shutting down entry logger"); - entryLogger.shutdown(); + entryLogger.close(); try { ledgerCache.close(); } catch (IOException e) { @@ -602,7 +603,7 @@ public List localConsistencyCheck(Optional r try { entryLogger.checkEntry(ledger, entry, offset); checkedEntries.increment(); - } catch (EntryLogger.EntryLookupException e) { + } catch (DefaultEntryLogger.EntryLookupException e) { if (version != lep.getVersion()) { pageRetries.increment(); if (lep.isDeleted()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java index ad80d4ac1af..46509dc23b0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java @@ -30,7 +30,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.checksum.DigestManager; @@ -94,7 +94,7 @@ public void initiate(boolean dryRun) throws IOException { conf, diskChecker, NullStatsLogger.INSTANCE); LedgerDirsManager indexDirsManager = BookieResources.createIndexDirsManager( conf, diskChecker, NullStatsLogger.INSTANCE, ledgerDirsManager); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); final LedgerCache ledgerCache; if (dryRun) { ledgerCache = new DryRunLedgerCache(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyDefaultEntryLogger.java similarity index 86% rename from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java rename to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyDefaultEntryLogger.java index 2a683dcefa5..193a450d553 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyDefaultEntryLogger.java @@ -29,14 +29,14 @@ /** * Read Only Entry Logger. */ -public class ReadOnlyEntryLogger extends EntryLogger { +public class ReadOnlyDefaultEntryLogger extends DefaultEntryLogger { - public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException { + public ReadOnlyDefaultEntryLogger(ServerConfiguration conf) throws IOException { super(conf); } @Override - protected boolean removeEntryLog(long entryLogId) { + public boolean removeEntryLog(long entryLogId) { // can't remove entry log in readonly mode return false; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index 428a1262714..89a429b7627 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -54,7 +55,7 @@ */ public class SortedLedgerStorage implements LedgerStorage, CacheCallback, SkipListFlusher, - CompactableLedgerStorage, EntryLogger.EntryLogListener { + CompactableLedgerStorage, DefaultEntryLogger.EntryLogListener { private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class); EntryMemTable memTable; @@ -260,7 +261,8 @@ public void cancelWaitForLastAddConfirmedUpdate(long ledgerId, @Override public void checkpoint(final Checkpoint checkpoint) throws IOException { long numBytesFlushed = memTable.flush(this, checkpoint); - interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed); + ((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()) + .prepareSortedLedgerStorageCheckpoint(numBytesFlushed); interleavedLedgerStorage.checkpoint(checkpoint); } @@ -315,9 +317,9 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { try { LOG.info("Started flushing mem table."); - interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush(); + ((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()).prepareEntryMemTableFlush(); memTable.flush(SortedLedgerStorage.this); - if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) { + if (((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()).commitEntryMemTableFlush()) { interleavedLedgerStorage.checkpointer.startCheckpoint(cp); } } catch (Exception e) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index aec934fa159..7c7307c3714 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -23,14 +23,15 @@ import io.netty.buffer.ByteBuf; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; +import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; +import org.apache.bookkeeper.bookie.storage.EntryLogger; + import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.util.HardLink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor { final EntryLogger entryLogger; final CompactableLedgerStorage ledgerStorage; + final LedgerDirsManager ledgerDirsManager; final List offsets = new ArrayList<>(); // compaction log file suffix @@ -58,10 +60,12 @@ public TransactionalEntryLogCompactor( ServerConfiguration conf, EntryLogger entryLogger, CompactableLedgerStorage ledgerStorage, + LedgerDirsManager ledgerDirsManager, LogRemovalListener logRemover) { super(conf, logRemover); this.entryLogger = entryLogger; this.ledgerStorage = ledgerStorage; + this.ledgerDirsManager = ledgerDirsManager; } /** @@ -70,25 +74,10 @@ public TransactionalEntryLogCompactor( @Override public void cleanUpAndRecover() { // clean up compacting logs and recover index for already compacted logs - List ledgerDirs = entryLogger.getLedgerDirsManager().getAllLedgerDirs(); - for (File dir : ledgerDirs) { - File[] compactingPhaseFiles = dir.listFiles(file -> file.getName().endsWith(COMPACTING_SUFFIX)); - if (compactingPhaseFiles != null) { - for (File file : compactingPhaseFiles) { - if (file.delete()) { - LOG.info("Deleted failed compaction file {}", file); - } - } - } - File[] compactedPhaseFiles = dir.listFiles(file -> file.getName().endsWith(COMPACTED_SUFFIX)); - if (compactedPhaseFiles != null) { - for (File compactedFile : compactedPhaseFiles) { - LOG.info("Found compacted log file {} has partially flushed index, recovering index.", - compactedFile); - CompactionPhase updateIndex = new UpdateIndexPhase(compactedFile, true); - updateIndex.run(); - } - } + for (CompactionEntryLog log : entryLogger.incompleteCompactionLogs()) { + LOG.info("Found compacted log file {} has partially flushed index, recovering index.", log); + CompactionPhase updateIndex = new UpdateIndexPhase(log, true); + updateIndex.run(); } } @@ -97,19 +86,26 @@ public boolean compact(EntryLogMetadata metadata) { if (metadata != null) { LOG.info("Compacting entry log {} with usage {}.", metadata.getEntryLogId(), metadata.getUsage()); - CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata); + CompactionEntryLog compactionLog; + try { + compactionLog = entryLogger.newCompactionLog(metadata.getEntryLogId()); + } catch (IOException ioe) { + LOG.error("Exception creating new compaction entry log", ioe); + return false; + } + CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog); if (!scanEntryLog.run()) { LOG.info("Compaction for entry log {} end in ScanEntryLogPhase.", metadata.getEntryLogId()); return false; } - File compactionLogFile = entryLogger.getCurCompactionLogFile(); - CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(metadata.getEntryLogId()); + + CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(compactionLog); if (!flushCompactionLog.run()) { LOG.info("Compaction for entry log {} end in FlushCompactionLogPhase.", metadata.getEntryLogId()); return false; } - File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId()); - CompactionPhase updateIndex = new UpdateIndexPhase(compactedLogFile); + + CompactionPhase updateIndex = new UpdateIndexPhase(compactionLog); if (!updateIndex.run()) { LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId()); return false; @@ -161,16 +157,17 @@ boolean run() { */ class ScanEntryLogPhase extends CompactionPhase { private final EntryLogMetadata metadata; + private final CompactionEntryLog compactionLog; - ScanEntryLogPhase(EntryLogMetadata metadata) { + ScanEntryLogPhase(EntryLogMetadata metadata, CompactionEntryLog compactionLog) { super("ScanEntryLogPhase"); this.metadata = metadata; + this.compactionLog = compactionLog; } @Override void start() throws IOException { // scan entry log into compaction log and offset list - entryLogger.createNewCompactionLog(); entryLogger.scanEntryLog(metadata.getEntryLogId(), new EntryLogScanner() { @Override public boolean accept(long ledgerId) { @@ -189,7 +186,7 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio ledgerId, lid, entryId, offset); throw new IOException("Invalid entry found @ offset " + offset); } - long newOffset = entryLogger.addEntryForCompaction(ledgerId, entry); + long newOffset = compactionLog.addEntry(ledgerId, entry); offsets.add(new EntryLocation(ledgerId, entryId, newOffset)); if (LOG.isDebugEnabled()) { @@ -207,7 +204,7 @@ boolean complete() { // no valid entries is compacted, delete entry log file LOG.info("No valid entry is found in entry log after scan, removing entry log now."); logRemovalListener.removeEntryLog(metadata.getEntryLogId()); - entryLogger.removeCurCompactionLog(); + compactionLog.abort(); return false; } return true; @@ -217,9 +214,8 @@ boolean complete() { void abort() { offsets.clear(); // since we haven't flushed yet, we only need to delete the unflushed compaction file. - entryLogger.removeCurCompactionLog(); + compactionLog.abort(); } - } /** @@ -230,51 +226,35 @@ void abort() { * a hardlink file "3.log.1.compacted" should be created, and "3.log.compacting" should be deleted. */ class FlushCompactionLogPhase extends CompactionPhase { - private final long compactingLogId; - private File compactedLogFile; + final CompactionEntryLog compactionLog; - FlushCompactionLogPhase(long compactingLogId) { + FlushCompactionLogPhase(CompactionEntryLog compactionLog) { super("FlushCompactionLogPhase"); - this.compactingLogId = compactingLogId; + this.compactionLog = compactionLog; } @Override void start() throws IOException { // flush the current compaction log. - File compactionLogFile = entryLogger.getCurCompactionLogFile(); - if (compactionLogFile == null || !compactionLogFile.exists()) { - throw new IOException("Compaction log doesn't exist during flushing"); - } - entryLogger.flushCompactionLog(); + compactionLog.flush(); } @Override boolean complete() throws IOException { - // create a hard link file named "x.log.y.compacted" for file "x.log.compacting". - // where x is compactionLogId and y is compactingLogId. - File compactionLogFile = entryLogger.getCurCompactionLogFile(); - if (compactionLogFile == null || !compactionLogFile.exists()) { - LOG.warn("Compaction log doesn't exist any more after flush"); + try { + compactionLog.markCompacted(); + return true; + } catch (IOException ioe) { + LOG.warn("Error marking compaction as done", ioe); return false; } - compactedLogFile = getCompactedLogFile(compactionLogFile, compactingLogId); - if (compactedLogFile != null && !compactedLogFile.exists()) { - HardLink.createHardLink(compactionLogFile, compactedLogFile); - } - entryLogger.removeCurCompactionLog(); - return true; } @Override void abort() { offsets.clear(); // remove compaction log file and its hardlink - entryLogger.removeCurCompactionLog(); - if (compactedLogFile != null && compactedLogFile.exists()) { - if (!compactedLogFile.delete()) { - LOG.warn("Could not delete compacted log file {}", compactedLogFile); - } - } + compactionLog.abort(); } } @@ -289,41 +269,29 @@ void abort() { *

This phase can also used to recover partially flushed index when we pass isInRecovery=true */ class UpdateIndexPhase extends CompactionPhase { - File compactedLogFile; - File newEntryLogFile; + final CompactionEntryLog compactionLog; private final boolean isInRecovery; - public UpdateIndexPhase(File compactedLogFile) { - this(compactedLogFile, false); + public UpdateIndexPhase(CompactionEntryLog compactionLog) { + this(compactionLog, false); } - public UpdateIndexPhase(File compactedLogFile, boolean isInRecovery) { + public UpdateIndexPhase(CompactionEntryLog compactionLog, boolean isInRecovery) { super("UpdateIndexPhase"); - this.compactedLogFile = compactedLogFile; + this.compactionLog = compactionLog; this.isInRecovery = isInRecovery; } @Override void start() throws IOException { - if (compactedLogFile != null && compactedLogFile.exists()) { - File dir = compactedLogFile.getParentFile(); - String compactedFilename = compactedLogFile.getName(); - // create a hard link "x.log" for file "x.log.y.compacted" - this.newEntryLogFile = new File(dir, compactedFilename.substring(0, - compactedFilename.indexOf(".log") + 4)); - if (!newEntryLogFile.exists()) { - HardLink.createHardLink(compactedLogFile, newEntryLogFile); - } - if (isInRecovery) { - recoverEntryLocations(EntryLogger.fileName2LogId(newEntryLogFile.getName())); - } - if (!offsets.isEmpty()) { - // update entry locations and flush index - ledgerStorage.updateEntriesLocations(offsets); - ledgerStorage.flushEntriesLocationsIndex(); - } - } else { - throw new IOException("Failed to find compacted log file in UpdateIndexPhase"); + compactionLog.makeAvailable(); + if (isInRecovery) { + recoverEntryLocations(compactionLog); + } + if (!offsets.isEmpty()) { + // update entry locations and flush index + ledgerStorage.updateEntriesLocations(offsets); + ledgerStorage.flushEntriesLocationsIndex(); } } @@ -332,16 +300,8 @@ boolean complete() { // When index is flushed, and entry log is removed, // delete the ".compacted" file to indicate this phase is completed. offsets.clear(); - if (compactedLogFile != null) { - if (!compactedLogFile.delete()) { - LOG.warn("Could not delete compacted log file {}", compactedLogFile); - } - // Now delete the old entry log file since it's compacted - String compactedFilename = compactedLogFile.getName(); - String oldEntryLogFilename = compactedFilename.substring(compactedFilename.indexOf(".log") + 5); - long entryLogId = EntryLogger.fileName2LogId(oldEntryLogFilename); - logRemovalListener.removeEntryLog(entryLogId); - } + compactionLog.cleanup(); + logRemovalListener.removeEntryLog(compactionLog.getCompactedLogId()); return true; } @@ -353,8 +313,8 @@ void abort() { /** * Scan entry log to recover entry locations. */ - private void recoverEntryLocations(long compactedLogId) throws IOException { - entryLogger.scanEntryLog(compactedLogId, new EntryLogScanner() { + private void recoverEntryLocations(CompactionEntryLog compactionLog) throws IOException { + compactionLog.scan(new EntryLogScanner() { @Override public boolean accept(long ledgerId) { return true; @@ -370,23 +330,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio ledgerId, lid, entryId, offset); throw new IOException("Invalid entry found @ offset " + offset); } - long location = (compactedLogId << 32L) | (offset + 4); + long location = (compactionLog.getLogId() << 32L) | (offset + 4); offsets.add(new EntryLocation(lid, entryId, location)); } }); - LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactedLogId); + LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getLogId()); } } - - File getCompactedLogFile(File compactionLogFile, long compactingLogId) { - if (compactionLogFile == null) { - return null; - } - File dir = compactionLogFile.getParentFile(); - String filename = compactionLogFile.getName(); - String newSuffix = ".log." + EntryLogger.logId2HexString(compactingLogId) + COMPACTED_SUFFIX; - String hardLinkFilename = filename.replace(COMPACTING_SUFFIX, newSuffix); - return new File(dir, hardLinkFilename); - } - } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java new file mode 100644 index 00000000000..60806add86d --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java @@ -0,0 +1,91 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage; + +import io.netty.buffer.ByteBuf; +import java.io.IOException; + +/** + * An entrylog to received compacted entries. + *

+ * The expected lifecycle for a compaction entry log is: + * 1. Creation + * 2. Mark compacted + * 3. Make available + * 4. Cleanup + *

+ * Abort can happen at during any step. + */ +public interface CompactionEntryLog { + /** + * Add an entry to the log. + * @param ledgerId the ledger the entry belong to + * @param entry the payload of the entry + * @return the position to which the entry was written + */ + long addEntry(long ledgerId, ByteBuf entry) throws IOException; + + /** + * Scan the entry log, reading out all contained entries. + */ + void scan(EntryLogScanner scanner) throws IOException; + + /** + * Flush any unwritten entries to physical storage. + */ + void flush() throws IOException; + + /** + * Abort the compaction log. This should delete any resources held + * by this log. + */ + void abort(); + + /** + * Mark the compaction log as compacted. + * From this point, the heavy work of copying entries from one log + * to another should be done. We don't want to repeat that work, + * so this method should take steps to ensure that if the bookie crashes + * we can resume the compaction from this point. + */ + void markCompacted() throws IOException; + + /** + * Make the log written by the compaction process available for reads. + */ + void makeAvailable() throws IOException; + + /** + * Clean up any temporary resources that were used by the compaction process. + * At this point, there + */ + void cleanup(); + + /** + * Get the log ID of the entrylog to which compacted entries are being written. + */ + long getLogId(); + + /** + * Get the log ID of the entrylog which is being compacted. + */ + long getCompactedLogId(); +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java new file mode 100644 index 00000000000..bcca2b19889 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java @@ -0,0 +1,53 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; + +/** + * Scan entries in a entry log file. + */ +public interface EntryLogScanner { + /** + * Tests whether or not the entries belongs to the specified ledger + * should be processed. + * + * @param ledgerId + * Ledger ID. + * @return true if and only the entries of the ledger should be scanned. + */ + boolean accept(long ledgerId); + + /** + * Process an entry. + * + * @param ledgerId + * Ledger ID. + * @param offset + * File offset of this entry. + * @param entry + * Entry ByteBuf + * @throws IOException + */ + void process(long ledgerId, long offset, ByteBuf entry) throws IOException; +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java new file mode 100644 index 00000000000..cab72cd587c --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java @@ -0,0 +1,149 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage; + +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.util.Collection; +import org.apache.bookkeeper.bookie.AbstractLogCompactor; +import org.apache.bookkeeper.bookie.Bookie.NoEntryException; +import org.apache.bookkeeper.bookie.EntryLogMetadata; + + +/** + * Entry logger. Sequentially writes entries for a large number of ledgers to + * a small number of log files, to avoid many random writes. + * When an entry is added, a location is returned, which consists of the ID of the + * log into which the entry was added, and the offset of that entry within the log. + * The location is a long, with 32 bits each for the log ID and the offset. This + * naturally limits the offset and thus the size of the log to Integer.MAX_VALUE. + */ +public interface EntryLogger extends AutoCloseable { + long UNASSIGNED_LEDGERID = -1L; + // log file suffix + String LOG_FILE_SUFFIX = ".log"; + + /** + * Add an entry for ledger ```ledgerId``` to the entrylog. + * @param ledgerId the ledger for which the entry is being added + * @param buf the contents of the entry (this method does not take ownership of the refcount) + * @return the location in the entry log of the added entry + */ + long addEntry(long ledgerId, ByteBuf buf) throws IOException; + + /** + * Read an entry from an entrylog location. + * @param entryLocation the location from which to read the entry + * @return the entry + */ + ByteBuf readEntry(long entryLocation) + throws IOException, NoEntryException; + /** + * Read an entry from an entrylog location, and verify that is matches the + * expected ledger and entry ID. + * @param ledgerId the ledgerID to match + * @param entryId the entryID to match + * @param entryLocation the location from which to read the entry + * @return the entry + */ + ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) + throws IOException, NoEntryException; + + /** + * Flush any outstanding writes to disk. + */ + void flush() throws IOException; + + @Override + void close() throws IOException; + + /** + * Create a new entrylog into which compacted entries can be added. + * There is a 1-1 mapping between logs that are being compacted + * and the log the compacted entries are written to. + */ + CompactionEntryLog newCompactionLog(long logToCompact) throws IOException; + + /** + * Return a collection of all the compaction entry logs which have been + * compacted, but have not been cleaned up. + */ + Collection incompleteCompactionLogs(); + + /** + * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector + * process needs to look beyond the least unflushed entry log file, as there may be entry logs + * ready to be garbage collected. + * + * @return last entry log id created. + */ + long getLastLogId(); + + /** + * Get the lowest entrylog ID which has not had all its data persisted to + * disk. + * This is used by compaction. Any entrylog ID higher or equal to this value + * will not be considered for compaction. + */ + long getLeastUnflushedLogId(); + + /** + * Scan the given entrylog, returning all entries contained therein. + */ + void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException; + + /** + * Retrieve metadata for the given entrylog ID. + * The metadata contains the size of the log, the size of the data in the log which is still + * active, and a list of all the ledgers contained in the log and the size of the data stored + * for each ledger. + */ + default EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException { + return getEntryLogMetadata(entryLogId, null); + } + + /** + * Retrieve metadata for the given entrylog ID. + * The metadata contains the size of the log, the size of the data in the log which is still + * active, and a list of all the ledgers contained in the log and the size of the data stored + * for each ledger. + */ + EntryLogMetadata getEntryLogMetadata(long entryLogId, AbstractLogCompactor.Throttler throttler) throws IOException; + + /** + * Check whether an entrylog with the given ID exists. + */ + boolean logExists(long logId); + + /** + * Returns whether the current log id exists and has been rotated already. + * + * @param entryLogId EntryLog id to check. + * @return Whether the given entryLogId exists and has been rotated. + */ + boolean isFlushedEntryLog(Long entryLogId); + + /** + * Delete the entrylog with the given ID. + * @return false if the entrylog doesn't exist. + */ + boolean removeEntryLog(long entryLogId); +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java index 6e481ca5695..1f5cf8de4ce 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java @@ -37,10 +37,10 @@ import java.util.List; import java.util.Set; import org.apache.bookkeeper.bookie.BookieImpl; -import org.apache.bookkeeper.bookie.EntryLogger; -import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.Journal; import org.apache.bookkeeper.bookie.LedgerDirsManager; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.util.BookKeeperConstants; @@ -136,7 +136,7 @@ public boolean initiate() { } private void scanEntryLogFiles(Set ledgers) throws IOException { - EntryLogger entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(), + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))); Set entryLogs = entryLogger.getEntryLogsSet(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 55c5c90c31e..5d53c66081a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -34,9 +34,9 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.BookieImpl; -import org.apache.bookkeeper.bookie.EntryLogger; -import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.LedgerDirsManager; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -69,7 +69,7 @@ public void initiate() throws IOException { long startTime = System.nanoTime(); - EntryLogger entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(), + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))); Set entryLogs = entryLogger.getEntryLogsSet(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 753b88f7ecc..63f2f5a1b7e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -58,8 +58,8 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.Checkpointer; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; -import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.GarbageCollectionStatus; import org.apache.bookkeeper.bookie.GarbageCollectorThread; import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; @@ -68,6 +68,7 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerEntryPage; import org.apache.bookkeeper.bookie.StateManager; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; import org.apache.bookkeeper.common.util.Watcher; @@ -185,7 +186,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES); - entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator); + entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator); gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, statsLogger); dbLedgerStorageStats = new DbLedgerStorageStats( @@ -265,7 +266,7 @@ public void shutdown() throws InterruptedException { flush(); gcThread.shutdown(); - entryLogger.shutdown(); + entryLogger.close(); cleanupExecutor.shutdown(); cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS); @@ -589,8 +590,8 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi while (count < readAheadCacheBatchSize && size < maxReadAheadBytesSize && currentEntryLogId == firstEntryLogId) { - ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, firstEntryId, currentEntryLocation, - false /* validateEntry */); + ByteBuf entry = entryLogger.readEntry(orginalLedgerId, + firstEntryId, currentEntryLocation); try { long currentEntryLedgerId = entry.getLong(0); @@ -722,7 +723,7 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { Batch batch = entryLocationIndex.newBatch(); writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> { try { - long location = entryLogger.addEntry(ledgerId, entry, true); + long location = entryLogger.addEntry(ledgerId, entry); entryLocationIndex.addLocation(batch, ledgerId, entryId, location); } catch (IOException e) { throw new RuntimeException(e); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/package-info.java new file mode 100644 index 00000000000..f8744532700 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Provides a Bookie server that stores entries for clients. + */ +package org.apache.bookkeeper.bookie.storage; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListActiveLedgersCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListActiveLedgersCommand.java index 5662d3d6d51..fbe230cbcd2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListActiveLedgersCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListActiveLedgersCommand.java @@ -35,9 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger; import lombok.Setter; import lombok.experimental.Accessors; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLogMetadata; -import org.apache.bookkeeper.bookie.EntryLogger; -import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger; +import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -133,7 +133,7 @@ public void handler(ServerConfiguration bkConf, ActiveLedgerFlags cmdFlags) BKException.Code.OK, BKException.Code.ReadException); if (done.await(cmdFlags.timeout, TimeUnit.MILLISECONDS)){ if (resultCode.get() == BKException.Code.OK) { - EntryLogger entryLogger = new ReadOnlyEntryLogger(bkConf); + DefaultEntryLogger entryLogger = new ReadOnlyDefaultEntryLogger(bkConf); EntryLogMetadata entryLogMetadata = entryLogger.getEntryLogMetadata(cmdFlags.logId); List ledgersOnEntryLog = entryLogMetadata.getLedgersMap().keys(); if (ledgersOnEntryLog.size() == 0) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java index 11311936ad6..edef6609ff2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java @@ -25,8 +25,9 @@ import java.io.IOException; import lombok.Setter; import lombok.experimental.Accessors; -import org.apache.bookkeeper.bookie.EntryLogger; -import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger; +import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; import org.apache.bookkeeper.tools.framework.CliFlags; @@ -174,7 +175,7 @@ private void scanEntryLogForPositionRange(ServerConfiguration conf, long logId, LOG.info("Scan entry log {} ({}.log) for PositionRange: {} - {}", logId, Long.toHexString(logId), rangeStartPos, rangeEndPos); final MutableBoolean entryFound = new MutableBoolean(false); - scanEntryLog(conf, logId, new EntryLogger.EntryLogScanner() { + scanEntryLog(conf, logId, new EntryLogScanner() { private MutableBoolean stopScanning = new MutableBoolean(false); @Override @@ -220,7 +221,7 @@ public void process(long ledgerId, long entryStartPos, ByteBuf entry) throws IOE * @param logId Entry Log Id * @param scanner Entry Log Scanner */ - private void scanEntryLog(ServerConfiguration conf, long logId, EntryLogger.EntryLogScanner scanner) + private void scanEntryLog(ServerConfiguration conf, long logId, EntryLogScanner scanner) throws IOException { initEntryLogger(conf); entryLogger.scanEntryLog(logId, scanner); @@ -229,7 +230,7 @@ private void scanEntryLog(ServerConfiguration conf, long logId, EntryLogger.Entr private synchronized void initEntryLogger(ServerConfiguration conf) throws IOException { if (null == entryLogger) { // provide read only entry logger - entryLogger = new ReadOnlyEntryLogger(conf); + entryLogger = new ReadOnlyDefaultEntryLogger(conf); } } @@ -248,7 +249,7 @@ private void scanEntryLogForSpecificEntry(ServerConfiguration conf, long logId, LOG.info("Scan entry log {} ({}.log) for LedgerId {} {}", logId, Long.toHexString(logId), ledgerId, ((entryId == -1) ? "" : " for EntryId " + entryId)); final MutableBoolean entryFound = new MutableBoolean(false); - scanEntryLog(conf, logId, new EntryLogger.EntryLogScanner() { + scanEntryLog(conf, logId, new EntryLogScanner() { @Override public boolean accept(long candidateLedgerId) { return ((candidateLedgerId == ledgerId) && ((!entryFound.booleanValue()) || (entryId == -1))); @@ -281,7 +282,7 @@ public void process(long candidateLedgerId, long startPos, ByteBuf entry) { */ private void scanEntryLog(ServerConfiguration conf, long logId, final boolean printMsg) throws Exception { LOG.info("Scan entry log {} ({}.log)", logId, Long.toHexString(logId)); - scanEntryLog(conf, logId, new EntryLogger.EntryLogScanner() { + scanEntryLog(conf, logId, new EntryLogScanner() { @Override public boolean accept(long ledgerId) { return true; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommand.java index 987015cfca6..b3a88af7e05 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommand.java @@ -26,8 +26,8 @@ import lombok.Setter; import lombok.experimental.Accessors; import org.apache.bookkeeper.bookie.EntryLogMetadata; -import org.apache.bookkeeper.bookie.EntryLogger; -import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger; +import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.tools.cli.commands.bookie.ReadLogMetadataCommand.ReadLogMetadataFlags; import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; @@ -144,7 +144,7 @@ private void printEntryLogMetadata(ServerConfiguration conf, long logId) throws private synchronized void initEntryLogger(ServerConfiguration conf) throws IOException { // provide read only entry logger if (null == entryLogger) { - entryLogger = new ReadOnlyEntryLogger(conf); + entryLogger = new ReadOnlyDefaultEntryLogger(conf); } } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 765d8256a7f..9005fef748b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -35,6 +35,7 @@ import static org.junit.Assert.fail; import com.google.common.util.concurrent.UncheckedExecutionException; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; @@ -57,6 +58,7 @@ import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; +import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; import org.apache.bookkeeper.bookie.storage.ldb.PersistentEntryLogMetadataMap; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -74,16 +76,18 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.test.TestStatsProvider; import org.apache.bookkeeper.util.DiskChecker; -import org.apache.bookkeeper.util.HardLink; import org.apache.bookkeeper.util.TestUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.zookeeper.AsyncCallback; + import org.junit.Before; import org.junit.Test; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * This class tests the entry log compaction functionality. */ @@ -742,7 +746,7 @@ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception { // Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm // should consider it for deletion. - getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L); + ((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L); getGCThread().triggerGC(true, false, false).get(); assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: " + tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 1)); @@ -752,7 +756,7 @@ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception { getGCThread().triggerGC(true, false, false).get(); assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: " + tmpDirs.getDirs().get(0), TestUtils.hasAllLogFiles(tmpDirs.getDirs().get(0), 0)); - getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L); + ((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L); getGCThread().triggerGC(true, false, false).get(); assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: " + tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 0)); @@ -1285,7 +1289,7 @@ public void testCompactionSafety() throws Exception { BookieImpl.checkDirectoryStructure(curDir); conf.setLedgerDirNames(new String[] {tmpDir.toString()}); - conf.setEntryLogSizeLimit(EntryLogger.LOGFILE_HEADER_SIZE + 3 * (4 + ENTRY_SIZE)); + conf.setEntryLogSizeLimit(DefaultEntryLogger.LOGFILE_HEADER_SIZE + 3 * (4 + ENTRY_SIZE)); conf.setGcWaitTime(100); conf.setMinorCompactionThreshold(0.7f); conf.setMajorCompactionThreshold(0.0f); @@ -1881,30 +1885,31 @@ public MockTransactionalEntryLogCompactor(GarbageCollectorThread gcThread) { super(gcThread.conf, gcThread.entryLogger, gcThread.ledgerStorage, - (long entry) -> { - try { - gcThread.removeEntryLog(entry); - } catch (EntryLogMetadataMapException e) { - LOG.warn("Failed to remove entry-log metadata {}", entry, e); - } - }); + gcThread.ledgerDirsManager, + (long entry) -> { + try { + gcThread.removeEntryLog(entry); + } catch (EntryLogMetadataMapException e) { + LOG.warn("Failed to remove entry-log metadata {}", entry, e); + } + }); } - synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) { + synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) throws IOException { LOG.info("Compacting entry log {}.", metadata.getEntryLogId()); - CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata); + CompactionEntryLog compactionLog = entryLogger.newCompactionLog(metadata.getEntryLogId()); + + CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog); if (!scanEntryLog.run()) { LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId()); return; } - File compactionLogFile = entryLogger.getCurCompactionLogFile(); - CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(metadata.getEntryLogId()); + CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(compactionLog); if (!flushCompactionLog.run()) { LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId()); return; } - File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId()); - CompactionPhase partialFlushIndexPhase = new PartialFlushIndexPhase(compactedLogFile); + CompactionPhase partialFlushIndexPhase = new PartialFlushIndexPhase(compactionLog); if (!partialFlushIndexPhase.run()) { LOG.info("Compaction for {} end in PartialFlushIndexPhase.", metadata.getEntryLogId()); return; @@ -1913,21 +1918,21 @@ synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) { LOG.info("Compacted entry log : {}.", metadata.getEntryLogId()); } - synchronized void compactWithLogFlushFailure(EntryLogMetadata metadata) { + synchronized void compactWithLogFlushFailure(EntryLogMetadata metadata) throws IOException { LOG.info("Compacting entry log {}", metadata.getEntryLogId()); - CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata); + CompactionEntryLog compactionLog = entryLogger.newCompactionLog(metadata.getEntryLogId()); + + CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog); if (!scanEntryLog.run()) { LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId()); return; } - File compactionLogFile = entryLogger.getCurCompactionLogFile(); - CompactionPhase logFlushFailurePhase = new LogFlushFailurePhase(metadata.getEntryLogId()); + CompactionPhase logFlushFailurePhase = new LogFlushFailurePhase(compactionLog); if (!logFlushFailurePhase.run()) { LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId()); return; } - File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId()); - CompactionPhase updateIndex = new UpdateIndexPhase(compactedLogFile); + CompactionPhase updateIndex = new UpdateIndexPhase(compactionLog); if (!updateIndex.run()) { LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId()); return; @@ -1938,45 +1943,35 @@ synchronized void compactWithLogFlushFailure(EntryLogMetadata metadata) { private class PartialFlushIndexPhase extends UpdateIndexPhase { - public PartialFlushIndexPhase(File compactedLogFile) { - super(compactedLogFile); + public PartialFlushIndexPhase(CompactionEntryLog compactionLog) { + super(compactionLog); } @Override void start() throws IOException { - if (compactedLogFile != null && compactedLogFile.exists()) { - File dir = compactedLogFile.getParentFile(); - String compactedFilename = compactedLogFile.getName(); - // create a hard link "x.log" for file "x.log.y.compacted" - this.newEntryLogFile = new File(dir, compactedFilename.substring(0, - compactedFilename.indexOf(".log") + 4)); - File hardlinkFile = new File(dir, newEntryLogFile.getName()); - if (!hardlinkFile.exists()) { - HardLink.createHardLink(compactedLogFile, hardlinkFile); - } - assertTrue(offsets.size() > 1); - // only flush index for one entry location - EntryLocation el = offsets.get(0); - ledgerStorage.updateEntriesLocations(offsets); - ledgerStorage.flushEntriesLocationsIndex(); - throw new IOException("Flush ledger index encounter exception"); - } + compactionLog.makeAvailable(); + assertTrue(offsets.size() > 1); + // only flush index for one entry location + EntryLocation el = offsets.get(0); + ledgerStorage.updateEntriesLocations(offsets); + ledgerStorage.flushEntriesLocationsIndex(); + throw new IOException("Flush ledger index encounter exception"); } } private class LogFlushFailurePhase extends FlushCompactionLogPhase { - LogFlushFailurePhase(long compactingLogId) { - super(compactingLogId); + LogFlushFailurePhase(CompactionEntryLog compactionEntryLog) { + super(compactionEntryLog); } @Override void start() throws IOException { // flush the current compaction log - entryLogger.flushCompactionLog(); + compactionLog.flush(); throw new IOException("Encounter IOException when trying to flush compaction log"); } } } -} \ No newline at end of file +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java index 215e15942da..b2699e5e57e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java @@ -40,8 +40,6 @@ import java.util.stream.IntStream; import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo; -import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; -import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.stats.Counter; @@ -122,7 +120,7 @@ public void testCreateNewLog() throws Exception { File newLogFile = new File(dir, logFileName); newLogFile.createNewFile(); - EntryLogger el = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager); // Calls createNewLog, and with the number of directories we // are using, if it picks one at random it will fail. EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager(); @@ -154,7 +152,7 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception { ledgerDirsManager.addToFilledDirs(tdir); } - EntryLogger el = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager); // Calls createNewLog, and with the number of directories we // are using, if it picks one at random it will fail. EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager(); @@ -191,7 +189,7 @@ public void testEntryLogPerLedgerCreationWithPreAllocation() throws Exception { conf.setEntryLogPerLedgerEnabled(true); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator; EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger .getEntryLogManager(); @@ -244,7 +242,7 @@ public void testEntryLogPerLedgerCreationWithPreAllocation() throws Exception { expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId()); Assert.assertEquals("Number of current ", numOfLedgers, entryLogManager.getCopyOfCurrentLogs().size()); - List rotatedLogChannels = entryLogManager.getRotatedLogChannels(); + List rotatedLogChannels = entryLogManager.getRotatedLogChannels(); Assert.assertEquals("Number of LogChannels rotated", 1, rotatedLogChannels.size()); Assert.assertEquals("Rotated logchannel logid", rotatedLedger, rotatedLogChannels.iterator().next().getLogId()); entryLogger.flush(); @@ -299,7 +297,7 @@ public void testEntryLogCreationWithFilledDirs() throws Exception { conf.setEntryLogPerLedgerEnabled(true); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator; EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); @@ -332,7 +330,7 @@ public void testEntryLogCreationWithFilledDirs() throws Exception { File nonFilledLedgerDir = BookieImpl.getCurrentDirectory(new File(ledgerDirs[numDirs - 1])); entryLogManager.createNewLog(ledgerId); - BufferedLogChannel newLogChannel = entryLogManager.getCurrentLogForLedger(ledgerId); + DefaultEntryLogger.BufferedLogChannel newLogChannel = entryLogManager.getCurrentLogForLedger(ledgerId); Assert.assertEquals("Directory of newly created BufferedLogChannel file", nonFilledLedgerDir.getAbsolutePath(), newLogChannel.getLogFile().getParentFile().getAbsolutePath()); @@ -357,7 +355,7 @@ public void testLedgerDirsUniformityDuringCreation() throws Exception { conf.setEntryLogPerLedgerEnabled(true); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); @@ -422,7 +420,7 @@ public void testConcurrentCreateNewLog(boolean entryLogFilePreAllocationEnabled) LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger el = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerBase entryLogManager = (EntryLogManagerBase) el.getEntryLogManager(); // set same thread executor for entryLoggerAllocator's allocatorExecutor setSameThreadExecutorForEntryLoggerAllocator(el.getEntryLoggerAllocator()); @@ -467,7 +465,7 @@ public void testCreateNewLogWithGaps() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger el = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) el.getEntryLogManager(); entryLogManagerBase.createNewLog(0L); @@ -506,7 +504,7 @@ public void testCreateNewLogAndCompactionLog() throws Exception { conf.setEntryLogFilePreAllocationEnabled(true); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger el = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager); // set same thread executor for entryLoggerAllocator's allocatorExecutor setSameThreadExecutorForEntryLoggerAllocator(el.getEntryLoggerAllocator()); AtomicBoolean receivedException = new AtomicBoolean(false); @@ -516,7 +514,7 @@ public void testCreateNewLogAndCompactionLog() throws Exception { if (i % 2 == 0) { ((EntryLogManagerBase) el.getEntryLogManager()).createNewLog((long) i); } else { - el.createNewCompactionLog(); + el.newCompactionLog(i); } } catch (IOException e) { LOG.error("Received exception while creating newLog", e); @@ -544,7 +542,7 @@ public void testConcurrentEntryLogCreations() throws Exception { conf.setEntryLogPerLedgerEnabled(true); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); @@ -609,7 +607,7 @@ public void testEntryLogManagerMetrics() throws Exception { conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger, + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, UnpooledByteBufAllocator.DEFAULT); EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger .getEntryLogManager(); @@ -735,7 +733,7 @@ public void testEntryLogManagerMetricsFromExpiryAspect() throws Exception { conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger, + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, UnpooledByteBufAllocator.DEFAULT); EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger .getEntryLogManager(); @@ -814,7 +812,7 @@ public List getWritableLedgerDirsForNewLog() throws NoWritableLedgerDirExc } }; - EntryLogger el = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) el .getEntryLogManager(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java similarity index 93% rename from bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java rename to bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java index e3627f40abc..0898748b1ea 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java @@ -55,7 +55,7 @@ import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.locks.Lock; -import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; +import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.common.testing.annotations.FlakyTest; import org.apache.bookkeeper.conf.ServerConfiguration; @@ -78,8 +78,8 @@ * Tests for EntryLog. */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class EntryLogTest { - private static final Logger LOG = LoggerFactory.getLogger(EntryLogTest.class); +public class DefaultEntryLogTest { + private static final Logger LOG = LoggerFactory.getLogger(DefaultEntryLogTest.class); final List tempDirs = new ArrayList(); final Random rand = new Random(); @@ -94,7 +94,7 @@ File createTempDir(String prefix, String suffix) throws IOException { private File curDir; private ServerConfiguration conf; private LedgerDirsManager dirsMgr; - private EntryLogger entryLogger; + private DefaultEntryLogger entryLogger; @Before public void setUp() throws Exception { @@ -108,13 +108,13 @@ public void setUp() throws Exception { new DiskChecker( conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - this.entryLogger = new EntryLogger(conf, dirsMgr); + this.entryLogger = new DefaultEntryLogger(conf, dirsMgr); } @After public void tearDown() throws Exception { if (null != this.entryLogger) { - entryLogger.shutdown(); + entryLogger.close(); } for (File dir : tempDirs) { @@ -125,7 +125,7 @@ public void tearDown() throws Exception { @Test public void testDeferCreateNewLog() throws Exception { - entryLogger.shutdown(); + entryLogger.close(); // mark `curDir` as filled this.conf.setMinUsableSizeForEntryLogCreation(1); @@ -137,10 +137,10 @@ public void testDeferCreateNewLog() throws Exception { conf.getDiskUsageWarnThreshold())); this.dirsMgr.addToFilledDirs(curDir); - entryLogger = new EntryLogger(conf, dirsMgr); + entryLogger = new DefaultEntryLogger(conf, dirsMgr); EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager(); - assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); + assertEquals(DefaultEntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); // add the first entry will trigger file creation entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); @@ -149,7 +149,7 @@ public void testDeferCreateNewLog() throws Exception { @Test public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception { - entryLogger.shutdown(); + entryLogger.close(); // mark `curDir` as filled this.conf.setMinUsableSizeForEntryLogCreation(Long.MAX_VALUE); @@ -161,17 +161,17 @@ public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception { conf.getDiskUsageWarnThreshold())); this.dirsMgr.addToFilledDirs(curDir); - entryLogger = new EntryLogger(conf, dirsMgr); + entryLogger = new DefaultEntryLogger(conf, dirsMgr); EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager(); - assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); + assertEquals(DefaultEntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); // add the first entry will trigger file creation try { entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); fail("Should fail to append entry if there is no enough reserved space left"); } catch (NoWritableLedgerDirException e) { - assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); + assertEquals(DefaultEntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId()); } } @@ -182,14 +182,14 @@ public void testCorruptEntryLog() throws Exception { entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer()); entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer()); entryLogger.flush(); - entryLogger.shutdown(); + entryLogger.close(); // now lets truncate the file to corrupt the last entry, which simulates a partial write File f = new File(curDir, "0.log"); RandomAccessFile raf = new RandomAccessFile(f, "rw"); raf.setLength(raf.length() - 10); raf.close(); // now see which ledgers are in the log - entryLogger = new EntryLogger(conf, dirsMgr); + entryLogger = new DefaultEntryLogger(conf, dirsMgr); EntryLogMetadata meta = entryLogger.getEntryLogMetadata(0L); LOG.info("Extracted Meta From Entry Log {}", meta); @@ -230,12 +230,12 @@ public void testMissingLogId() throws Exception { for (int i = 0; i < numLogs; i++) { positions[i] = new long[numEntries]; - EntryLogger logger = new EntryLogger(conf, dirsMgr); + DefaultEntryLogger logger = new DefaultEntryLogger(conf, dirsMgr); for (int j = 0; j < numEntries; j++) { positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer()); } logger.flush(); - logger.shutdown(); + logger.close(); } // delete last log id File lastLogId = new File(curDir, "lastId"); @@ -245,15 +245,15 @@ public void testMissingLogId() throws Exception { for (int i = numLogs; i < 2 * numLogs; i++) { positions[i] = new long[numEntries]; - EntryLogger logger = new EntryLogger(conf, dirsMgr); + DefaultEntryLogger logger = new DefaultEntryLogger(conf, dirsMgr); for (int j = 0; j < numEntries; j++) { positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer()); } logger.flush(); - logger.shutdown(); + logger.close(); } - EntryLogger newLogger = new EntryLogger(conf, dirsMgr); + DefaultEntryLogger newLogger = new DefaultEntryLogger(conf, dirsMgr); for (int i = 0; i < (2 * numLogs + 1); i++) { File logFile = new File(curDir, Long.toHexString(i) + ".log"); assertTrue(logFile.exists()); @@ -281,9 +281,9 @@ public void testMissingLogId() throws Exception { public void testEntryLoggerShouldThrowFNFEIfDirectoriesDoesNotExist() throws Exception { File tmpDir = createTempDir("bkTest", ".dir"); - EntryLogger entryLogger = null; + DefaultEntryLogger entryLogger = null; try { - entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, new File[] { tmpDir }, + entryLogger = new DefaultEntryLogger(conf, new LedgerDirsManager(conf, new File[] { tmpDir }, new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))); fail("Expecting FileNotFoundException"); } catch (FileNotFoundException e) { @@ -291,7 +291,7 @@ public void testEntryLoggerShouldThrowFNFEIfDirectoriesDoesNotExist() .getLocalizedMessage()); } finally { if (entryLogger != null) { - entryLogger.shutdown(); + entryLogger.close(); } } } @@ -309,7 +309,7 @@ public void testAddEntryFailureOnDiskFull() throws Exception { conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() }); BookieImpl bookie = new TestBookieImpl(conf); - EntryLogger entryLogger = new EntryLogger(conf, + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, bookie.getLedgerDirsManager()); InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()); @@ -323,7 +323,7 @@ public void testAddEntryFailureOnDiskFull() throws Exception { ledgerStorage.addEntry(generateEntry(2, 1)); // Add entry with disk full failure simulation bookie.getLedgerDirsManager().addToFilledDirs(((EntryLogManagerBase) entryLogger.getEntryLogManager()) - .getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile()); + .getCurrentLogForLedger(DefaultEntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile()); ledgerStorage.addEntry(generateEntry(3, 1)); // Verify written entries Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1))); @@ -343,7 +343,7 @@ public void testRecoverFromLedgersMap() throws Exception { entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer()); EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); - entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID); + entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); entryLogManager.flushRotatedLogs(); EntryLogMetadata meta = entryLogger.extractEntryLogMetadataFromIndex(0L); @@ -366,19 +366,19 @@ public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception { entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer()); entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer()); entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer()); - ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID); - entryLogger.shutdown(); + ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); + entryLogger.close(); // Rewrite the entry log header to be on V0 format File f = new File(curDir, "0.log"); RandomAccessFile raf = new RandomAccessFile(f, "rw"); - raf.seek(EntryLogger.HEADER_VERSION_POSITION); + raf.seek(DefaultEntryLogger.HEADER_VERSION_POSITION); // Write zeros to indicate V0 + no ledgers map info raf.write(new byte[4 + 8]); raf.close(); // now see which ledgers are in the log - entryLogger = new EntryLogger(conf, dirsMgr); + entryLogger = new DefaultEntryLogger(conf, dirsMgr); try { entryLogger.extractEntryLogMetadataFromIndex(0L); @@ -404,25 +404,25 @@ public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception { */ @Test public void testPreAllocateLog() throws Exception { - entryLogger.shutdown(); + entryLogger.close(); // enable pre-allocation case conf.setEntryLogFilePreAllocationEnabled(true); - entryLogger = new EntryLogger(conf, dirsMgr); + entryLogger = new DefaultEntryLogger(conf, dirsMgr); // create a logger whose initialization phase allocating a new entry log - ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID); + ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture()); entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer()); // the Future is not null all the time assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture()); - entryLogger.shutdown(); + entryLogger.close(); // disable pre-allocation case conf.setEntryLogFilePreAllocationEnabled(false); // create a logger - entryLogger = new EntryLogger(conf, dirsMgr); + entryLogger = new DefaultEntryLogger(conf, dirsMgr); assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture()); entryLogger.addEntry(2L, generateEntry(1, 1).nioBuffer()); @@ -440,13 +440,13 @@ public void testGetEntryLogsSet() throws Exception { EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager()); assertEquals(Sets.newHashSet(), entryLogger.getEntryLogsSet()); - entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID); + entryLogManagerBase.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); entryLogManagerBase.flushRotatedLogs(); Thread.sleep(2000); assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet()); - entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID); + entryLogManagerBase.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); entryLogManagerBase.flushRotatedLogs(); assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet()); @@ -462,7 +462,7 @@ public void testGetEntryLogsSet() throws Exception { */ @Test public void testFlushOrder() throws Exception { - entryLogger.shutdown(); + entryLogger.close(); int logSizeLimit = 256 * 1024; conf.setEntryLogPerLedgerEnabled(false); @@ -470,7 +470,7 @@ public void testFlushOrder() throws Exception { conf.setFlushIntervalInBytes(0); conf.setEntryLogSizeLimit(logSizeLimit); - entryLogger = new EntryLogger(conf, dirsMgr); + entryLogger = new DefaultEntryLogger(conf, dirsMgr); EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); AtomicBoolean exceptionHappened = new AtomicBoolean(false); @@ -487,7 +487,7 @@ public void testFlushOrder() throws Exception { addEntriesAndRotateLogs(entryLogger, 30); rotatedLogChannels = new LinkedList(entryLogManager.getRotatedLogChannels()); - currentActiveChannel = entryLogManager.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID); + currentActiveChannel = entryLogManager.getCurrentLogForLedger(DefaultEntryLogger.UNASSIGNED_LEDGERID); long currentActiveChannelUnpersistedBytes = currentActiveChannel.getUnpersistedBytes(); Thread flushThread = new Thread(new Runnable() { @@ -512,7 +512,7 @@ public void run() { * here we are adding entry of size logSizeLimit with * rolllog=true, so it would create a new entrylog. */ - entryLogger.addEntry(123, generateEntry(123, 456, logSizeLimit), true); + entryLogger.addEntry(123, generateEntry(123, 456, logSizeLimit)); } catch (InterruptedException | BrokenBarrierException | IOException e) { LOG.error("Exception happened for entryLogManager.createNewLog", e); exceptionHappened.set(true); @@ -551,18 +551,18 @@ public void run() { } } - void addEntriesAndRotateLogs(EntryLogger entryLogger, int numOfRotations) + void addEntriesAndRotateLogs(DefaultEntryLogger entryLogger, int numOfRotations) throws IOException { EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); - entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null); + entryLogManager.setCurrentLogForLedgerAndAddToRotate(DefaultEntryLogger.UNASSIGNED_LEDGERID, null); for (int i = 0; i < numOfRotations; i++) { addEntries(entryLogger, 10); - entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null); + entryLogManager.setCurrentLogForLedgerAndAddToRotate(DefaultEntryLogger.UNASSIGNED_LEDGERID, null); } addEntries(entryLogger, 10); } - void addEntries(EntryLogger entryLogger, int noOfEntries) throws IOException { + void addEntries(DefaultEntryLogger entryLogger, int noOfEntries) throws IOException { for (int j = 0; j < noOfEntries; j++) { int ledgerId = Math.abs(rand.nextInt()); int entryId = Math.abs(rand.nextInt()); @@ -780,8 +780,8 @@ public void testEntryLoggersRecentEntryLogsStatus() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); - EntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus = entryLogger.recentlyCreatedEntryLogsStatus; + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus = entryLogger.recentlyCreatedEntryLogsStatus; recentlyCreatedLogsStatus.createdEntryLog(0L); Assert.assertEquals("entryLogger's leastUnflushedLogId ", 0L, entryLogger.getLeastUnflushedLogId()); @@ -842,7 +842,7 @@ public void testFlushIntervalInBytes() throws Exception { conf.setLedgerDirNames(createAndGetLedgerDirs(2)); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager()); /* @@ -852,7 +852,7 @@ public void testFlushIntervalInBytes() throws Exception { int firstEntrySize = 1000; long entry0Position = entryLogger.addEntry(0L, generateEntry(ledgerId, 0L, firstEntrySize)); // entrylogger writes length of the entry (4 bytes) before writing entry - long expectedUnpersistedBytes = EntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4; + long expectedUnpersistedBytes = DefaultEntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4; Assert.assertEquals("Unpersisted Bytes of entrylog", expectedUnpersistedBytes, entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes()); @@ -869,7 +869,7 @@ public void testFlushIntervalInBytes() throws Exception { * newEntryLogger */ conf.setEntryLogPerLedgerEnabled(false); - EntryLogger newEntryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger newEntryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager(); Assert.assertEquals("EntryLogManager class type", EntryLogManagerForSingleEntryLog.class, newEntryLogManager.getClass()); @@ -900,7 +900,7 @@ public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger .getEntryLogManager(); @@ -969,12 +969,15 @@ public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception } } - private EntryLogger.BufferedLogChannel createDummyBufferedLogChannel(EntryLogger entryLogger, long logid, - ServerConfiguration servConf) throws IOException { + private DefaultEntryLogger.BufferedLogChannel createDummyBufferedLogChannel(DefaultEntryLogger entryLogger, + long logid, + ServerConfiguration servConf) + throws IOException { File tmpFile = File.createTempFile("entrylog", logid + ""); tmpFile.deleteOnExit(); FileChannel fc = new RandomAccessFile(tmpFile, "rw").getChannel(); - EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(UnpooledByteBufAllocator.DEFAULT, fc, 10, 10, + DefaultEntryLogger.BufferedLogChannel logChannel = + new BufferedLogChannel(UnpooledByteBufAllocator.DEFAULT, fc, 10, 10, logid, tmpFile, servConf.getFlushIntervalInBytes()); return logChannel; } @@ -1050,7 +1053,7 @@ public void testEntryLogManagerExpiryRemoval() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); @@ -1087,7 +1090,7 @@ public void testEntryLogManagerExpiryRemoval() throws Exception { */ @Test public void testCacheMaximumSizeEvictionPolicy() throws Exception { - entryLogger.shutdown(); + entryLogger.close(); final int cacheMaximumSize = 20; ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); @@ -1098,7 +1101,7 @@ public void testCacheMaximumSizeEvictionPolicy() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - entryLogger = new EntryLogger(conf, ledgerDirsManager); + entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); @@ -1121,7 +1124,7 @@ public void testLongLedgerIdsWithEntryLogPerLedger() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger .getEntryLogManager(); @@ -1173,7 +1176,7 @@ public void testAppendLedgersMapOnCacheRemoval() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger .getEntryLogManager(); @@ -1235,7 +1238,7 @@ public void testExpiryRemovalByAccessingOnAnotherThread() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); @@ -1289,7 +1292,7 @@ public void testExpiryRemovalByAccessingNonCacheRelatedMethods() throws Exceptio LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); @@ -1357,7 +1360,7 @@ public void testEntryLogManagerForEntryLogPerLedger() throws Exception { conf.setLedgerDirNames(createAndGetLedgerDirs(2)); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class, entryLogManager.getClass()); @@ -1374,7 +1377,7 @@ public void testEntryLogManagerForEntryLogPerLedger() throws Exception { for (long i = 0; i < numOfActiveLedgers; i++) { BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i); Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE", - logChannel.getUnpersistedBytes() > EntryLogger.LOGFILE_HEADER_SIZE); + logChannel.getUnpersistedBytes() > DefaultEntryLogger.LOGFILE_HEADER_SIZE); } for (long i = 0; i < numOfActiveLedgers; i++) { @@ -1395,8 +1398,8 @@ public void testEntryLogManagerForEntryLogPerLedger() throws Exception { */ for (long i = 0; i < numOfActiveLedgers; i++) { BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i); - Assert.assertEquals("unpersistedBytes should be LOGFILE_HEADER_SIZE", EntryLogger.LOGFILE_HEADER_SIZE, - logChannel.getUnpersistedBytes()); + Assert.assertEquals("unpersistedBytes should be LOGFILE_HEADER_SIZE", + DefaultEntryLogger.LOGFILE_HEADER_SIZE, logChannel.getUnpersistedBytes()); } for (int j = numEntries; j < 2 * numEntries; j++) { @@ -1408,7 +1411,7 @@ public void testEntryLogManagerForEntryLogPerLedger() throws Exception { for (long i = 0; i < numOfActiveLedgers; i++) { BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i); Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE", - logChannel.getUnpersistedBytes() > EntryLogger.LOGFILE_HEADER_SIZE); + logChannel.getUnpersistedBytes() > DefaultEntryLogger.LOGFILE_HEADER_SIZE); } Assert.assertEquals("LeastUnflushedloggerID", 0, entryLogger.getLeastUnflushedLogId()); @@ -1444,7 +1447,7 @@ public void testReadAddCallsOfMultipleEntryLogs() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager()); int numOfActiveLedgers = 10; @@ -1513,9 +1516,9 @@ class ReadTask implements Callable { long ledgerId; int entryId; long position; - EntryLogger entryLogger; + DefaultEntryLogger entryLogger; - ReadTask(long ledgerId, int entryId, long position, EntryLogger entryLogger) { + ReadTask(long ledgerId, int entryId, long position, DefaultEntryLogger entryLogger) { this.ledgerId = ledgerId; this.entryId = entryId; this.position = position; @@ -1554,7 +1557,7 @@ public void testConcurrentReadCallsAfterEntryLogsAreRotated() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); int numOfActiveLedgers = 15; int numEntries = 2000; final AtomicLongArray positions = new AtomicLongArray(numOfActiveLedgers * numEntries); @@ -1633,7 +1636,7 @@ public void testEntryLoggerAddEntryWhenLedgerDirsAreFull() throws Exception { LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class, @@ -1704,9 +1707,9 @@ public void testEntryLoggerAddEntryWhenLedgerDirsAreFull() throws Exception { * in this method we add an entry and validate the ledgerdir of the * currentLogForLedger against the provided expected ledgerDirs. */ - void addEntryAndValidateFolders(EntryLogger entryLogger, EntryLogManagerBase entryLogManager, int entryId, - File expectedDirForLedger0, boolean equalsForLedger0, File expectedDirForLedger1, - File expectedDirForLedger2) throws IOException { + void addEntryAndValidateFolders(DefaultEntryLogger entryLogger, EntryLogManagerBase entryLogManager, int entryId, + File expectedDirForLedger0, boolean equalsForLedger0, File expectedDirForLedger1, + File expectedDirForLedger2) throws IOException { entryLogger.addEntry(0L, generateEntry(0, entryId)); entryLogger.addEntry(1L, generateEntry(1, entryId)); entryLogger.addEntry(2L, generateEntry(2, entryId)); @@ -1752,8 +1755,8 @@ public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled, LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager); - EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); + DefaultEntryLogger defaultEntryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); + EntryLogManagerBase entryLogManager = (EntryLogManagerBase) defaultEntryLogger.getEntryLogManager(); Assert.assertEquals( "EntryLogManager class type", initialEntryLogPerLedgerEnabled ? EntryLogManagerForEntryLogPerLedger.class : EntryLogManagerForSingleEntryLog.class, @@ -1771,7 +1774,7 @@ public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled, */ for (int j = 0; j < numEntries; j++) { for (int i = 0; i < numOfActiveLedgers; i++) { - positions[i][j] = entryLogger.addEntry((long) i, generateEntry(i, j)); + positions[i][j] = defaultEntryLogger.addEntry((long) i, generateEntry(i, j)); long entryLogId = (positions[i][j] >> 32L); if (initialEntryLogPerLedgerEnabled) { Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId); @@ -1799,7 +1802,7 @@ public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled, conf.setEntryLogPerLedgerEnabled(laterEntryLogPerLedgerEnabled); LedgerDirsManager newLedgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - EntryLogger newEntryLogger = new EntryLogger(conf, newLedgerDirsManager); + DefaultEntryLogger newEntryLogger = new DefaultEntryLogger(conf, newLedgerDirsManager); EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager(); Assert.assertEquals("EntryLogManager class type", laterEntryLogPerLedgerEnabled ? EntryLogManagerForEntryLogPerLedger.class diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java index ddb0461300b..9ed271ecfca 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java @@ -64,7 +64,7 @@ public class GarbageCollectorThreadTest { @Before public void setUp() throws Exception { - when(ledgerStorage.getEntryLogger()).thenReturn(mock(EntryLogger.class)); + when(ledgerStorage.getEntryLogger()).thenReturn(mock(DefaultEntryLogger.class)); openMocks(this); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java index e9a02c28fe4..777ba5f47c4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java @@ -104,7 +104,7 @@ public void start() { } }; - static class TestableEntryLogger extends EntryLogger { + static class TestableDefaultEntryLogger extends DefaultEntryLogger { public interface CheckEntryListener { void accept(long ledgerId, long entryId, @@ -113,7 +113,7 @@ void accept(long ledgerId, } volatile CheckEntryListener testPoint; - public TestableEntryLogger( + public TestableDefaultEntryLogger( ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener, @@ -138,7 +138,7 @@ void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupEx TestStatsProvider statsProvider = new TestStatsProvider(); ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); LedgerDirsManager ledgerDirsManager; - TestableEntryLogger entryLogger; + TestableDefaultEntryLogger entryLogger; InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage(); final long numWrites = 2000; final long moreNumOfWrites = 3000; @@ -157,7 +157,7 @@ public void setUp() throws Exception { ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - entryLogger = new TestableEntryLogger( + entryLogger = new TestableDefaultEntryLogger( conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE); interleavedStorage.initializeWithEntryLogger( conf, null, ledgerDirsManager, ledgerDirsManager, @@ -444,7 +444,7 @@ void printInfoLine(String s) { // Remove a logger - EntryLogger entryLogger = new EntryLogger(conf); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf); entryLogger.removeEntryLog(someEntryLogger.get()); // Should fail consistency checker diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java index 222e268078d..7afa816af99 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java @@ -43,7 +43,6 @@ import java.util.stream.LongStream; import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo; -import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel; import org.apache.bookkeeper.bookie.Journal.LastLogMark; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -372,7 +371,8 @@ conf, new TestBookieImpl(conf), } handle.close(); // simulate rolling entrylog - ((EntryLogManagerBase) ledgerStorage.getEntryLogger().getEntryLogManager()).createNewLog(ledgerId); + ((EntryLogManagerBase) ((DefaultEntryLogger) ledgerStorage.getEntryLogger()).getEntryLogManager()) + .createNewLog(ledgerId); // sleep for a bit for checkpoint to do its task executorController.advance(Duration.ofMillis(500)); @@ -508,7 +508,7 @@ conf, new TestBookieImpl(conf), clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); BookKeeper bkClient = new BookKeeper(clientConf); InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().getLedgerStorage(); - EntryLogger entryLogger = ledgerStorage.entryLogger; + DefaultEntryLogger entryLogger = ledgerStorage.entryLogger; EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger .getEntryLogManager(); @@ -546,7 +546,7 @@ conf, new TestBookieImpl(conf), * since checkpoint happenend, there shouldn't be any logChannelsToFlush * and bytesWrittenSinceLastFlush should be zero. */ - List copyOfRotatedLogChannels = entryLogManager.getRotatedLogChannels(); + List copyOfRotatedLogChannels = entryLogManager.getRotatedLogChannels(); Assert.assertTrue("There shouldn't be logChannelsToFlush", ((copyOfRotatedLogChannels == null) || (copyOfRotatedLogChannels.size() == 0))); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java index bdd7af354ed..8f82dd88b2a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java @@ -44,27 +44,29 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage { /** * Strictly for testing. */ - public static class SlowEntryLogger extends EntryLogger { + public static class SlowDefaultEntryLogger extends DefaultEntryLogger { public volatile long getDelay = 0; public volatile long addDelay = 0; public volatile long flushDelay = 0; - public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener, - StatsLogger statsLogger) throws IOException { + public SlowDefaultEntryLogger(ServerConfiguration conf, + LedgerDirsManager ledgerDirsManager, + EntryLogListener listener, + StatsLogger statsLogger) throws IOException { super(conf, ledgerDirsManager, listener, statsLogger, UnpooledByteBufAllocator.DEFAULT); } - public SlowEntryLogger setAddDelay(long delay) { + public SlowDefaultEntryLogger setAddDelay(long delay) { addDelay = delay; return this; } - public SlowEntryLogger setGetDelay(long delay) { + public SlowDefaultEntryLogger setGetDelay(long delay) { getDelay = delay; return this; } - public SlowEntryLogger setFlushDelay(long delay) { + public SlowDefaultEntryLogger setFlushDelay(long delay) { flushDelay = delay; return this; } @@ -76,9 +78,9 @@ public void flush() throws IOException { } @Override - public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException { + public long addEntry(long ledger, ByteBuf entry) throws IOException { delayMs(addDelay); - return super.addEntry(ledger, entry, rollLog); + return super.addEntry(ledger, entry); } @Override @@ -120,22 +122,22 @@ public void initialize(ServerConfiguration conf, long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0); long flushDelay = conf.getLong(PROP_SLOW_STORAGE_FLUSH_DELAY, 0); - entryLogger = new SlowEntryLogger(conf, ledgerDirsManager, this, statsLogger) + entryLogger = new SlowDefaultEntryLogger(conf, ledgerDirsManager, this, statsLogger) .setAddDelay(addDelay) .setGetDelay(getDelay) .setFlushDelay(flushDelay); } public void setAddDelay(long delay) { - ((SlowEntryLogger) entryLogger).setAddDelay(delay); + ((SlowDefaultEntryLogger) entryLogger).setAddDelay(delay); } public void setGetDelay(long delay) { - ((SlowEntryLogger) entryLogger).setGetDelay(delay); + ((SlowDefaultEntryLogger) entryLogger).setGetDelay(delay); } public void setFlushDelay(long delay) { - ((SlowEntryLogger) entryLogger).setFlushDelay(delay); + ((SlowDefaultEntryLogger) entryLogger).setFlushDelay(delay); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java index a60ab7e34cd..07b0018683e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java @@ -224,9 +224,9 @@ public void testCheckpointAfterEntryLogRotated() throws Exception { }); // simulate entry log is rotated (due to compaction) - EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger() - .getEntryLogManager(); - entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID); + EntryLogManagerForSingleEntryLog entryLogManager = + (EntryLogManagerForSingleEntryLog) ((DefaultEntryLogger) storage.getEntryLogger()).getEntryLogManager(); + entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId(); long currentLogId = entryLogManager.getCurrentLogId(); log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index 00efee9e4be..2116ff5cf13 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -36,10 +36,10 @@ import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.bookie.EntryLocation; -import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.TestBookieImpl; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.proto.BookieProtocol; @@ -227,7 +227,7 @@ public void testBookieCompaction() throws Exception { newEntry3.writeLong(4); // ledger id newEntry3.writeLong(3); // entry id newEntry3.writeBytes("new-entry-3".getBytes()); - long location = entryLogger.addEntry(4L, newEntry3, false); + long location = entryLogger.addEntry(4L, newEntry3); List locations = Lists.newArrayList(new EntryLocation(4, 3, location)); singleDirStorage.updateEntriesLocations(locations); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index bedb76b990b..d3e628fc0ba 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -57,12 +57,12 @@ import org.apache.bookkeeper.bookie.Checkpointer; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; import org.apache.bookkeeper.bookie.EntryLocation; -import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.GarbageCollector; import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector; import org.apache.bookkeeper.bookie.StateManager; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerMetadataBuilder; import org.apache.bookkeeper.client.api.DigestType; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 8740577dc37..362ff50b15d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -40,10 +40,10 @@ import org.apache.bookkeeper.bookie.Checkpointer; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; import org.apache.bookkeeper.bookie.EntryLocation; -import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.StateManager; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java index 2b28bfc6941..d6b8b735671 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java @@ -59,7 +59,7 @@ public interface ZooKeeperCluster { void killCluster() throws Exception; - void sleepCluster(final int time, final TimeUnit timeUnit, final CountDownLatch l) + void sleepCluster(int time, final TimeUnit timeUnit, final CountDownLatch l) throws InterruptedException, IOException; default void expireSession(ZooKeeper zk) throws Exception { diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommandTest.java index 1c579fd92be..ec12702aac9 100644 --- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommandTest.java +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommandTest.java @@ -22,8 +22,8 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doNothing; -import org.apache.bookkeeper.bookie.EntryLogger; -import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger; +import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase; import org.junit.Assert; import org.junit.Test; @@ -42,8 +42,8 @@ public void setup() throws Exception { super.setup(); mockServerConfigurationConstruction(); - mockConstruction(ReadOnlyEntryLogger.class, (entryLogger, context) -> { - doNothing().when(entryLogger).scanEntryLog(anyLong(), any(EntryLogger.EntryLogScanner.class)); + mockConstruction(ReadOnlyDefaultEntryLogger.class, (entryLogger, context) -> { + doNothing().when(entryLogger).scanEntryLog(anyLong(), any(EntryLogScanner.class)); }); } diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java index 4543168aa1a..bc4d06c37c7 100644 --- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java @@ -25,7 +25,7 @@ import static org.mockito.Mockito.when; import org.apache.bookkeeper.bookie.EntryLogMetadata; -import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger; +import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger; import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; import org.junit.Assert; @@ -49,7 +49,7 @@ public void setup() throws Exception { mockServerConfigurationConstruction(); entryLogMetadata = mock(EntryLogMetadata.class); - mockConstruction(ReadOnlyEntryLogger.class, (entryLogger, context) -> { + mockConstruction(ReadOnlyDefaultEntryLogger.class, (entryLogger, context) -> { when(entryLogger.getEntryLogMetadata(anyLong())).thenReturn(entryLogMetadata); }); @@ -69,7 +69,7 @@ public void testWithoutFlags() { public void commandTest() throws Exception { ReadLogMetadataCommand cmd = new ReadLogMetadataCommand(); Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-l", "1" })); - verify(getMockedConstruction(ReadOnlyEntryLogger.class).constructed().get(0), times(1)) + verify(getMockedConstruction(ReadOnlyDefaultEntryLogger.class).constructed().get(0), times(1)) .getEntryLogMetadata(anyLong()); verify(entryLogMetadata, times(1)).getLedgersMap(); }