Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1007,19 +1007,6 @@ public static void flushDir(Path path) throws IOException {
}
}

/**
* Flushes dirty directories to guarantee crash consistency with swallowing {@link NoSuchFileException}
*
* @throws IOException if flushing the directory fails.
*/
public static void flushDirIfExists(Path path) throws IOException {
try {
flushDir(path);
} catch (NoSuchFileException e) {
log.warn("Failed to flush directory {}", path);
}
}

/**
* Closes all the provided closeables.
* @throws IOException if any of the close methods throws an IOException.
Expand Down
4 changes: 1 addition & 3 deletions core/src/main/scala/kafka/log/LocalLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ class LocalLog(@volatile private var _dir: File,
segmentsToFlush.foreach(_.flush())
// If there are any new segments, we need to flush the parent directory for crash consistency.
if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint)) {
// The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here.
// Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go.
Utils.flushDirIfExists(dir.toPath)
Utils.flushDir(dir.toPath)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1648,8 +1648,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
debug(s"Flushing log up to offset $offset ($includingOffsetStr)" +
s"with recovery point $newRecoveryPoint, last flushed: $lastFlushTime, current time: ${time.milliseconds()}," +
s"unflushed: ${localLog.unflushedMessages}")
localLog.flush(flushOffset)
lock synchronized {
// Flushing under lock, as log directory can be concurrently renamed in LocalLog.renameDir
localLog.flush(flushOffset)
localLog.markFlushed(newRecoveryPoint)
}
}
Expand Down
16 changes: 0 additions & 16 deletions core/src/test/scala/unit/kafka/log/LocalLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{doReturn, spy}

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -703,20 +701,6 @@ class LocalLogTest {
assertThrows(classOf[KafkaException], () => log.roll())
}

@Test
def testFlushingNonExistentDir(): Unit = {
val spyLog = spy(log)

val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
mockTime.sleep(1)
val newSegment = log.roll()

// simulate the directory is renamed concurrently
doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir
assertDoesNotThrow((() => spyLog.flush(newSegment.baseOffset)): Executable)
}

private def createLocalLogWithActiveSegment(dir: File = logDir,
config: LogConfig,
segments: LogSegments = new LogSegments(topicPartition),
Expand Down