diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 6a0913d3c2da1..6e1c3cefbb1d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1021,6 +1021,17 @@ public static void flushDirIfExists(Path path) throws IOException { } } + /** + * Flushes dirty file with swallowing {@link NoSuchFileException} + */ + public static void flushFileIfExists(Path path) throws IOException { + try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) { + fileChannel.force(true); + } catch (NoSuchFileException e) { + log.warn("Failed to flush file {}", path, e); + } + } + /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. @@ -1543,7 +1554,7 @@ public static Iterator covariantCast(Iterator iterator) { * Checks if a string is null, empty or whitespace only. * @param str a string to be checked * @return true if the string is null, empty or whitespace only; otherwise, return false. - */ + */ public static boolean isBlank(String str) { return str == null || str.trim().isEmpty(); } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 0691cff51d0bc..8ca58ad20f0ce 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -44,7 +44,7 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard} import java.io.{File, IOException} -import java.nio.file.Files +import java.nio.file.{Files, Path} import java.util import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.stream.Collectors @@ -1656,10 +1656,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), // we manually override the state offset here prior to taking the snapshot. producerStateManager.updateMapEndOffset(newSegment.baseOffset) - producerStateManager.takeSnapshot() + // We avoid potentially-costly fsync call, since we acquire UnifiedLog#lock here + // which could block subsequent produces in the meantime. + // flush is done in the scheduler thread along with segment flushing below + val maybeSnapshot = producerStateManager.takeSnapshot(false) updateHighWatermarkWithLogEndOffset() // Schedule an asynchronous flush of the old segment - scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset)) + scheduler.scheduleOnce("flush-log", () => { + maybeSnapshot.ifPresent(f => flushProducerStateSnapshot(f.toPath)) + flushUptoOffsetExclusive(newSegment.baseOffset) + }) newSegment } @@ -1742,6 +1748,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, producerStateManager.mapEndOffset } + private[log] def flushProducerStateSnapshot(snapshot: Path): Unit = { + maybeHandleIOException(s"Error while deleting producer state snapshot $snapshot for $topicPartition in dir ${dir.getParent}") { + Utils.flushFileIfExists(snapshot) + } + } + /** * Truncate this log so that it ends with the greatest offset < targetOffset. * diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index 084e46c5ef266..de3283d21fd42 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh def write(offsets: Map[TopicPartition, Long]): Unit = { val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size) offsets.foreach(x => list.add(x)) - checkpoint.write(list) + checkpoint.write(list, true) } def read(): Map[TopicPartition, Long] = { diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 0e2ac1b8f06ee..de1245f55699d 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -184,7 +184,7 @@ public class RemoteLogManagerTest { List epochs = Collections.emptyList(); @Override - public void write(Collection epochs) { + public void write(Collection epochs, boolean ignored) { this.epochs = new ArrayList<>(epochs); } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index aacab5b624ef5..11fff517b430d 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -404,7 +404,7 @@ class LogSegmentTest { val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { private var epochs = Seq.empty[EpochEntry] - override def write(epochs: util.Collection[EpochEntry]): Unit = { + override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = { this.epochs = epochs.asScala.toSeq } diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 9f9acaf33c70b..1a4ffaa57d05e 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -42,8 +42,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.EnumSource import org.mockito.ArgumentMatchers -import org.mockito.ArgumentMatchers.anyLong -import org.mockito.Mockito.{mock, when} +import org.mockito.ArgumentMatchers.{any, anyLong} +import org.mockito.Mockito.{doThrow, mock, spy, when} import java.io._ import java.nio.ByteBuffer @@ -3625,7 +3625,7 @@ class UnifiedLogTest { val records = TestUtils.singletonRecords(value = s"test$i".getBytes) log.appendAsLeader(records, leaderEpoch = 0) } - + log.updateHighWatermark(90L) log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion) assertEquals(20, log.logStartOffset) @@ -3911,6 +3911,24 @@ class UnifiedLogTest { log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard) } + @Test + def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = { + val logConfig = LogTestUtils.createLogConfig() + val log = spy(createLog(logDir, logConfig)) + + doThrow(new KafkaStorageException("Injected exception")).when(log).flushProducerStateSnapshot(any()) + + log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) + try { + log.roll(Some(1L)) + } catch { + case _: KafkaStorageException => // ignore + } + + // check that the recovery point isn't incremented + assertEquals(0L, log.recoveryPoint) + } + @Test def testDeletableSegmentsFilter(): Unit = { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024) diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala index ddbf58d884e30..a7e370d7f4091 100644 --- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala @@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging { val logDirFailureChannel = new LogDirFailureChannel(10) val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1, OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent) - checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L)) + checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true) assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()) } diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index a2de160a0ed52..a47c902024a9f 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -38,7 +38,7 @@ class LeaderEpochFileCacheTest { val tp = new TopicPartition("TestTopic", 5) private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { private var epochs: Seq[EpochEntry] = Seq() - override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq + override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq override def read(): java.util.List[EpochEntry] = this.epochs.asJava } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java index 6efbaa136e0e9..9c115881328a0 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java @@ -72,7 +72,7 @@ public CheckpointFile(File file, tempPath = Paths.get(absolutePath + ".tmp"); } - public void write(Collection entries) throws IOException { + public void write(Collection entries, boolean sync) throws IOException { synchronized (lock) { // write to temp file and then swap with the existing file try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile()); @@ -80,10 +80,12 @@ public void write(Collection entries) throws IOException { CheckpointWriteBuffer checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter); checkpointWriteBuffer.write(entries); writer.flush(); - fileOutputStream.getFD().sync(); + if (sync) { + fileOutputStream.getFD().sync(); + } } - Utils.atomicMoveWithFallback(tempPath, absolutePath); + Utils.atomicMoveWithFallback(tempPath, absolutePath, sync); } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java index 1eddc0b788b16..a08e0f30507bf 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java @@ -68,7 +68,7 @@ public Optional> fromString(String line) { } public synchronized void writeEntries(Map committedOffsets) throws IOException { - checkpointFile.write(committedOffsets.entrySet()); + checkpointFile.write(committedOffsets.entrySet(), true); } public synchronized Map readEntries() throws IOException { @@ -83,4 +83,4 @@ public synchronized Map readEntries() throws IOException { return partitionToOffsets; } -} \ No newline at end of file +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java index f780ced9b0477..35abfb5a984d2 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java @@ -41,9 +41,9 @@ public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.E checkpointFile = new CheckpointFile<>(file, version, formatter); } - public void write(Collection entries) { + public void write(Collection entries, boolean sync) { try { - checkpointFile.write(entries); + checkpointFile.write(entries, sync); } catch (IOException e) { String msg = "Error while writing to checkpoint file " + file.getAbsolutePath(); logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java index 3ef30b2502a8b..386712b330fbd 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java @@ -42,7 +42,7 @@ public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { private List epochs = Collections.emptyList(); - public void write(Collection epochs) { + public void write(Collection epochs, boolean ignored) { this.epochs = new ArrayList<>(epochs); } @@ -60,4 +60,4 @@ public ByteBuffer readAsByteBuffer() throws IOException { return ByteBuffer.wrap(stream.toByteArray()); } -} \ No newline at end of file +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java index 8cf5519512622..28ffae03df0e1 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java @@ -22,8 +22,13 @@ import java.util.List; public interface LeaderEpochCheckpoint { + // in file-backed checkpoint implementation, the content should be + // synced to the device if `sync` is true + void write(Collection epochs, boolean sync); - void write(Collection epochs); + default void write(Collection epochs) { + write(epochs, true); + } List read(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java index 81527c6377a28..3472182aeea1e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java @@ -53,7 +53,11 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh } public void write(Collection epochs) { - checkpoint.write(epochs); + write(epochs, true); + } + + public void write(Collection epochs, boolean sync) { + checkpoint.write(epochs, sync); } public List read() { @@ -75,4 +79,4 @@ public Optional fromString(String line) { return (strings.length == 2) ? Optional.of(new EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : Optional.empty(); } } -} \ No newline at end of file +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index a2c4ae7e0fcc5..03df6cc0dceb9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -73,7 +73,7 @@ public void assign(int epoch, long startOffset) { EpochEntry entry = new EpochEntry(epoch, startOffset); if (assign(entry)) { log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); - flush(); + writeToFile(true); } } @@ -83,7 +83,7 @@ public void assign(List entries) { log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); } }); - if (!entries.isEmpty()) flush(); + if (!entries.isEmpty()) writeToFile(true); } private boolean isUpdateNeeded(EpochEntry entry) { @@ -152,11 +152,6 @@ private List removeWhileMatching(Iterator= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - flush(); + // We intentionally don't force flushing change to the device here because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called by ReplicaFetcher threads, which could block replica fetching + // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. + // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by + // another truncateFromEnd call on log loading procedure so it won't be a problem + writeToFile(false); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -345,7 +347,14 @@ public void truncateFromStart(long startOffset) { EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset); epochs.put(updatedFirstEntry.epoch, updatedFirstEntry); - flush(); + // We intentionally don't force flushing change to the device here because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called as part of deleteRecords with holding UnifiedLog#lock. + // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust + // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by + // another truncateFromStart call on log loading procedure so it won't be a problem + writeToFile(false); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); } @@ -394,7 +403,7 @@ public void clearAndFlush() { lock.writeLock().lock(); try { epochs.clear(); - flush(); + writeToFile(true); } finally { lock.writeLock().unlock(); } @@ -431,16 +440,12 @@ public NavigableMap epochWithOffsets() { } } - private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) { + private void writeToFile(boolean sync) { lock.readLock().lock(); try { - leaderEpochCheckpoint.write(epochs.values()); + checkpoint.write(epochs.values(), sync); } finally { lock.readLock().unlock(); } } - - private void flush() { - flushTo(this.checkpoint); - } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index d3e48ef7057b1..6bcafd2d60763 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -462,14 +462,21 @@ public Optional lastEntry(long producerId) { } /** - * Take a snapshot at the current end offset if one does not already exist. + * Take a snapshot at the current end offset if one does not already exist with syncing the change to the device */ public void takeSnapshot() throws IOException { + takeSnapshot(true); + } + + /** + * Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken. + */ + public Optional takeSnapshot(boolean sync) throws IOException { // If not a new offset, then it is not worth taking another snapshot if (lastMapOffset > lastSnapOffset) { SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset)); long start = time.hiResClockMs(); - writeSnapshot(snapshotFile.file(), producers); + writeSnapshot(snapshotFile.file(), producers, sync); log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset, producers.size(), time.hiResClockMs() - start); @@ -477,7 +484,10 @@ public void takeSnapshot() throws IOException { // Update the last snap offset according to the serialized map lastSnapOffset = lastMapOffset; + + return Optional.of(snapshotFile.file()); } + return Optional.empty(); } /** @@ -635,7 +645,7 @@ public Optional removeAndMarkSnapshotForDeletion(long snapshotOffs // deletion, so ignoring the exception here just means that the intended operation was // already completed. try { - snapshotFile.renameTo(LogFileUtils.DELETED_FILE_SUFFIX); + snapshotFile.renameToDelete(); return Optional.of(snapshotFile); } catch (NoSuchFileException ex) { log.info("Failed to rename producer state snapshot {} with deletion suffix because it was already deleted", snapshotFile.file().getAbsoluteFile()); @@ -684,7 +694,7 @@ public static List readSnapshot(File file) throws IOExceptio } } - private static void writeSnapshot(File file, Map entries) throws IOException { + private static void writeSnapshot(File file, Map entries, boolean sync) throws IOException { Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA); struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION); struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries @@ -716,7 +726,9 @@ private static void writeSnapshot(File file, Map entri try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { fileChannel.write(buffer); - fileChannel.force(true); + if (sync) { + fileChannel.force(true); + } } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java index be496ab299878..61fdae79950db 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java @@ -60,10 +60,10 @@ public File file() { return file; } - public void renameTo(String newSuffix) throws IOException { - File renamed = new File(Utils.replaceSuffix(file.getPath(), "", newSuffix)); + public void renameToDelete() throws IOException { + File renamed = new File(Utils.replaceSuffix(file.getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)); try { - Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath()); + Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), false); } finally { file = renamed; } @@ -76,4 +76,4 @@ public String toString() { ", file=" + file + ')'; } -} \ No newline at end of file +}