Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -49,18 +50,24 @@
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollectionStatus;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerCache;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage.LedgerLoggerProcessor;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.common.util.nativeio.NativeIOImpl;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.slogger.slf4j.Slf4jSlogger;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
Expand All @@ -76,8 +83,16 @@
public class DbLedgerStorage implements LedgerStorage {

public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";

public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
public static final String DIRECT_IO_ENTRYLOGGER = "dbStorage_directIOEntryLogger";
public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB =
"dbStorage_directIOEntryLoggerTotalWriteBufferSizeMb";
public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB =
"dbStorage_directIOEntryLoggerTotalReadBufferSizeMb";
public static final String DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB =
"dbStorage_directIOEntryLoggerReadBufferSizeMb";
public static final String DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS =
"dbStorage_directIOEntryLoggerMaxFdCacheTimeSeconds";

static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";

Expand All @@ -91,6 +106,16 @@ public class DbLedgerStorage implements LedgerStorage {
static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;

private static final long DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB =
(long) (0.125 * PlatformDependent.estimateMaxDirectMemory())
/ MB;
private static final long DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB =
(long) (0.125 * PlatformDependent.estimateMaxDirectMemory())
/ MB;
private static final long DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB = 8;

private static final int DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS = 300;

// use the storage assigned to ledger 0 for flags.
// if the storage configuration changes, the flags may be lost
// but in that case data integrity should kick off anyhow.
Expand All @@ -100,6 +125,8 @@ public class DbLedgerStorage implements LedgerStorage {

// Keep 1 single Bookie GC thread so the the compactions from multiple individual directories are serialized
private ScheduledExecutorService gcExecutor;
private ExecutorService entryLoggerWriteExecutor = null;
private ExecutorService entryLoggerFlushExecutor = null;

protected ByteBufAllocator allocator;

Expand Down Expand Up @@ -127,6 +154,7 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
long readCacheMaxSize = getLongVariableOrDefault(conf, READ_AHEAD_CACHE_MAX_SIZE_MB,
DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false);

this.allocator = allocator;
this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
Expand Down Expand Up @@ -166,9 +194,54 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
iDirs[0] = indexDir.getParentFile();
LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(), statsLogger);

ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, idm,
statsLogger, gcExecutor, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize, readAheadCacheBatchSize));
EntryLogger entrylogger;
if (directIOEntryLogger) {
long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
int readBufferSize = MB * (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("EntryLoggerWrite"));
entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("EntryLoggerFlush"));

int numReadThreads = conf.getNumReadWorkerThreads();
if (numReadThreads == 0) {
numReadThreads = conf.getServerNumIOThreads();
}

entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ledgerDirsManager, slog),
new NativeIOImpl(),
allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
conf.getEntryLogSizeLimit(),
conf.getNettyMaxFrameSizeBytes() - 500,
perDirectoryTotalWriteBufferSize,
perDirectoryTotalReadBufferSize,
readBufferSize,
numReadThreads,
maxFdCacheTimeSeconds,
slog, statsLogger);
} else {
entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
}
ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
idm, entrylogger,
statsLogger, gcExecutor, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize,
readAheadCacheBatchSize));
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
Expand Down Expand Up @@ -206,10 +279,11 @@ public Long getSample() {
@VisibleForTesting
protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize,
EntryLogger entryLogger, StatsLogger statsLogger,
ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize,
int readAheadCacheBatchSize)
throws IOException {
return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger,
statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize,
readAheadCacheBatchSize);
}
Expand Down Expand Up @@ -237,6 +311,13 @@ public void shutdown() throws InterruptedException {
for (LedgerStorage ls : ledgerStorageList) {
ls.shutdown();
}

if (entryLoggerWriteExecutor != null) {
entryLoggerWriteExecutor.shutdown();
}
if (entryLoggerFlushExecutor != null) {
entryLoggerFlushExecutor.shutdown();
}
}

@Override
Expand Down Expand Up @@ -448,6 +529,19 @@ static long getLongVariableOrDefault(ServerConfiguration conf, String keyName, l
}
}

static boolean getBooleanVariableOrDefault(ServerConfiguration conf, String keyName, boolean defaultValue) {
Object obj = conf.getProperty(keyName);
if (obj instanceof Boolean) {
return (Boolean) obj;
} else if (obj == null) {
return defaultValue;
} else if (StringUtils.isEmpty(conf.getString(keyName))) {
return defaultValue;
} else {
return conf.getBoolean(keyName);
}
}

@Override
public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
// check Issue #2078
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.GarbageCollectionStatus;
import org.apache.bookkeeper.bookie.GarbageCollectorThread;
Expand Down Expand Up @@ -147,9 +146,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
private final Counter flushExecutorTime;

public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StatsLogger statsLogger,
ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize,
int readAheadCacheBatchSize) throws IOException {
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger,
StatsLogger statsLogger, ByteBufAllocator allocator, ScheduledExecutorService gcExecutor,
long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize) throws IOException {
checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
"Db implementation only allows for one storage dir");

Expand Down Expand Up @@ -193,7 +192,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);

entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator);
this.entryLogger = entryLogger;
gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, entryLogger, statsLogger);

dbLedgerStorageStats = new DbLedgerStorageStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
Expand All @@ -55,10 +56,10 @@
*/
public class DbLedgerStorageTest {
private static final Logger log = LoggerFactory.getLogger(DbLedgerStorageTest.class);
private DbLedgerStorage storage;
private File tmpDir;
private LedgerDirsManager ledgerDirsManager;
private ServerConfiguration conf;
protected DbLedgerStorage storage;
protected File tmpDir;
protected LedgerDirsManager ledgerDirsManager;
protected ServerConfiguration conf;

@Before
public void setup() throws Exception {
Expand All @@ -77,6 +78,10 @@ public void setup() throws Exception {

ledgerDirsManager = bookie.getLedgerDirsManager();
storage = (DbLedgerStorage) bookie.getLedgerStorage();

storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> {
assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DefaultEntryLogger);
});
}

@After
Expand Down Expand Up @@ -228,6 +233,7 @@ public void testBookieCompaction() throws Exception {
newEntry3.writeLong(3); // entry id
newEntry3.writeBytes("new-entry-3".getBytes());
long location = entryLogger.addEntry(4L, newEntry3);
newEntry3.resetReaderIndex();

List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
singleDirStorage.updateEntriesLocations(locations);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie.storage.ldb;

import static org.junit.Assert.assertTrue;
import java.io.File;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLogger;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.junit.Before;

/**
* Unit test for {@link DbLedgerStorage} with directIO entrylogger.
*/
public class DbLedgerStorageWithDirectEntryLoggerTest extends DbLedgerStorageTest {

@Override
@Before
public void setup() throws Exception {
tmpDir = File.createTempFile("bkTest", ".dir");
tmpDir.delete();
tmpDir.mkdir();
File curDir = BookieImpl.getCurrentDirectory(tmpDir);
BookieImpl.checkDirectoryStructure(curDir);

int gcWaitTime = 1000;
conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(gcWaitTime);
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setLedgerDirNames(new String[] { tmpDir.toString() });
conf.setProperty("dbStorage_directIOEntryLogger", true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add some assertions in the tests about the fact that we really loaded this implementation?
The tests may pass because we aren't using the new implementation

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

BookieImpl bookie = new TestBookieImpl(conf);

ledgerDirsManager = bookie.getLedgerDirsManager();
storage = (DbLedgerStorage) bookie.getLedgerStorage();

storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage -> {
assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DirectEntryLogger);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
Expand All @@ -53,21 +54,22 @@ private static class MockedDbLedgerStorage extends DbLedgerStorage {

@Override
protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
StatsLogger statsLogger, ScheduledExecutorService gcExecutor,
long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize)
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
EntryLogger entryLogger, StatsLogger statsLogger, ScheduledExecutorService gcExecutor,
long writeCacheSize, long readCacheSize, int readAheadCacheBatchSize)
throws IOException {
return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
statsLogger, allocator, gcExecutor, writeCacheSize,
readCacheSize, readAheadCacheBatchSize);
entryLogger, statsLogger, allocator, gcExecutor, writeCacheSize,
readCacheSize, readAheadCacheBatchSize);
}

private static class MockedSingleDirectoryDbLedgerStorage extends SingleDirectoryDbLedgerStorage {
public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StatsLogger statsLogger,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger,
StatsLogger statsLogger,
ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize,
long readCacheSize, int readAheadCacheBatchSize) throws IOException {
super(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, entryLogger,
statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize, readAheadCacheBatchSize);
}

Expand Down