Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future}
import java.util.{Collections, Optional, OptionalLong, Properties}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache}
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
import org.junit.jupiter.api.function.Executable
Expand Down Expand Up @@ -965,7 +965,7 @@ class LogManagerTest {
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000, false),
producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
scheduler = mockTime.scheduler,
scheduler = mock(classOf[Scheduler]),
time = mockTime,
brokerTopicStats = mockBrokerTopicStats,
logDirFailureChannel = mock(classOf[LogDirFailureChannel]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,7 @@ public void truncateFromEndAsyncFlush(long endOffset) {
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromEnd call on log loading procedure, so it won't be a problem
List<EpochEntry> entries = new ArrayList<>(epochs.values());
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);

log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
Expand Down Expand Up @@ -381,8 +380,7 @@ public void truncateFromStartAsyncFlush(long startOffset) {
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromStart call on log loading procedure, so it won't be a problem
List<EpochEntry> entries = new ArrayList<>(epochs.values());
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);

EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1);
log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
Expand Down Expand Up @@ -526,4 +524,13 @@ private void writeToFile() {
lock.readLock().unlock();
}
}

private void writeToFileForTruncation() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it's clearer to rename this and checkpoint.writeForTruncation to sth like writeIfDirExists.

lock.readLock().lock();
try {
checkpoint.writeForTruncation(epochs.values());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a comment on why it's important to hold the lock while writing the checkpoint file?

} finally {
lock.readLock().unlock();
}
}
}