diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java index 731a84d3c9..e7301eacf8 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java @@ -128,7 +128,11 @@ public static String readOffsetFile(File storagePartitionDir, String offsetFileN if (offsetFileRef.exists()) { LOG.info("Found offset file in storage partition directory: {}", storePath); - offset = FileUtil.readWithChecksum(offsetFileRef); + try { + offset = FileUtil.readWithChecksum(offsetFileRef); + } catch (Exception e) { + LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e); + } } else { LOG.info("No offset file found in storage partition directory: {}", storePath); } diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index 46a20894f6..0845b5cea2 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -22,6 +22,7 @@ package org.apache.samza.util import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} +import java.nio.file._ import java.util.zip.CRC32 import org.apache.samza.util.Util.info @@ -35,10 +36,12 @@ object FileUtil { * */ def writeWithChecksum(file: File, data: String): Unit = { val checksum = getChecksum(data) + val tmpFilePath = file.getAbsolutePath + ".tmp" + val tmpFile = new File(tmpFilePath) var oos: ObjectOutputStream = null var fos: FileOutputStream = null try { - fos = new FileOutputStream(file) + fos = new FileOutputStream(tmpFile) oos = new ObjectOutputStream(fos) oos.writeLong(checksum) oos.writeUTF(data) @@ -46,6 +49,14 @@ object FileUtil { oos.close() fos.close() } + + //atomic swap of tmp and real offset file + try { + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING) + } catch { + case e: AtomicMoveNotSupportedException => + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING) + } } /** diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala index 5bb6da741b..ddda2681ed 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala @@ -48,6 +48,28 @@ class TestFileUtil { fis.close() } + @Test + def testWriteDataToFileWithExistingOffsetFile() { + // Invoke test + val file = new File(System.getProperty("java.io.tmpdir"), "test2") + // write the same file three times + FileUtil.writeWithChecksum(file, data) + FileUtil.writeWithChecksum(file, data) + FileUtil.writeWithChecksum(file, data) + + // Check that file exists + assertTrue("File was not created!", file.exists()) + val fis = new FileInputStream(file) + val ois = new ObjectInputStream(fis) + + // Check content of the file is as expected + assertEquals(checksum, ois.readLong()) + assertEquals(data, ois.readUTF()) + ois.close() + fis.close() + } + + @Test def testReadDataFromFile() { // Setup diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index f25097c455..836dab408d 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -91,7 +91,14 @@ object RocksDbKeyValueStore extends Logging { .toSet (configuredMetrics ++ rocksDbMetrics) - .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property))) + .foreach(property => metrics.newGauge(property, () => + // Check isOwningHandle flag. The db is open iff the flag is true. + if (rocksDb.isOwningHandle) { + rocksDb.getProperty(property) + } else { + "0" + } + )) rocksDb } catch {