From 27e24e8238b39f0cf927d34fbada8f6e8d22e2c0 Mon Sep 17 00:00:00 2001 From: xinyuiscool Date: Thu, 26 Jul 2018 11:14:45 -0700 Subject: [PATCH 1/5] SAMZA-1768: Handle corrupted OFFSET file elegantly --- .../org/apache/samza/storage/StorageManagerUtil.java | 6 +++++- .../src/main/scala/org/apache/samza/util/FileUtil.scala | 9 ++++++++- .../apache/samza/storage/kv/RocksDbKeyValueStore.scala | 8 +++++++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java index 731a84d3c9..2a6fd8550e 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java @@ -128,7 +128,11 @@ public static String readOffsetFile(File storagePartitionDir, String offsetFileN if (offsetFileRef.exists()) { LOG.info("Found offset file in storage partition directory: {}", storePath); - offset = FileUtil.readWithChecksum(offsetFileRef); + try { + offset = FileUtil.readWithChecksum(offsetFileRef); + } catch (Exception e) { + LOG.warn("Fail to read offset file of " + storePath); + } } else { LOG.info("No offset file found in storage partition directory: {}", storePath); } diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index 46a20894f6..361279b8b7 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -22,6 +22,7 @@ package org.apache.samza.util import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} +import java.nio.file.{StandardCopyOption, CopyOption, Path, Files} import java.util.zip.CRC32 import org.apache.samza.util.Util.info @@ -35,10 +36,13 @@ object FileUtil { * */ def writeWithChecksum(file: File, data: String): Unit = { val checksum = getChecksum(data) + + val tmpFilePath = file.getAbsolutePath + ".tmp" + val tmpFile = new File(tmpFilePath) var oos: ObjectOutputStream = null var fos: FileOutputStream = null try { - fos = new FileOutputStream(file) + fos = new FileOutputStream(tmpFile) oos = new ObjectOutputStream(fos) oos.writeLong(checksum) oos.writeUTF(data) @@ -46,6 +50,9 @@ object FileUtil { oos.close() fos.close() } + + //atomic swap of tmp and real offset file + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING) } /** diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index f25097c455..3772dda872 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -91,7 +91,13 @@ object RocksDbKeyValueStore extends Logging { .toSet (configuredMetrics ++ rocksDbMetrics) - .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property))) + .foreach(property => metrics.newGauge(property, () => + if (rocksDb.isOwningHandle) { + rocksDb.getProperty(property) + } else { + "0" + } + )) rocksDb } catch { From 853f48ab9909fcad410f651d03f2f98507e6222d Mon Sep 17 00:00:00 2001 From: xinyuiscool Date: Thu, 26 Jul 2018 11:31:50 -0700 Subject: [PATCH 2/5] Minor changes --- .../main/java/org/apache/samza/storage/StorageManagerUtil.java | 2 +- samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java index 2a6fd8550e..46c8e489a2 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java @@ -131,7 +131,7 @@ public static String readOffsetFile(File storagePartitionDir, String offsetFileN try { offset = FileUtil.readWithChecksum(offsetFileRef); } catch (Exception e) { - LOG.warn("Fail to read offset file of " + storePath); + LOG.warn("Fail to read offset file of " + storePath, e); } } else { LOG.info("No offset file found in storage partition directory: {}", storePath); diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index 361279b8b7..b317193177 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -36,7 +36,6 @@ object FileUtil { * */ def writeWithChecksum(file: File, data: String): Unit = { val checksum = getChecksum(data) - val tmpFilePath = file.getAbsolutePath + ".tmp" val tmpFile = new File(tmpFilePath) var oos: ObjectOutputStream = null From 8bbf69d31ee373353c88833e9d65f8f98e6ff1f1 Mon Sep 17 00:00:00 2001 From: xinyuiscool Date: Fri, 27 Jul 2018 15:38:42 -0700 Subject: [PATCH 3/5] Address pr comments --- .../java/org/apache/samza/storage/StorageManagerUtil.java | 2 +- .../src/main/scala/org/apache/samza/util/FileUtil.scala | 6 +++--- .../org/apache/samza/storage/kv/RocksDbKeyValueStore.scala | 1 + 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java index 46c8e489a2..e7301eacf8 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java @@ -131,7 +131,7 @@ public static String readOffsetFile(File storagePartitionDir, String offsetFileN try { offset = FileUtil.readWithChecksum(offsetFileRef); } catch (Exception e) { - LOG.warn("Fail to read offset file of " + storePath, e); + LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e); } } else { LOG.info("No offset file found in storage partition directory: {}", storePath); diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index b317193177..ebc8332334 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -45,13 +45,13 @@ object FileUtil { oos = new ObjectOutputStream(fos) oos.writeLong(checksum) oos.writeUTF(data) + + //atomic swap of tmp and real offset file + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE) } finally { oos.close() fos.close() } - - //atomic swap of tmp and real offset file - Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING) } /** diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 3772dda872..836dab408d 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -92,6 +92,7 @@ object RocksDbKeyValueStore extends Logging { (configuredMetrics ++ rocksDbMetrics) .foreach(property => metrics.newGauge(property, () => + // Check isOwningHandle flag. The db is open iff the flag is true. if (rocksDb.isOwningHandle) { rocksDb.getProperty(property) } else { From c068fcec6277b1d0c5c2bf90bee61aa722731526 Mon Sep 17 00:00:00 2001 From: xinyuiscool Date: Fri, 27 Jul 2018 17:51:05 -0700 Subject: [PATCH 4/5] Further tweak of logic --- .../src/main/scala/org/apache/samza/util/FileUtil.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index ebc8332334..084976af47 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -22,7 +22,7 @@ package org.apache.samza.util import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} -import java.nio.file.{StandardCopyOption, CopyOption, Path, Files} +import java.nio.file._ import java.util.zip.CRC32 import org.apache.samza.util.Util.info @@ -47,7 +47,12 @@ object FileUtil { oos.writeUTF(data) //atomic swap of tmp and real offset file - Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE) + try { + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING) + } catch { + case e: AtomicMoveNotSupportedException => + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING) + } } finally { oos.close() fos.close() From ee1cf4af2bb39eeab4713ae4f63d6401a6d50955 Mon Sep 17 00:00:00 2001 From: xinyuiscool Date: Mon, 30 Jul 2018 09:17:35 -0700 Subject: [PATCH 5/5] Add unit test for case of offset file aleady exists --- .../org/apache/samza/util/FileUtil.scala | 16 +++++++------- .../org/apache/samza/util/TestFileUtil.scala | 22 +++++++++++++++++++ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index 084976af47..0845b5cea2 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -45,18 +45,18 @@ object FileUtil { oos = new ObjectOutputStream(fos) oos.writeLong(checksum) oos.writeUTF(data) - - //atomic swap of tmp and real offset file - try { - Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING) - } catch { - case e: AtomicMoveNotSupportedException => - Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING) - } } finally { oos.close() fos.close() } + + //atomic swap of tmp and real offset file + try { + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING) + } catch { + case e: AtomicMoveNotSupportedException => + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING) + } } /** diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala index 5bb6da741b..ddda2681ed 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala @@ -48,6 +48,28 @@ class TestFileUtil { fis.close() } + @Test + def testWriteDataToFileWithExistingOffsetFile() { + // Invoke test + val file = new File(System.getProperty("java.io.tmpdir"), "test2") + // write the same file three times + FileUtil.writeWithChecksum(file, data) + FileUtil.writeWithChecksum(file, data) + FileUtil.writeWithChecksum(file, data) + + // Check that file exists + assertTrue("File was not created!", file.exists()) + val fis = new FileInputStream(file) + val ois = new ObjectInputStream(fis) + + // Check content of the file is as expected + assertEquals(checksum, ois.readLong()) + assertEquals(data, ois.readUTF()) + ois.close() + fis.close() + } + + @Test def testReadDataFromFile() { // Setup