From 6cc3a35951e1b6e926a9d588aac1432df2d065cd Mon Sep 17 00:00:00 2001 From: chenhang Date: Sun, 26 Jun 2022 22:32:44 +0800 Subject: [PATCH 1/9] DbLedgerStorage add direct entry logger support --- .../bookie/storage/ldb/DbLedgerStorage.java | 107 +++++++++++++++++- .../ldb/SingleDirectoryDbLedgerStorage.java | 8 +- .../storage/ldb/DbLedgerStorageTest.java | 2 +- ...edgerStorageWithDirectEntryLoggerTest.java | 33 ++++++ .../ldb/DbLedgerStorageWriteCacheTest.java | 16 +-- 5 files changed, 148 insertions(+), 18 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 26b2e2c3957..81d33642228 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -39,6 +39,7 @@ import java.util.EnumSet; import java.util.List; import java.util.PrimitiveIterator.OfLong; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -49,18 +50,24 @@ import org.apache.bookkeeper.bookie.CheckpointSource; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.Checkpointer; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.GarbageCollectionStatus; import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; import org.apache.bookkeeper.bookie.LedgerCache; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.StateManager; +import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl; +import org.apache.bookkeeper.bookie.storage.EntryLogger; +import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage.LedgerLoggerProcessor; import org.apache.bookkeeper.common.util.MathUtils; import org.apache.bookkeeper.common.util.Watcher; +import org.apache.bookkeeper.common.util.nativeio.NativeIOImpl; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.slogger.slf4j.Slf4jSlogger; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -76,8 +83,16 @@ public class DbLedgerStorage implements LedgerStorage { public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb"; - public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb"; + public static final String DIRECT_IO_ENTRYLOGGER = "dbStorage_directIOEntryLogger"; + public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB = + "dbStorage_directIOEntryLoggerTotalWriteBufferSizeMb"; + public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB = + "dbStorage_directIOEntryLoggerTotalReadBufferSizeMb"; + public static final String DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB = + "dbStorage_directIOEntryLoggerReadBufferSizeMb"; + public static final String DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS = + "dbStorage_directIOEntryLoggerMaxFdCacheTimeSeconds"; static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs"; @@ -91,6 +106,16 @@ public class DbLedgerStorage implements LedgerStorage { static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize"; private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100; + private static final long DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB = + (long) (0.125 * PlatformDependent.maxDirectMemory()) + / MB; + private static final long DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB = + (long) (0.125 * PlatformDependent.maxDirectMemory()) + / MB; + private static final long DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB = 8; + + private static final int DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS = 300; + // use the storage assigned to ledger 0 for flags. // if the storage configuration changes, the flags may be lost // but in that case data integrity should kick off anyhow. @@ -100,6 +125,9 @@ public class DbLedgerStorage implements LedgerStorage { // Keep 1 single Bookie GC thread so the the compactions from multiple individual directories are serialized private ScheduledExecutorService gcExecutor; + private ExecutorService entryLoggerWriteExecutor = null; + private ExecutorService entryLoggerFlushExecutor = null; + private DbLedgerStorageStats stats; protected ByteBufAllocator allocator; @@ -127,6 +155,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB; long readCacheMaxSize = getLongVariableOrDefault(conf, READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB; + boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false); this.allocator = allocator; this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size(); @@ -166,9 +195,54 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le iDirs[0] = indexDir.getParentFile(); LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(), statsLogger); - ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, idm, - statsLogger, gcExecutor, perDirectoryWriteCacheSize, - perDirectoryReadCacheSize, readAheadCacheBatchSize)); + EntryLogger entrylogger; + if (directIOEntryLogger) { + long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs; + long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs; + int readBufferSize = MB * (int) getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB); + int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS, + DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS); + Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class); + entryLoggerWriteExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory("EntryLoggerWrite")); + entryLoggerFlushExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory("EntryLoggerFlush")); + + int numReadThreads = conf.getNumReadWorkerThreads(); + if (numReadThreads == 0) { + numReadThreads = conf.getServerNumIOThreads(); + } + + entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ledgerDirsManager, slog), + new NativeIOImpl(), + allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor, + conf.getEntryLogSizeLimit(), + conf.getNettyMaxFrameSizeBytes() - 500, + perDirectoryTotalWriteBufferSize, + perDirectoryTotalReadBufferSize, + readBufferSize, + numReadThreads, + maxFdCacheTimeSeconds, + slog, statsLogger); + } else { + entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator); + } + ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, + idm, entrylogger, + statsLogger, gcExecutor, perDirectoryWriteCacheSize, + perDirectoryReadCacheSize, + readAheadCacheBatchSize)); ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener); if (!lDirs[0].getPath().equals(iDirs[0].getPath())) { idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener); @@ -206,10 +280,11 @@ public Long getSample() { @VisibleForTesting protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, - StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize, + EntryLogger entryLogger, StatsLogger statsLogger, + ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException { - return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, + return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize, readAheadCacheBatchSize); } @@ -237,6 +312,13 @@ public void shutdown() throws InterruptedException { for (LedgerStorage ls : ledgerStorageList) { ls.shutdown(); } + + if (entryLoggerWriteExecutor != null) { + entryLoggerWriteExecutor.shutdown(); + } + if (entryLoggerFlushExecutor != null) { + entryLoggerFlushExecutor.shutdown(); + } } @Override @@ -448,6 +530,19 @@ static long getLongVariableOrDefault(ServerConfiguration conf, String keyName, l } } + static boolean getBooleanVariableOrDefault(ServerConfiguration conf, String keyName, boolean defaultValue) { + Object obj = conf.getProperty(keyName); + if (obj instanceof Boolean) { + return (Boolean) obj; + } else if (obj == null) { + return defaultValue; + } else if (StringUtils.isEmpty(conf.getString(keyName))) { + return defaultValue; + } else { + return conf.getBoolean(keyName); + } + } + @Override public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException { // check Issue #2078 diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 303f1314064..0f99da17188 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -147,9 +147,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private final Counter flushExecutorTime; public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, - LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StatsLogger statsLogger, - ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize, - int readAheadCacheBatchSize) throws IOException { + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, + StatsLogger statsLogger, ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, + long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException { checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1, "Db implementation only allows for one storage dir"); @@ -193,7 +193,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES); - entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator); + this.entryLogger = entryLogger; gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, entryLogger, statsLogger); dbLedgerStorageStats = new DbLedgerStorageStats( diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index 30bf397b720..f52ed7f218f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -58,7 +58,7 @@ public class DbLedgerStorageTest { private DbLedgerStorage storage; private File tmpDir; private LedgerDirsManager ledgerDirsManager; - private ServerConfiguration conf; + protected ServerConfiguration conf; @Before public void setup() throws Exception { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java new file mode 100644 index 00000000000..d50e9599f18 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java @@ -0,0 +1,33 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import org.junit.Before; + +public class DbLedgerStorageWithDirectEntryLoggerTest extends DbLedgerStorageTest { + + @Override + @Before + public void setup() throws Exception { + super.setup(); + conf.setProperty("dbStorage_directIOEntryLogger", true); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java index 955115351f1..6434b6f95e9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java @@ -33,6 +33,7 @@ import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.TestBookieImpl; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -53,21 +54,22 @@ private static class MockedDbLedgerStorage extends DbLedgerStorage { @Override protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf, - LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, - StatsLogger statsLogger, ScheduledExecutorService gcExecutor, - long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) + LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, + EntryLogger entryLogger, StatsLogger statsLogger, ScheduledExecutorService gcExecutor, + long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException { return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, - statsLogger, allocator, gcExecutor, writeCacheSize, - readCacheSize, readAheadCacheBatchSize); + entryLogger, statsLogger, allocator, gcExecutor, writeCacheSize, + readCacheSize, readAheadCacheBatchSize); } private static class MockedSingleDirectoryDbLedgerStorage extends SingleDirectoryDbLedgerStorage { public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, - LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StatsLogger statsLogger, + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, + StatsLogger statsLogger, ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException { - super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, + super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger, statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize, readAheadCacheBatchSize); } From 1214445729e8a4fa2b227cf791b26d1b2d386ac5 Mon Sep 17 00:00:00 2001 From: chenhang Date: Sun, 26 Jun 2022 22:59:28 +0800 Subject: [PATCH 2/9] format code --- .../bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java | 1 - .../storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 0f99da17188..9ac56b9649c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -58,7 +58,6 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.Checkpointer; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; -import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.GarbageCollectionStatus; import org.apache.bookkeeper.bookie.GarbageCollectorThread; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java index d50e9599f18..b433bc65409 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java @@ -22,6 +22,9 @@ import org.junit.Before; +/** + * Unit test for {@link DbLedgerStorage} with directIO entrylogger. + */ public class DbLedgerStorageWithDirectEntryLoggerTest extends DbLedgerStorageTest { @Override From ff139a5e523d69d20919c66e3ef62dc6ce375966 Mon Sep 17 00:00:00 2001 From: chenhang Date: Wed, 29 Jun 2022 09:12:34 +0800 Subject: [PATCH 3/9] address comments --- .../bookkeeper/bookie/storage/ldb/DbLedgerStorage.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 81d33642228..872cd8e206f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -107,10 +107,10 @@ public class DbLedgerStorage implements LedgerStorage { private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100; private static final long DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB = - (long) (0.125 * PlatformDependent.maxDirectMemory()) + (long) (0.125 * PlatformDependent.estimateMaxDirectMemory()) / MB; private static final long DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB = - (long) (0.125 * PlatformDependent.maxDirectMemory()) + (long) (0.125 * PlatformDependent.estimateMaxDirectMemory()) / MB; private static final long DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB = 8; @@ -127,7 +127,6 @@ public class DbLedgerStorage implements LedgerStorage { private ScheduledExecutorService gcExecutor; private ExecutorService entryLoggerWriteExecutor = null; private ExecutorService entryLoggerFlushExecutor = null; - private DbLedgerStorageStats stats; protected ByteBufAllocator allocator; @@ -156,6 +155,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le long readCacheMaxSize = getLongVariableOrDefault(conf, READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB; boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false); + PlatformDependent.estimateMaxDirectMemory() this.allocator = allocator; this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size(); From 6cc6e58883045f6d56950cfb37f1631e80c37efd Mon Sep 17 00:00:00 2001 From: chenhang Date: Wed, 29 Jun 2022 09:20:19 +0800 Subject: [PATCH 4/9] format code --- .../apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 872cd8e206f..83724858bf9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -155,7 +155,6 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le long readCacheMaxSize = getLongVariableOrDefault(conf, READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB; boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false); - PlatformDependent.estimateMaxDirectMemory() this.allocator = allocator; this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size(); From 3c13eea4c741b5e5f130efa72dd48c43f0e18874 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 30 Jun 2022 16:50:04 +0800 Subject: [PATCH 5/9] add unit test assert --- .../storage/directentrylogger/Buffer.java | 2 -- .../storage/ldb/DbLedgerStorageTest.java | 6 ++-- ...edgerStorageWithDirectEntryLoggerTest.java | 35 ++++++++++++++++++- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java index 0555ce007e2..4017ff5842d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java @@ -120,10 +120,8 @@ void writeInt(int value) throws IOException { * bytebuf by the number of bytes read (i.e. to the end). */ void writeByteBuf(ByteBuf bytebuf) throws IOException { - int bytesWritten = bytebuf.readableBytes(); ByteBuffer bytesToPut = bytebuf.nioBuffer(); byteBuffer.put(bytesToPut); - bytebuf.skipBytes(bytesWritten); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index f52ed7f218f..5972c88c874 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -55,9 +55,9 @@ */ public class DbLedgerStorageTest { private static final Logger log = LoggerFactory.getLogger(DbLedgerStorageTest.class); - private DbLedgerStorage storage; - private File tmpDir; - private LedgerDirsManager ledgerDirsManager; + protected DbLedgerStorage storage; + protected File tmpDir; + protected LedgerDirsManager ledgerDirsManager; protected ServerConfiguration conf; @Before diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java index b433bc65409..3b8aa879e01 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java @@ -20,7 +20,22 @@ */ package org.apache.bookkeeper.bookie.storage.ldb; +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import java.io.File; +import java.util.List; +import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.TestBookieImpl; +import org.apache.bookkeeper.bookie.storage.EntryLogger; +import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger; +import org.apache.bookkeeper.conf.TestBKConfiguration; import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Unit test for {@link DbLedgerStorage} with directIO entrylogger. @@ -30,7 +45,25 @@ public class DbLedgerStorageWithDirectEntryLoggerTest extends DbLedgerStorageTes @Override @Before public void setup() throws Exception { - super.setup(); + tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + File curDir = BookieImpl.getCurrentDirectory(tmpDir); + BookieImpl.checkDirectoryStructure(curDir); + + int gcWaitTime = 1000; + conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { tmpDir.toString() }); conf.setProperty("dbStorage_directIOEntryLogger", true); + BookieImpl bookie = new TestBookieImpl(conf); + + ledgerDirsManager = bookie.getLedgerDirsManager(); + storage = (DbLedgerStorage) bookie.getLedgerStorage(); + + storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> { + assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DirectEntryLogger); + }); } } From 4a605f7bf0136af4e53dcb209105530a558493e0 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 30 Jun 2022 16:51:29 +0800 Subject: [PATCH 6/9] format code --- .../ldb/DbLedgerStorageWithDirectEntryLoggerTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java index 3b8aa879e01..f12633b8679 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java @@ -20,21 +20,12 @@ */ package org.apache.bookkeeper.bookie.storage.ldb; -import com.google.common.collect.Lists; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; import java.io.File; -import java.util.List; import org.apache.bookkeeper.bookie.BookieImpl; -import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.TestBookieImpl; -import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** From ce55bd3f97784676a5c936827b3415ffe6f84ee5 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 30 Jun 2022 17:05:16 +0800 Subject: [PATCH 7/9] format code --- .../storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java index f12633b8679..e1dfe71d2ba 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWithDirectEntryLoggerTest.java @@ -20,13 +20,13 @@ */ package org.apache.bookkeeper.bookie.storage.ldb; +import static org.junit.Assert.assertTrue; import java.io.File; import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.bookie.TestBookieImpl; import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.junit.Before; -import static org.junit.Assert.assertTrue; /** * Unit test for {@link DbLedgerStorage} with directIO entrylogger. From 929a410881a2b68eb85c43aa96a787aed7283b81 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 30 Jun 2022 18:17:36 +0800 Subject: [PATCH 8/9] fix failed test --- .../bookie/storage/directentrylogger/Buffer.java | 2 ++ .../bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java index 4017ff5842d..0555ce007e2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java @@ -120,8 +120,10 @@ void writeInt(int value) throws IOException { * bytebuf by the number of bytes read (i.e. to the end). */ void writeByteBuf(ByteBuf bytebuf) throws IOException { + int bytesWritten = bytebuf.readableBytes(); ByteBuffer bytesToPut = bytebuf.nioBuffer(); byteBuffer.put(bytesToPut); + bytebuf.skipBytes(bytesWritten); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index 5972c88c874..1bda8ebd14b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -35,11 +35,13 @@ import org.apache.bookkeeper.bookie.Bookie.NoEntryException; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.TestBookieImpl; import org.apache.bookkeeper.bookie.storage.EntryLogger; +import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.proto.BookieProtocol; @@ -77,6 +79,10 @@ public void setup() throws Exception { ledgerDirsManager = bookie.getLedgerDirsManager(); storage = (DbLedgerStorage) bookie.getLedgerStorage(); + + storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> { + assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DefaultEntryLogger); + }); } @After @@ -228,6 +234,7 @@ public void testBookieCompaction() throws Exception { newEntry3.writeLong(3); // entry id newEntry3.writeBytes("new-entry-3".getBytes()); long location = entryLogger.addEntry(4L, newEntry3); + newEntry3.resetReaderIndex(); List locations = Lists.newArrayList(new EntryLocation(4, 3, location)); singleDirStorage.updateEntriesLocations(locations); From 5da73f4062b6658f84150ca63a5f93c3579cacd5 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 30 Jun 2022 18:26:58 +0800 Subject: [PATCH 9/9] format code --- .../bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index 1bda8ebd14b..09c12c35c38 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -41,7 +41,6 @@ import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.TestBookieImpl; import org.apache.bookkeeper.bookie.storage.EntryLogger; -import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.proto.BookieProtocol;