diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 26b2e2c3957..83724858bf9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -39,6 +39,7 @@ import java.util.EnumSet; import java.util.List; import java.util.PrimitiveIterator.OfLong; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -49,18 +50,24 @@ import org.apache.bookkeeper.bookie.CheckpointSource; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.Checkpointer; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.GarbageCollectionStatus; import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; import org.apache.bookkeeper.bookie.LedgerCache; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.StateManager; +import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl; +import org.apache.bookkeeper.bookie.storage.EntryLogger; +import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage.LedgerLoggerProcessor; import org.apache.bookkeeper.common.util.MathUtils; import org.apache.bookkeeper.common.util.Watcher; +import org.apache.bookkeeper.common.util.nativeio.NativeIOImpl; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.slogger.slf4j.Slf4jSlogger; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -76,8 +83,16 @@ public class DbLedgerStorage implements LedgerStorage { public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb"; - public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb"; + public static final String DIRECT_IO_ENTRYLOGGER = "dbStorage_directIOEntryLogger"; + public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB = + "dbStorage_directIOEntryLoggerTotalWriteBufferSizeMb"; + public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB = + "dbStorage_directIOEntryLoggerTotalReadBufferSizeMb"; + public static final String DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB = + "dbStorage_directIOEntryLoggerReadBufferSizeMb"; + public static final String DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS = + "dbStorage_directIOEntryLoggerMaxFdCacheTimeSeconds"; static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs"; @@ -91,6 +106,16 @@ public class DbLedgerStorage implements LedgerStorage { static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize"; private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100; + private static final long DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB = + (long) (0.125 * PlatformDependent.estimateMaxDirectMemory()) + / MB; + private static final long DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB = + (long) (0.125 * PlatformDependent.estimateMaxDirectMemory()) + / MB; + private static final long DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB = 8; + + private static final int DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS = 300; + // use the storage assigned to ledger 0 for flags. // if the storage configuration changes, the flags may be lost // but in that case data integrity should kick off anyhow. @@ -100,6 +125,8 @@ public class DbLedgerStorage implements LedgerStorage { // Keep 1 single Bookie GC thread so the the compactions from multiple individual directories are serialized private ScheduledExecutorService gcExecutor; + private ExecutorService entryLoggerWriteExecutor = null; + private ExecutorService entryLoggerFlushExecutor = null; protected ByteBufAllocator allocator; @@ -127,6 +154,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB; long readCacheMaxSize = getLongVariableOrDefault(conf, READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB; + boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false); this.allocator = allocator; this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size(); @@ -166,9 +194,54 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le iDirs[0] = indexDir.getParentFile(); LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(), statsLogger); - ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, idm, - statsLogger, gcExecutor, perDirectoryWriteCacheSize, - perDirectoryReadCacheSize, readAheadCacheBatchSize)); + EntryLogger entrylogger; + if (directIOEntryLogger) { + long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs; + long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs; + int readBufferSize = MB * (int) getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB); + int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS, + DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS); + Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class); + entryLoggerWriteExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory("EntryLoggerWrite")); + entryLoggerFlushExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory("EntryLoggerFlush")); + + int numReadThreads = conf.getNumReadWorkerThreads(); + if (numReadThreads == 0) { + numReadThreads = conf.getServerNumIOThreads(); + } + + entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ledgerDirsManager, slog), + new NativeIOImpl(), + allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor, + conf.getEntryLogSizeLimit(), + conf.getNettyMaxFrameSizeBytes() - 500, + perDirectoryTotalWriteBufferSize, + perDirectoryTotalReadBufferSize, + readBufferSize, + numReadThreads, + maxFdCacheTimeSeconds, + slog, statsLogger); + } else { + entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator); + } + ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, + idm, entrylogger, + statsLogger, gcExecutor, perDirectoryWriteCacheSize, + perDirectoryReadCacheSize, + readAheadCacheBatchSize)); ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener); if (!lDirs[0].getPath().equals(iDirs[0].getPath())) { idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener); @@ -206,10 +279,11 @@ public Long getSample() { @VisibleForTesting protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, - StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize, + EntryLogger entryLogger, StatsLogger statsLogger, + ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException { - return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, + return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize, readAheadCacheBatchSize); } @@ -237,6 +311,13 @@ public void shutdown() throws InterruptedException { for (LedgerStorage ls : ledgerStorageList) { ls.shutdown(); } + + if (entryLoggerWriteExecutor != null) { + entryLoggerWriteExecutor.shutdown(); + } + if (entryLoggerFlushExecutor != null) { + entryLoggerFlushExecutor.shutdown(); + } } @Override @@ -448,6 +529,19 @@ static long getLongVariableOrDefault(ServerConfiguration conf, String keyName, l } } + static boolean getBooleanVariableOrDefault(ServerConfiguration conf, String keyName, boolean defaultValue) { + Object obj = conf.getProperty(keyName); + if (obj instanceof Boolean) { + return (Boolean) obj; + } else if (obj == null) { + return defaultValue; + } else if (StringUtils.isEmpty(conf.getString(keyName))) { + return defaultValue; + } else { + return conf.getBoolean(keyName); + } + } + @Override public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException { // check Issue #2078 diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 303f1314064..9ac56b9649c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -58,7 +58,6 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.Checkpointer; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; -import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.GarbageCollectionStatus; import org.apache.bookkeeper.bookie.GarbageCollectorThread; @@ -147,9 +146,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private final Counter flushExecutorTime; public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, - LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StatsLogger statsLogger, - ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize, - int readAheadCacheBatchSize) throws IOException { + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, + StatsLogger statsLogger, ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, + long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException { checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1, "Db implementation only allows for one storage dir"); @@ -193,7 +192,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES); - entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator); + this.entryLogger = entryLogger; gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, entryLogger, statsLogger); dbLedgerStorageStats = new DbLedgerStorageStats( diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index 30bf397b720..09c12c35c38 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -35,6 +35,7 @@ import org.apache.bookkeeper.bookie.Bookie.NoEntryException; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.LedgerStorage; @@ -55,10 +56,10 @@ */ public class DbLedgerStorageTest { private static final Logger log = LoggerFactory.getLogger(DbLedgerStorageTest.class); - private DbLedgerStorage storage; - private File tmpDir; - private LedgerDirsManager ledgerDirsManager; - private ServerConfiguration conf; + protected DbLedgerStorage storage; + protected File tmpDir; + protected LedgerDirsManager ledgerDirsManager; + protected ServerConfiguration conf; @Before public void setup() throws Exception { @@ -77,6 +78,10 @@ public void setup() throws Exception { ledgerDirsManager = bookie.getLedgerDirsManager(); storage = (DbLedgerStorage) bookie.getLedgerStorage(); + + storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> { + assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DefaultEntryLogger); + }); } @After @@ -228,6 +233,7 @@ public void testBookieCompaction() throws Exception { newEntry3.writeLong(3); // entry id newEntry3.writeBytes("new-entry-3".getBytes()); long location = entryLogger.addEntry(4L, newEntry3); + newEntry3.resetReaderIndex(); List locations = Lists.newArrayList(new EntryLocation(4, 3, location)); singleDirStorage.updateEntriesLocations(locations); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java new file mode 100644 index 00000000000..e1dfe71d2ba --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java @@ -0,0 +1,60 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import static org.junit.Assert.assertTrue; +import java.io.File; +import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.bookie.TestBookieImpl; +import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger; +import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.junit.Before; + +/** + * Unit test for {@link DbLedgerStorage} with directIO entrylogger. + */ +public class DbLedgerStorageWithDirectEntryLoggerTest extends DbLedgerStorageTest { + + @Override + @Before + public void setup() throws Exception { + tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + File curDir = BookieImpl.getCurrentDirectory(tmpDir); + BookieImpl.checkDirectoryStructure(curDir); + + int gcWaitTime = 1000; + conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { tmpDir.toString() }); + conf.setProperty("dbStorage_directIOEntryLogger", true); + BookieImpl bookie = new TestBookieImpl(conf); + + ledgerDirsManager = bookie.getLedgerDirsManager(); + storage = (DbLedgerStorage) bookie.getLedgerStorage(); + + storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> { + assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DirectEntryLogger); + }); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java index 955115351f1..6434b6f95e9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java @@ -33,6 +33,7 @@ import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.TestBookieImpl; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -53,21 +54,22 @@ private static class MockedDbLedgerStorage extends DbLedgerStorage { @Override protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, - LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, - StatsLogger statsLogger, ScheduledExecutorService gcExecutor, - long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) + LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, + EntryLogger entryLogger, StatsLogger statsLogger, ScheduledExecutorService gcExecutor, + long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException { return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, - statsLogger, allocator, gcExecutor, writeCacheSize, - readCacheSize, readAheadCacheBatchSize); + entryLogger, statsLogger, allocator, gcExecutor, writeCacheSize, + readCacheSize, readAheadCacheBatchSize); } private static class MockedSingleDirectoryDbLedgerStorage extends SingleDirectoryDbLedgerStorage { public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, - LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StatsLogger statsLogger, + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, + StatsLogger statsLogger, ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException { - super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, + super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize, readAheadCacheBatchSize); }