diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index e5b42cc2618..cf705e341c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -127,4 +127,4 @@ public long getEntry() {
}
}
-}
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 02bddd7470c..dce6d0b45c4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -91,6 +91,7 @@ public interface Code {
int UnknownBookieIdException = -107;
int OperationRejectedException = -108;
int CookieExistsException = -109;
+ int EntryLogMetadataMapException = -110;
}
public int getCode() {
@@ -124,6 +125,9 @@ public String getMessage(int code) {
case Code.CookieExistsException:
err = "Cookie already exists";
break;
+ case Code.EntryLogMetadataMapException:
+ err = "Error in accessing Entry-log metadata map";
+ break;
case Code.MetadataStoreException:
err = "Error performing metadata operations";
break;
@@ -254,6 +258,15 @@ public CookieExistException(Throwable cause) {
}
}
+ /**
+ * Signals an error while accessing the entry-log metadata map.
+ */
+ public static class EntryLogMetadataMapException extends BookieException {
+ public EntryLogMetadataMapException(Throwable cause) {
+ super(Code.EntryLogMetadataMapException, cause);
+ }
+ }
+
/**
* Signals that an exception occurs on upgrading a bookie.
*/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index cdb8ab3f115..ed57d7b7e16 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -1184,6 +1184,13 @@ public static boolean format(ServerConfiguration conf,
}
}
+ // Clean up the GC entry-log metadata cache directory, which may
+ // live outside the ledger dirs
+ File metadataDir = new File(conf.getGcEntryLogMetadataCachePath());
+ if (!cleanDir(metadataDir)) {
+ LOG.error("Formatting entry-log metadata directory {} failed", metadataDir);
+ return false;
+ }
LOG.info("Bookie format completed successfully");
return true;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
index 17cf58fb52c..6dab68c0638 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
@@ -21,24 +21,37 @@
package org.apache.bookkeeper.bookie;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import java.util.function.LongPredicate;
+import org.apache.bookkeeper.bookie.EntryLogMetadata.EntryLogMetadataRecyclable;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
-
/**
- * Records the total size, remaining size and the set of ledgers that comprise a entry log.
+ * Records the total size, remaining size and the set of ledgers that comprise
+ * an entry log.
*/
public class EntryLogMetadata {
- private final long entryLogId;
- private long totalSize;
- private long remainingSize;
- private final ConcurrentLongLongHashMap ledgersMap;
+ protected long entryLogId;
+ protected long totalSize;
+ protected long remainingSize;
+ protected final ConcurrentLongLongHashMap ledgersMap;
+ private static final short DEFAULT_SERIALIZATION_VERSION = 0;
+
+ protected EntryLogMetadata() {
+ ledgersMap = new ConcurrentLongLongHashMap(256, 1);
+ }
public EntryLogMetadata(long logId) {
+ this();
this.entryLogId = logId;
totalSize = remainingSize = 0;
- ledgersMap = new ConcurrentLongLongHashMap(256, 1);
}
public void addLedgerSize(long ledgerId, long size) {
@@ -96,4 +109,111 @@ public String toString() {
return sb.toString();
}
+ /**
+ * Serializes {@link EntryLogMetadata} and writes to
+ * {@link DataOutputStream}.
+ *
+ * schema:
+ * 2-bytes: schema-version
+ * 8-bytes: entrylog-entryLogId
+ * 8-bytes: entrylog-totalSize
+ * 8-bytes: entrylog-remainingSize
+ * 8-bytes: total number of ledgers
+ * ledgers-map
+ * [repeat]: (8-bytes::ledgerId, 8-bytes::size-of-ledger)
+ *
+ * @param out
+ *            output stream to serialize into
+ * @throws IOException
+ *             if the metadata fields could not be serialized
+ * @throws IllegalStateException
+ *             if the ledgers-map could not be serialized
+ */
+ public void serialize(DataOutputStream out) throws IOException, IllegalStateException {
+ out.writeShort(DEFAULT_SERIALIZATION_VERSION);
+ out.writeLong(entryLogId);
+ out.writeLong(totalSize);
+ out.writeLong(remainingSize);
+ out.writeLong(ledgersMap.size());
+ ledgersMap.forEach((ledgerId, size) -> {
+ try {
+ out.writeLong(ledgerId);
+ out.writeLong(size);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to serialize entryLogMetadata", e);
+ }
+ });
+ out.flush();
+ }
+
+ /**
+ * Deserializes an {@link EntryLogMetadataRecyclable} from the given
+ * {@link DataInputStream}. The caller must recycle the returned
+ * {@link EntryLogMetadataRecyclable}.
+ * @param in input stream to deserialize from
+ * @return deserialized entry-log metadata
+ * @throws IOException if the stream cannot be parsed as entry-log metadata
+ */
+ public static EntryLogMetadataRecyclable deserialize(DataInputStream in) throws IOException {
+ EntryLogMetadataRecyclable metadata = EntryLogMetadataRecyclable.get();
+ try {
+ short serVersion = in.readShort();
+ if (serVersion != DEFAULT_SERIALIZATION_VERSION) {
+ throw new IOException(String.format("Serialization version doesn't match: expected=%d, found=%d",
+ DEFAULT_SERIALIZATION_VERSION, serVersion));
+ }
+ metadata.entryLogId = in.readLong();
+ metadata.totalSize = in.readLong();
+ metadata.remainingSize = in.readLong();
+ long ledgersMapSize = in.readLong();
+ for (int i = 0; i < ledgersMapSize; i++) {
+ long ledgerId = in.readLong();
+ long size = in.readLong();
+ metadata.ledgersMap.put(ledgerId, size);
+ }
+ return metadata;
+ } catch (IOException e) {
+ metadata.recycle();
+ throw e;
+ } catch (Exception e) {
+ metadata.recycle();
+ throw new IOException(e);
+ }
+ }
+
+ public void clear() {
+ entryLogId = -1L;
+ totalSize = -1L;
+ remainingSize = -1L;
+ ledgersMap.clear();
+ }
+
+ /**
+ * Recyclable {@link EntryLogMetadata} class.
+ *
+ */
+ public static class EntryLogMetadataRecyclable extends EntryLogMetadata {
+
+ private final Handle<EntryLogMetadataRecyclable> recyclerHandle;
+
+ private EntryLogMetadataRecyclable(Handle<EntryLogMetadataRecyclable> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<EntryLogMetadataRecyclable> RECYCLER =
+ new Recycler<EntryLogMetadataRecyclable>() {
+ protected EntryLogMetadataRecyclable newObject(Recycler.Handle<EntryLogMetadataRecyclable> handle) {
+ return new EntryLogMetadataRecyclable(handle);
+ }
+ };
+
+ public static EntryLogMetadataRecyclable get() {
+ EntryLogMetadataRecyclable metadata = RECYCLER.get();
+ return metadata;
+ }
+
+ public void recycle() {
+ clear();
+ recyclerHandle.recycle(this);
+ }
+
+ }
}
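
The new serialize/deserialize pair round-trips through plain `DataOutputStream`/`DataInputStream`, so the on-disk format can be exercised in isolation. A minimal sketch of the round trip (class name and values are illustrative; only the API added above is used):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.bookkeeper.bookie.EntryLogMetadata;
import org.apache.bookkeeper.bookie.EntryLogMetadata.EntryLogMetadataRecyclable;

public class EntryLogMetadataRoundTrip {
    public static void main(String[] args) throws Exception {
        EntryLogMetadata meta = new EntryLogMetadata(7L);
        meta.addLedgerSize(1L, 100L);
        meta.addLedgerSize(2L, 200L);

        // write: 2-byte version, logId/totalSize/remainingSize, ledger count,
        // then one (ledgerId, size) pair per ledger
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        meta.serialize(new DataOutputStream(buf));

        // read it back; deserialize hands out a pooled instance that the
        // caller is responsible for recycling
        EntryLogMetadataRecyclable copy = EntryLogMetadata.deserialize(
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        try {
            System.out.println(copy.getEntryLogId());  // 7
            System.out.println(copy.getTotalSize());   // 300
        } finally {
            copy.recycle();
        }
    }
}
```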
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
new file mode 100644
index 00000000000..88f6ce53986
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.BiConsumer;
+
+import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
+
+/**
+ * Map-store for entry-log metadata, keyed by entry-log id.
+ */
+public interface EntryLogMetadataMap extends Closeable {
+
+ /**
+ * Checks whether a record with the given entryLogId exists in the map.
+ *
+ * @param entryLogId
+ * @return true if a record exists for the given entryLogId
+ * @throws EntryLogMetadataMapException
+ */
+ boolean containsKey(long entryLogId) throws EntryLogMetadataMapException;
+
+ /**
+ * Adds an entryLogMetadata record to the map.
+ *
+ * @param entryLogId
+ * @param entryLogMeta
+ * @throws EntryLogMetadataMapException
+ */
+ void put(long entryLogId, EntryLogMetadata entryLogMeta) throws EntryLogMetadataMapException;
+
+ /**
+ * Performs the given action for each entry in this map until all entries
+ * have been processed or the action throws an exception.
+ *
+ * @param action
+ * @throws EntryLogMetadataMapException
+ */
+ void forEach(BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException;
+
+ /**
+ * Removes entryLogMetadata record from the map.
+ *
+ * @param entryLogId
+ * @throws EntryLogMetadataMapException
+ */
+ void remove(long entryLogId) throws EntryLogMetadataMapException;
+
+ /**
+ * Returns the number of entryLogMetadata records present in the map.
+ *
+ * @return number of records in the map
+ * @throws EntryLogMetadataMapException
+ */
+ int size() throws EntryLogMetadataMapException;
+
+}
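
To a caller, this interface behaves like a map keyed by entry-log id; the difference from direct `ConcurrentHashMap` usage is that every operation can throw the checked `EntryLogMetadataMapException`. A hypothetical helper, written against nothing but the interface above:

```java
import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
import org.apache.bookkeeper.bookie.EntryLogMetadataMap;

public class EntryLogMetadataMapUsage {
    /**
     * Sums the remaining (still-live) bytes across all known entry logs.
     * Illustrative helper only; not part of the interface.
     */
    static long totalRemainingBytes(EntryLogMetadataMap map) throws EntryLogMetadataMapException {
        long[] total = {0L};
        map.forEach((logId, meta) -> total[0] += meta.getRemainingSize());
        return total[0];
    }
}
```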
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 32a45fd531d..bf00566f1b3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -25,11 +25,6 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -40,13 +35,17 @@
import java.util.function.Supplier;
import lombok.Getter;
+
+import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
import org.apache.bookkeeper.bookie.stats.GarbageCollectorStats;
+import org.apache.bookkeeper.bookie.storage.ldb.PersistentEntryLogMetadataMap;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.commons.lang3.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +58,7 @@ public class GarbageCollectorThread extends SafeRunnable {
private static final int SECOND = 1000;
// Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
- private Map<Long, EntryLogMetadata> entryLogMetaMap = new ConcurrentHashMap<>();
+ private EntryLogMetadataMap entryLogMetaMap;
private final ScheduledExecutorService gcExecutor;
Future<?> scheduledFuture = null;
@@ -152,6 +151,7 @@ public GarbageCollectorThread(ServerConfiguration conf,
this.conf = conf;
this.entryLogger = ledgerStorage.getEntryLogger();
+ this.entryLogMetaMap = createEntryLogMetadataMap();
this.ledgerStorage = ledgerStorage;
this.gcWaitTime = conf.getGcWaitTime();
@@ -191,7 +191,14 @@ public GarbageCollectorThread(ServerConfiguration conf,
AbstractLogCompactor.LogRemovalListener remover = new AbstractLogCompactor.LogRemovalListener() {
@Override
public void removeEntryLog(long logToRemove) {
- GarbageCollectorThread.this.removeEntryLog(logToRemove);
+ try {
+ GarbageCollectorThread.this.removeEntryLog(logToRemove);
+ } catch (EntryLogMetadataMapException e) {
+ // Ignore and continue: the entry log will not be cleaned up
+ // in this pass but will be taken care of in the next
+ // scheduled run
+ LOG.warn("Failed to remove entry-log metadata {}", logToRemove, e);
+ }
}
};
if (conf.getUseTransactionalCompaction()) {
@@ -251,6 +258,20 @@ public void removeEntryLog(long logToRemove) {
lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis();
}
+ private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException {
+ if (conf.isGcEntryLogMetadataCacheEnabled()) {
+ String baseDir = this.conf.getGcEntryLogMetadataCachePath();
+ try {
+ return new PersistentEntryLogMetadataMap(baseDir, conf);
+ } catch (IOException e) {
+ LOG.error("Failed to initialize persistent-metadata-map , clean up {}", baseDir, e);
+ throw e;
+ }
+ } else {
+ return new InMemoryEntryLogMetadataMap();
+ }
+ }
+
public void enableForceGC() {
if (forceGarbageCollection.compareAndSet(false, true)) {
LOG.info("Forced garbage collection triggered by thread: {}", Thread.currentThread().getName());
@@ -344,54 +365,58 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
// Recover and clean up previous state if using transactional compaction
compactor.cleanUpAndRecover();
- // Extract all of the ledger ID's that comprise all of the entry logs
- // (except for the current new one which is still being written to).
- entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);
+ try {
+ // Extract all of the ledger ID's that comprise all of the entry logs
+ // (except for the current new one which is still being written to).
+ extractMetaFromEntryLogs();
- // gc inactive/deleted ledgers
- doGcLedgers();
+ // gc inactive/deleted ledgers
+ doGcLedgers();
- // gc entry logs
- doGcEntryLogs();
+ // gc entry logs
+ doGcEntryLogs();
- if (suspendMajor) {
- LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
- }
- if (suspendMinor) {
- LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
- }
+ if (suspendMajor) {
+ LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
+ }
+ if (suspendMinor) {
+ LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
+ }
- long curTime = System.currentTimeMillis();
- if (((isForceMajorCompactionAllow && force)
- || (enableMajorCompaction && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)))
- && (!suspendMajor)) {
- // enter major compaction
- LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
- majorCompacting.set(true);
- doCompactEntryLogs(majorCompactionThreshold, majorCompactionMaxTimeMillis);
- lastMajorCompactionTime = System.currentTimeMillis();
- // and also move minor compaction time
- lastMinorCompactionTime = lastMajorCompactionTime;
- gcStats.getMajorCompactionCounter().inc();
- majorCompacting.set(false);
- } else if (((isForceMinorCompactionAllow && force)
- || (enableMinorCompaction && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)))
- && (!suspendMinor)) {
- // enter minor compaction
- LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
- minorCompacting.set(true);
- doCompactEntryLogs(minorCompactionThreshold, minorCompactionMaxTimeMillis);
- lastMinorCompactionTime = System.currentTimeMillis();
- gcStats.getMinorCompactionCounter().inc();
- minorCompacting.set(false);
- }
+ long curTime = System.currentTimeMillis();
+ if (((isForceMajorCompactionAllow && force) || (enableMajorCompaction
+ && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)))
+ && (!suspendMajor)) {
+ // enter major compaction
+ LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
+ majorCompacting.set(true);
+ doCompactEntryLogs(majorCompactionThreshold, majorCompactionMaxTimeMillis);
+ lastMajorCompactionTime = System.currentTimeMillis();
+ // and also move minor compaction time
+ lastMinorCompactionTime = lastMajorCompactionTime;
+ gcStats.getMajorCompactionCounter().inc();
+ majorCompacting.set(false);
+ } else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction
+ && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)))
+ && (!suspendMinor)) {
+ // enter minor compaction
+ LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
+ minorCompacting.set(true);
+ doCompactEntryLogs(minorCompactionThreshold, minorCompactionMaxTimeMillis);
+ lastMinorCompactionTime = System.currentTimeMillis();
+ gcStats.getMinorCompactionCounter().inc();
+ minorCompacting.set(false);
+ }
- if (force) {
- if (forceGarbageCollection.compareAndSet(true, false)) {
- LOG.info("{} Set forceGarbageCollection to false after force GC to make it forceGC-able again.", Thread
- .currentThread().getName());
+ if (force && forceGarbageCollection.compareAndSet(true, false)) {
+ LOG.info("{} Set forceGarbageCollection to false after force GC to make it forceGC-able again.",
+ Thread.currentThread().getName());
}
+ } catch (EntryLogMetadataMapException e) {
+ LOG.error("Error in entryLog-metadatamap, Failed to complete GC/Compaction due to entry-log {}",
+ e.getMessage(), e);
}
+
gcStats.getGcThreadRuntime().registerSuccessfulEvent(
MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
}
@@ -406,29 +431,38 @@ private void doGcLedgers() {
/**
* Garbage collect those entry loggers which are not associated with any active ledgers.
*/
- private void doGcEntryLogs() {
+ private void doGcEntryLogs() throws EntryLogMetadataMapException {
// Get a cumulative count, don't update until complete
AtomicLong totalEntryLogSizeAcc = new AtomicLong(0L);
// Loop through all of the entry logs and remove the non-active ledgers.
entryLogMetaMap.forEach((entryLogId, meta) -> {
- removeIfLedgerNotExists(meta);
- if (meta.isEmpty()) {
- // This means the entry log is not associated with any active ledgers anymore.
- // We can remove this entry log file now.
- LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
- removeEntryLog(entryLogId);
- gcStats.getReclaimedSpaceViaDeletes().add(meta.getTotalSize());
- }
-
+ try {
+ removeIfLedgerNotExists(meta);
+ // write the updated metadata back to the persistent map
+ entryLogMetaMap.put(meta.getEntryLogId(), meta);
+ if (meta.isEmpty()) {
+ // This means the entry log is not associated with any active
+ // ledgers anymore.
+ // We can remove this entry log file now.
+ LOG.info("Deleting entryLogId {} as it has no active ledgers!", entryLogId);
+ removeEntryLog(entryLogId);
+ gcStats.getReclaimedSpaceViaDeletes().add(meta.getTotalSize());
+ }
+ } catch (EntryLogMetadataMapException e) {
+ // Ignore and continue: the entry log will not be cleaned up
+ // in this pass but will be taken care of in the next
+ // scheduled run
+ LOG.warn("Failed to remove ledger from entry-log metadata {}", entryLogId, e);
+ }
totalEntryLogSizeAcc.getAndAdd(meta.getRemainingSize());
});
this.totalEntryLogSize = totalEntryLogSizeAcc.get();
- this.numActiveEntryLogs = entryLogMetaMap.keySet().size();
+ this.numActiveEntryLogs = entryLogMetaMap.size();
}
- private void removeIfLedgerNotExists(EntryLogMetadata meta) {
+ private void removeIfLedgerNotExists(EntryLogMetadata meta) throws EntryLogMetadataMapException {
meta.removeLedgerIf((entryLogLedger) -> {
// Remove the entry log ledger from the set if it isn't active.
try {
@@ -450,34 +484,30 @@ private void removeIfLedgerNotExists(EntryLogMetadata meta) {
*
*/
@VisibleForTesting
- void doCompactEntryLogs(double threshold, long maxTimeMillis) {
+ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMetadataMapException {
LOG.info("Do compaction to compact those files lower than {}", threshold);
- // sort the ledger meta by usage in ascending order.
- List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
- logsToCompact.addAll(entryLogMetaMap.values());
- logsToCompact.sort(Comparator.comparing(EntryLogMetadata::getUsage));
-
final int numBuckets = 10;
int[] entryLogUsageBuckets = new int[numBuckets];
int[] compactedBuckets = new int[numBuckets];
long start = System.currentTimeMillis();
- long end = start;
- long timeDiff = 0;
+ MutableLong end = new MutableLong(start);
+ MutableLong timeDiff = new MutableLong(0);
- for (EntryLogMetadata meta : logsToCompact) {
+ entryLogMetaMap.forEach((entryLogId, meta) -> {
int bucketIndex = calculateUsageIndex(numBuckets, meta.getUsage());
entryLogUsageBuckets[bucketIndex]++;
- if (timeDiff < maxTimeMillis) {
- end = System.currentTimeMillis();
- timeDiff = end - start;
+ if (timeDiff.getValue() < maxTimeMillis) {
+ end.setValue(System.currentTimeMillis());
+ timeDiff.setValue(end.getValue() - start);
}
- if (meta.getUsage() >= threshold || (maxTimeMillis > 0 && timeDiff >= maxTimeMillis) || !running) {
+ if (meta.getUsage() >= threshold || (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis)
+ || !running) {
// We allow the usage limit calculation to continue so that we get an accurate
// report of where the usage was prior to running compaction.
- continue;
+ return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting entry log {} with usage {} below threshold {}",
@@ -488,12 +518,12 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) {
compactEntryLog(meta);
gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize);
compactedBuckets[bucketIndex]++;
- }
+ });
if (LOG.isDebugEnabled()) {
if (!running) {
LOG.debug("Compaction exited due to gc not running");
}
- if (timeDiff > maxTimeMillis) {
+ if (timeDiff.getValue() > maxTimeMillis) {
LOG.debug("Compaction ran for {}ms but was limited by {}ms", timeDiff, maxTimeMillis);
}
}
@@ -534,6 +564,11 @@ public synchronized void shutdown() throws InterruptedException {
this.running = false;
// Interrupt GC executor thread
gcExecutor.shutdownNow();
+ try {
+ entryLogMetaMap.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close entryLog metadata-map", e);
+ }
}
/**
@@ -541,8 +576,9 @@ public synchronized void shutdown() throws InterruptedException {
*
* @param entryLogId
* Entry Log File Id
+ * @throws EntryLogMetadataMapException
*/
- protected void removeEntryLog(long entryLogId) {
+ protected void removeEntryLog(long entryLogId) throws EntryLogMetadataMapException {
// remove entry log file successfully
if (entryLogger.removeEntryLog(entryLogId)) {
LOG.info("Removing entry log metadata for {}", entryLogId);
@@ -582,11 +618,9 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
* Method to read in all of the entry logs (those that we haven't done so yet),
* and find the set of ledger ID's that make up each entry log file.
*
- * @param entryLogMetaMap
- * Existing EntryLogs to Meta
- * @throws IOException
+ * @throws EntryLogMetadataMapException
*/
- protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
+ protected void extractMetaFromEntryLogs() throws EntryLogMetadataMapException {
// Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll
// to a new one. We scan entry logs as follows:
// - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed).
@@ -624,6 +658,9 @@ protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
new file mode 100644
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+
+/**
+ * In-memory map-store to store entry-log metadata.
+ */
+public class InMemoryEntryLogMetadataMap implements EntryLogMetadataMap {
+
+ private final Map<Long, EntryLogMetadata> entryLogMetaMap = new ConcurrentHashMap<>();
+
+ @Override
+ public boolean containsKey(long entryLogId) {
+ return entryLogMetaMap.containsKey(entryLogId);
+ }
+
+ @Override
+ public void put(long entryLogId, EntryLogMetadata entryLogMeta) {
+ entryLogMetaMap.put(entryLogId, entryLogMeta);
+ }
+
+ @Override
+ public void forEach(BiConsumer<Long, EntryLogMetadata> action) {
+ entryLogMetaMap.forEach(action);
+ }
+
+ @Override
+ public void remove(long entryLogId) {
+ entryLogMetaMap.remove(entryLogId);
+ }
+
+ @Override
+ public int size() {
+ return entryLogMetaMap.size();
+ }
+
+ @Override
+ public void close() throws IOException {
+ entryLogMetaMap.clear();
+ }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
new file mode 100644
index 00000000000..812ab845385
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+import org.apache.bookkeeper.bookie.EntryLogMetadata.EntryLogMetadataRecyclable;
+import org.apache.bookkeeper.bookie.EntryLogMetadataMap;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Persistent entryLogMetadata-map that stores entry-log metadata in
+ * RocksDB.
+ */
+public class PersistentEntryLogMetadataMap implements EntryLogMetadataMap {
+ private static final Logger LOG = LoggerFactory.getLogger(PersistentEntryLogMetadataMap.class);
+ // persistent Rocksdb to store metadata-map
+ private final KeyValueStorage metadataMapDB;
+ private AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ private static final FastThreadLocal<ByteArrayOutputStream> baos = new FastThreadLocal<ByteArrayOutputStream>() {
+ @Override
+ protected ByteArrayOutputStream initialValue() {
+ return new ByteArrayOutputStream();
+ }
+ };
+ private static final FastThreadLocal<DataOutputStream> dataos = new FastThreadLocal<DataOutputStream>() {
+ @Override
+ protected DataOutputStream initialValue() {
+ return new DataOutputStream(baos.get());
+ }
+ };
+
+ public PersistentEntryLogMetadataMap(String metadataPath, ServerConfiguration conf) throws IOException {
+ LOG.info("Loading persistent entrylog metadata-map from {}", metadataPath);
+ File dir = new File(metadataPath);
+ if (!dir.mkdirs() && !dir.exists()) {
+ String err = "Unable to create directory " + dir;
+ LOG.error(err);
+ throw new IOException(err);
+ }
+ metadataMapDB = KeyValueStorageRocksDB.factory.newKeyValueStorage(metadataPath, "metadata-cache",
+ DbConfigType.Small, conf);
+ }
+
+ @Override
+ public boolean containsKey(long entryLogId) throws EntryLogMetadataMapException {
+ LongWrapper key = LongWrapper.get(entryLogId);
+ try {
+ return metadataMapDB.get(key.array) != null;
+ } catch (IOException e) {
+ throw new EntryLogMetadataMapException(e);
+ } finally {
+ key.recycle();
+ }
+ }
+
+ @Override
+ public void put(long entryLogId, EntryLogMetadata entryLogMeta) throws EntryLogMetadataMapException {
+ LongWrapper key = LongWrapper.get(entryLogId);
+ try {
+ baos.get().reset();
+ try {
+ entryLogMeta.serialize(dataos.get());
+ metadataMapDB.put(key.array, baos.get().toByteArray());
+ } catch (IllegalStateException | IOException e) {
+ LOG.error("Failed to serialize entrylog-metadata, entryLogId {}", entryLogId);
+ throw new EntryLogMetadataMapException(e);
+ }
+ } finally {
+ key.recycle();
+ }
+
+ }
+
+ /**
+ * The {@link EntryLogMetadata} passed to the supplied action is transient:
+ * it is recycled as soon as the action completes, so the action must not
+ * retain a reference to it.
+ */
+ @Override
+ public void forEach(BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException {
+ CloseableIterator<Entry<byte[], byte[]>> iterator = metadataMapDB.iterator();
+ try {
+ while (iterator.hasNext()) {
+ if (isClosed.get()) {
+ break;
+ }
+ Entry<byte[], byte[]> entry = iterator.next();
+ long entryLogId = ArrayUtil.getLong(entry.getKey(), 0);
+ // A ByteArrayInputStream cannot be re-filled with new bytes, so
+ // wrap fresh streams around this entry's serialized value instead
+ // of trying to reuse a thread-local stream
+ DataInputStream localDatais = new DataInputStream(new ByteArrayInputStream(entry.getValue()));
+ EntryLogMetadataRecyclable metadata = EntryLogMetadata.deserialize(localDatais);
+ try {
+ action.accept(entryLogId, metadata);
+ } finally {
+ metadata.recycle();
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to iterate over entry-log metadata map {}", e.getMessage(), e);
+ throw new EntryLogMetadataMapException(e);
+ } finally {
+ try {
+ iterator.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close entry-log metadata-map rocksDB iterator {}", e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void remove(long entryLogId) throws EntryLogMetadataMapException {
+ LongWrapper key = LongWrapper.get(entryLogId);
+ try {
+ try {
+ metadataMapDB.delete(key.array);
+ } catch (IOException e) {
+ throw new EntryLogMetadataMapException(e);
+ }
+ } finally {
+ key.recycle();
+ }
+ }
+
+ @Override
+ public int size() throws EntryLogMetadataMapException {
+ try {
+ return (int) metadataMapDB.count();
+ } catch (IOException e) {
+ throw new EntryLogMetadataMapException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ isClosed.set(true);
+ metadataMapDB.close();
+ }
+
+}
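
Note the lifecycle contract spelled out on `forEach` above: the `EntryLogMetadata` handed to the action is a pooled `EntryLogMetadataRecyclable` that is recycled the moment the action returns, so the action must copy out anything it wants to keep. A usage sketch under that contract (the cache path is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.bookkeeper.bookie.EntryLogMetadata;
import org.apache.bookkeeper.bookie.storage.ldb.PersistentEntryLogMetadataMap;
import org.apache.bookkeeper.conf.ServerConfiguration;

public class PersistentEntryLogMetadataMapUsage {
    public static void main(String[] args) throws Exception {
        ServerConfiguration conf = new ServerConfiguration();
        PersistentEntryLogMetadataMap map =
                new PersistentEntryLogMetadataMap("/tmp/entrylogIndexCache", conf);
        try {
            EntryLogMetadata meta = new EntryLogMetadata(1L);
            meta.addLedgerSize(10L, 1024L);
            map.put(1L, meta);

            // copy primitives out of the callback; the metadata object itself
            // is recycled as soon as the action returns
            List<Long> remainingSizes = new ArrayList<>();
            map.forEach((logId, m) -> remainingSizes.add(m.getRemainingSize()));
        } finally {
            map.close();
        }
    }
}
```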
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index d3547984f65..65c710164e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -24,8 +24,8 @@
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_WRITTEN;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_READ;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_WRITTEN;
-import static org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;;
-import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;;
+import static org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;
+import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.Unpooled;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index ded57a3e027..7af77d501f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.conf;
+import static org.apache.bookkeeper.util.BookKeeperConstants.ENTRYLOG_INDEX_CACHE;
import static org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
import com.google.common.annotations.Beta;
@@ -115,6 +116,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfiguration> {
+ protected static final String GC_ENTRYLOG_METADATA_CACHE_ENABLED = "gcEntryLogMetadataCacheEnabled";
+ protected static final String GC_ENTRYLOG_METADATA_CACHE_PATH = "gcEntryLogMetadataCachePath";
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -837,6 +845,7 @@ public void testForceMajorCompaction() throws Exception {
LOG.info("Finished deleting the ledgers contains most entries.");
getGCThread().enableForceGC();
getGCThread().triggerGC().get();
+ assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap);
// after garbage collection, minor compaction should not be executed
assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
@@ -924,6 +933,8 @@ public void testCompactionPersistence() throws Exception {
* purpose.
*/
newBookieConf.setMetadataServiceUri(null);
+ String entryLogCachePath = newBookieConf.getGcEntryLogMetadataCachePath();
+ newBookieConf.setGcEntryLogMetadataCachePath(entryLogCachePath + "-bk2");
Bookie newbookie = new TestBookieImpl(newBookieConf);
DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
@@ -1009,6 +1020,7 @@ public void testCompactionWhenLedgerDirsAreFull() throws Exception {
LOG.info("Finished deleting the ledgers contains most entries.");
getGCThread().enableForceGC();
getGCThread().triggerGC().get();
+ assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap);
// after garbage collection, minor compaction should not be executed
assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
@@ -1094,6 +1106,7 @@ public void testCompactionSmallEntryLogs() throws Exception {
getGCThread().enableForceGC();
getGCThread().triggerGC().get();
+ assertTrue(getGCThread().getEntryLogMetaMap() instanceof PersistentEntryLogMetadataMap);
// entry logs (0.log) should not be compacted
// entry logs ([1,2,3].log) should be compacted.
@@ -1716,9 +1729,13 @@ public MockTransactionalEntryLogCompactor(GarbageCollectorThread gcThread) {
super(gcThread.conf,
gcThread.entryLogger,
gcThread.ledgerStorage,
- (long entry) -> {
- gcThread.removeEntryLog(entry);
- });
+ (long entry) -> {
+ try {
+ gcThread.removeEntryLog(entry);
+ } catch (EntryLogMetadataMapException e) {
+ LOG.warn("Failed to remove entry-log metadata {}", entry, e);
+ }
+ });
}
synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) {
@@ -1810,4 +1827,4 @@ void start() throws IOException {
}
}
-}
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java
new file mode 100644
index 00000000000..243fe692ec4
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.List;
+
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit test for {@link PersistentEntryLogMetadataMap}.
+ */
+public class PersistentEntryLogMetadataMapTest {
+
+ private final ServerConfiguration configuration;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public PersistentEntryLogMetadataMapTest() {
+ this.configuration = new ServerConfiguration();
+ }
+
+ /**
+ * Validates PersistentEntryLogMetadataMap functionalities.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void simple() throws Exception {
+ File tmpDir = tempFolder.newFolder("metadata-cache");
+ String path = tmpDir.getAbsolutePath();
+ PersistentEntryLogMetadataMap entryMetadataMap = new PersistentEntryLogMetadataMap(path, configuration);
+
+ List metadatas = Lists.newArrayList();
+ int totalMetadata = 1000;
+ // insert entry-log-metadata records
+ for (int i = 1; i <= totalMetadata; i++) {
+ EntryLogMetadata entryLogMeta = createEntryLogMetadata(i, i);
+ metadatas.add(entryLogMeta);
+ entryMetadataMap.put(i, entryLogMeta);
+ }
+ for (int i = 1; i <= totalMetadata; i++) {
+ assertTrue(entryMetadataMap.containsKey(i));
+ }
+
+ assertEquals(totalMetadata, entryMetadataMap.size());
+
+ entryMetadataMap.forEach((logId, metadata) -> {
+ assertEquals(metadatas.get(logId.intValue() - 1).getTotalSize(), metadata.getTotalSize());
+ for (int i = 0; i < logId.intValue(); i++) {
+ assertTrue(metadata.containsLedger(i));
+ }
+ });
+
+ // remove entry-log entry
+ for (int i = 1; i <= totalMetadata; i++) {
+ entryMetadataMap.remove(i);
+ }
+
+ // entries should no longer be present in the map
+ for (int i = 1; i <= totalMetadata; i++) {
+ assertFalse(entryMetadataMap.containsKey(i));
+ }
+
+ assertEquals(0, entryMetadataMap.size());
+
+ entryMetadataMap.close();
+ }
+
+ /**
+ * Validates PersistentEntryLogMetadataMap persists metadata state in
+ * rocksDB.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void closeAndOpen() throws Exception {
+ File tmpDir = tempFolder.newFolder();
+ String path = tmpDir.getAbsolutePath();
+ PersistentEntryLogMetadataMap entryMetadataMap = new PersistentEntryLogMetadataMap(path, configuration);
+
+ List metadatas = Lists.newArrayList();
+ int totalMetadata = 1000;
+ for (int i = 1; i <= totalMetadata; i++) {
+ EntryLogMetadata entryLogMeta = createEntryLogMetadata(i, i);
+ metadatas.add(entryLogMeta);
+ entryMetadataMap.put(i, entryLogMeta);
+ }
+ for (int i = 1; i <= totalMetadata; i++) {
+ assertTrue(entryMetadataMap.containsKey(i));
+ }
+
+ // close metadata-map
+ entryMetadataMap.close();
+ // Open it again
+ entryMetadataMap = new PersistentEntryLogMetadataMap(path, configuration);
+
+ entryMetadataMap.forEach((logId, metadata) -> {
+ assertEquals(metadatas.get(logId.intValue() - 1).getTotalSize(), logId.longValue());
+ for (int i = 0; i < logId.intValue(); i++) {
+ assertTrue(metadata.containsLedger(i));
+ }
+ });
+
+ entryMetadataMap.close();
+ }
+
+ private EntryLogMetadata createEntryLogMetadata(long logId, long totalLedgers) {
+ EntryLogMetadata metadata = new EntryLogMetadata(logId);
+ for (int i = 0; i < totalLedgers; i++) {
+ metadata.addLedgerSize(i, 1);
+ }
+ return metadata;
+ }
+}
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 921ced01c91..801976ebbb3 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -591,6 +591,14 @@ ledgerDirectories=/tmp/bk-data
# True if the bookie should double check readMetadata prior to gc
# verifyMetadataOnGC=false
+# True if the bookie should persist entry-log metadata on disk instead of
+# holding it all in memory
+# gcEntryLogMetadataCacheEnabled=false
+
+# Directory in which to persist entry-log metadata if gcEntryLogMetadataCacheEnabled is true
+# [Default: a sub-directory named "entrylogIndexCache" under the first available
+# base ledger directory]
+# gcEntryLogMetadataCachePath=
+
#############################################################################
## Disk utilization
#############################################################################
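
The same two settings can be applied through `ServerConfiguration`. `setGcEntryLogMetadataCachePath` is exercised in the test changes above; the enable-flag setter below is assumed to follow the usual getter/setter naming convention for `isGcEntryLogMetadataCacheEnabled`:

```java
import org.apache.bookkeeper.conf.ServerConfiguration;

public class GcMetadataCacheConfigExample {
    public static ServerConfiguration configure() {
        ServerConfiguration conf = new ServerConfiguration();
        // persist entry-log metadata instead of keeping it on-heap
        // (assumed setter, mirroring isGcEntryLogMetadataCacheEnabled)
        conf.setGcEntryLogMetadataCacheEnabled(true);
        // optional: defaults to "entrylogIndexCache" under the first
        // available ledger directory when unset
        conf.setGcEntryLogMetadataCachePath("/data/bk/entrylogIndexCache");
        return conf;
    }
}
```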