Merged
46 changes: 31 additions & 15 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -46,6 +46,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
@@ -61,7 +62,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.EpochEntry;
@@ -83,12 +84,16 @@
import scala.Option;
import scala.collection.JavaConverters;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -608,25 +613,23 @@ public boolean isCancelled() {
}

/**
* Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
* Returns the leader epoch entries within the range of the given start[inclusive] and end[exclusive] offset.
* <p>
* Visible for testing.
*
* @param log The actual log from where to take the leader-epoch checkpoint
* @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
* @param startOffset The start offset of the epoch entries (inclusive).
* If start offset is 6, then it will retain an entry at offset 6.
* @param endOffset The end offset of the checkpoint file (inclusive in the truncation)
* @param endOffset The end offset of the epoch entries (exclusive)
* If end offset is 100, then it will remove the entries greater than or equal to 100.
* @return the truncated leader epoch checkpoint
* @return the leader epoch entries
*/
InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long startOffset, long endOffset) {
InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint();
List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, long startOffset, long endOffset) {
if (log.leaderEpochCache().isDefined()) {
LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint);
if (startOffset >= 0) {
cache.truncateFromStart(startOffset);
}
cache.truncateFromEnd(endOffset);
return log.leaderEpochCache().get().epochEntriesInRange(startOffset, endOffset);
} else {
return Collections.emptyList();
}
return checkpoint;
}
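
To make the new boundary semantics concrete: assuming a hypothetical cache holding the entries (epoch=1, startOffset=6) and (epoch=2, startOffset=100), a minimal sketch of a call and its result:

```java
// Hedged illustration of the documented semantics; `log` is assumed to be a
// UnifiedLog whose leader epoch cache holds the two hypothetical entries above.
List<EpochEntry> entries = getLeaderEpochEntries(log, 6, 100);
// Start offset 6 is inclusive: the entry starting exactly at 6 is retained.
// End offset 100 is exclusive: the entry starting at 100 is dropped.
// entries == [EpochEntry(epoch=1, startOffset=6)]
```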

class RLMTask extends CancellableRunnable {
@@ -784,7 +787,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
long endOffset = nextSegmentBaseOffset - 1;
File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);

List<EpochEntry> epochEntries = getLeaderEpochCheckpoint(log, segment.baseOffset(), nextSegmentBaseOffset).read();
List<EpochEntry> epochEntries = getLeaderEpochEntries(log, segment.baseOffset(), nextSegmentBaseOffset);
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>(epochEntries.size());
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset));

@@ -794,7 +797,7 @@

remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();

ByteBuffer leaderEpochsIndex = getLeaderEpochCheckpoint(log, -1, nextSegmentBaseOffset).readAsByteBuffer();
ByteBuffer leaderEpochsIndex = epochEntriesAsByteBuffer(getLeaderEpochEntries(log, -1, nextSegmentBaseOffset));
LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.offsetIndex().file()),
toPathIfExists(segment.timeIndex().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
@@ -1747,6 +1750,19 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo
LOGGER.info("Shutting down of thread pool {} is completed", poolName);
}

// Visible for testing
static ByteBuffer epochEntriesAsByteBuffer(List<EpochEntry> epochEntries) throws IOException {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) {
CheckpointFile.CheckpointWriteBuffer<EpochEntry> writeBuffer =
new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER);
writeBuffer.write(epochEntries);
writer.flush();
}

return ByteBuffer.wrap(stream.toByteArray());
}
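
A hedged usage sketch of this helper: assuming the standard CheckpointFile version-0 layout (a version line, an entry count, then one `epoch startOffset` line per entry), serializing two hypothetical entries yields:

```java
// Hypothetical entries; the expected text below assumes CheckpointFile's
// plain-text layout (version, count, then formatted entries).
List<EpochEntry> entries = Arrays.asList(new EpochEntry(0, 0L), new EpochEntry(1, 6L));
ByteBuffer buffer = epochEntriesAsByteBuffer(entries);
String text = StandardCharsets.UTF_8.decode(buffer).toString();
// text:
// 0        (checkpoint version)
// 2        (number of entries)
// 0 0      (epoch 0 starts at offset 0)
// 1 6      (epoch 1 starts at offset 6)
```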

private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {
String topic = topicIdPartition.topic();
if (!brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) {
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogLoader.scala
@@ -173,14 +173,14 @@ class LogLoader(
}
}

leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset))
leaderEpochCache.ifPresent(_.truncateFromEndAsyncFlush(nextOffset))
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint))
leaderEpochCache.ifPresent(_.truncateFromStartAsyncFlush(logStartOffsetCheckpoint))

// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
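
The rename from truncateFromEnd to truncateFromEndAsyncFlush makes the flushing behavior explicit: the in-memory cache is mutated synchronously, while the checkpoint write is handed off to a background scheduler. A minimal sketch of that pattern, with illustrative names rather than the real LeaderEpochFileCache internals:

```java
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hedged sketch of the truncate-then-async-flush pattern the renamed methods
// imply; class and field names here are hypothetical.
class AsyncFlushEpochCache {
    // epoch -> start offset of that epoch
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    synchronized void truncateFromEndAsyncFlush(long endOffset) {
        // Mutate the in-memory entries synchronously: drop epochs starting at
        // or beyond the new log end offset.
        epochs.values().removeIf(startOffset -> startOffset >= endOffset);
        // Hand the checkpoint write to a background thread. Losing this flush
        // in a hard failure is tolerable because LogLoader truncates the cache
        // again on startup (see the recovery comment above).
        scheduler.execute(this::flushCheckpoint);
    }

    private synchronized void flushCheckpoint() {
        // Placeholder: write the surviving entries to the checkpoint file.
    }
}
```

The payoff, per the comments in this PR, is that leader-epoch truncation no longer blocks on a checkpoint write in hot paths such as log loading and log-start-offset increments.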
43 changes: 23 additions & 20 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -521,7 +521,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

private def initializeLeaderEpochCache(): Unit = lock synchronized {
leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, recordVersion, logIdent, leaderEpochCache, scheduler)
}

private def updateHighWatermarkWithLogEndOffset(): Unit = {
@@ -1015,7 +1016,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
leaderEpochCache.foreach(_.truncateFromStartAsyncFlush(logStartOffset))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
@@ -1813,7 +1814,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// and inserted the first start offset entry, but then failed to append any entries
// before another leader was elected.
lock synchronized {
leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(logEndOffset))
}

false
@@ -1826,7 +1827,7 @@
} else {
val deletedSegments = localLog.truncateTo(targetOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
if (highWatermark >= localLog.logEndOffset)
@@ -2011,12 +2012,17 @@ object UnifiedLog extends Logging {
Files.createDirectories(dir.toPath)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val segments = new LogSegments(topicPartition)
// The created leaderEpochCache will be truncated by LogLoader if necessary
// so it is guaranteed that the epoch entries will be correct even when on-disk
// checkpoint was stale (due to async nature of LeaderEpochFileCache#truncateFromStart/End).
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir,
topicPartition,
logDirFailureChannel,
config.recordVersion,
s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ",
None,
scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
@@ -2103,7 +2109,8 @@
}

/**
* If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache.
* If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance,
* loading the epoch entries from the backing checkpoint file or from the provided currentCache if not empty.
* Otherwise, the message format is considered incompatible and the existing LeaderEpoch file
* is deleted.
*
@@ -2112,33 +2119,29 @@
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param recordVersion The record version
* @param logPrefix The logging prefix
* @param currentCache The current LeaderEpochFileCache instance (if any)
* @param scheduler The scheduler for executing asynchronous tasks
* @return The new LeaderEpochFileCache instance (if created), none otherwise
*/
def maybeCreateLeaderEpochCache(dir: File,
topicPartition: TopicPartition,
logDirFailureChannel: LogDirFailureChannel,
recordVersion: RecordVersion,
logPrefix: String): Option[LeaderEpochFileCache] = {
logPrefix: String,
currentCache: Option[LeaderEpochFileCache],
scheduler: Scheduler): Option[LeaderEpochFileCache] = {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)

def newLeaderEpochFileCache(): LeaderEpochFileCache = {
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
new LeaderEpochFileCache(topicPartition, checkpointFile)
}

if (recordVersion.precedes(RecordVersion.V2)) {
val currentCache = if (leaderEpochFile.exists())
Some(newLeaderEpochFileCache())
else
None

if (currentCache.exists(_.nonEmpty))
if (leaderEpochFile.exists()) {
warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")

}
Files.deleteIfExists(leaderEpochFile.toPath)
None
} else {
Some(newLeaderEpochFileCache())
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
currentCache.map(_.withCheckpoint(checkpointFile))
.orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler)))
}
}
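
For readers following the new reuse-or-create path, a hedged Java transliteration of the decision above (names mirror the Scala code; currentCache is a java.util.Optional here):

```java
// Hedged sketch, not the actual implementation.
Optional<LeaderEpochFileCache> cache;
if (recordVersion.precedes(RecordVersion.V2)) {
    // Incompatible message format: the on-disk checkpoint is stale, delete it.
    Files.deleteIfExists(leaderEpochFile.toPath());
    cache = Optional.empty();
} else {
    LeaderEpochCheckpointFile checkpointFile =
        new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel);
    // Re-point an existing in-memory cache at the checkpoint file, avoiding a
    // reload from disk; otherwise build a fresh cache, now scheduler-backed so
    // truncations can flush asynchronously.
    cache = currentCache
        .map(c -> c.withCheckpoint(checkpointFile))
        .or(() -> Optional.of(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler)));
}
```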

@@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
checkpoint.write(list, true)
checkpoint.write(list)
}

def read(): Map[TopicPartition, Long] = {