From ed8d82e22dd7aa5edabb2956ca5ec55a986a2ba1 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 28 Nov 2018 01:31:58 -0800 Subject: [PATCH 1/5] FileUtils: Sync directory entry too on writeAtomically. See the fsync(2) man page for why this is important: https://linux.die.net/man/2/fsync This also plumbs CompressionUtils's "zip" function through writeAtomically, so the code for handling atomic local filesystem writes is all done in the same place. --- .../java/util/common/CompressionUtils.java | 14 +++----- .../druid/java/util/common/FileUtils.java | 33 +++++++++++++++---- .../druid/java/util/common/FileUtilsTest.java | 10 ++++-- ...PollingBasicAuthenticatorCacheManager.java | 9 ++++- ...torPollingBasicAuthorizerCacheManager.java | 8 ++++- .../query/lookup/LookupSnapshotTaker.java | 8 ++++- 6 files changed, 61 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java index 79e010aa40cd..726f5bf430f4 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java @@ -78,16 +78,12 @@ public static long zip(File directory, File outputZipFile, boolean fsync) throws log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory); } - try (final FileOutputStream out = new FileOutputStream(outputZipFile)) { - long bytes = zip(directory, out); - - // For explanation of why fsyncing here is a good practice: - // https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984 - if (fsync) { - out.getChannel().force(true); + if (fsync) { + return FileUtils.writeAtomically(outputZipFile, out -> zip(directory, out)); + } else { + try (final FileOutputStream out = new FileOutputStream(outputZipFile)) { + return zip(directory, out); } - - return bytes; } } diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java index 97ac6cb51ad9..8dbee4cdab35 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.io.ByteSource; import com.google.common.io.Files; +import org.apache.druid.java.util.common.logger.Logger; import java.io.File; import java.io.FileNotFoundException; @@ -34,6 +35,7 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -41,6 +43,8 @@ public class FileUtils { + private static final Logger log = new Logger(FileUtils.class); + /** * Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions */ @@ -182,21 +186,25 @@ public static MappedByteBufferHandler map(File file) throws IOException * * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine. 
*/ - public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException + public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException { - writeAtomically(file, file.getParentFile(), f); + return writeAtomically(file, file.getParentFile(), f); } - private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException + private static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException { final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID())); try { + final T retVal; + try (final FileOutputStream out = new FileOutputStream(tmpFile)) { // Pass f an uncloseable stream so we can fsync before closing. - f.accept(uncloseable(out)); + retVal = f.apply(uncloseable(out)); + out.flush(); // fsync to avoid write-then-rename-then-crash causing empty files on some filesystems. + // See also https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984 out.getChannel().force(true); } @@ -207,9 +215,20 @@ private static void writeAtomically(final File file, final File tmpDir, OutputSt StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING ); + + // fsync the directory entry to ensure the new file will be visible after a crash.
+ try (final FileChannel directory = FileChannel.open(file.getParentFile().toPath(), StandardOpenOption.READ)) { + directory.force(true); + } + + return retVal; } finally { - tmpFile.delete(); + if (tmpFile.exists()) { + if (!tmpFile.delete()) { + log.warn("Could not delete tmpFile[%s]", tmpFile); + } + } } } @@ -225,8 +244,8 @@ public void close() }; } - public interface OutputStreamConsumer + public interface OutputStreamConsumer<T> { - void accept(OutputStream outputStream) throws IOException; + T apply(OutputStream outputStream) throws IOException; } } diff --git a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java index ca8432cec318..746453f59d1e 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java @@ -55,7 +55,10 @@ public void testWriteAtomically() throws IOException { final File tmpDir = folder.newFolder(); final File tmpFile = new File(tmpDir, "file1"); - FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo"))); + FileUtils.writeAtomically(tmpFile, out -> { + out.write(StringUtils.toUtf8("foo")); + return null; + }); Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); // Try writing again, throw error partway through.
@@ -71,7 +74,10 @@ public void testWriteAtomically() throws IOException } Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); - FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz"))); + FileUtils.writeAtomically(tmpFile, out -> { + out.write(StringUtils.toUtf8("baz")); + return null; + }); Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); } } diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java index 035568feb547..2275d755478d 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java @@ -52,6 +52,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -236,7 +237,13 @@ private void writeUserMapToDisk(String prefix, byte[] userMapBytes) throws IOExc File cacheDir = new File(commonCacheConfig.getCacheDirectory()); cacheDir.mkdirs(); File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix)); - FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes)); + FileUtils.writeAtomically( + userMapFile, + out -> { + out.write(userMapBytes); + return null; + } + ); } private Map tryFetchUserMapFromCoordinator(String prefix) throws Exception diff --git 
a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java index c3115c3a24ba..29c3f572bc85 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java @@ -212,7 +212,13 @@ private void writeMapToDisk(String prefix, byte[] userMapBytes) throws IOExcepti File cacheDir = new File(commonCacheConfig.getCacheDirectory()); cacheDir.mkdirs(); File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserRoleMapFilename(prefix)); - FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes)); + FileUtils.writeAtomically( + userMapFile, + out -> { + out.write(userMapBytes); + return null; + } + ); } @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java b/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java index 6d60aafdedc1..b2f21329e411 100644 --- a/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java +++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java @@ -88,7 +88,13 @@ public synchronized void takeSnapshot(String tier, List lookups) final File persistFile = getPersistFile(tier); try { - FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups)); + FileUtils.writeAtomically( + persistFile, + out -> { + objectMapper.writeValue(out, lookups); + return null; + } + ); } catch (IOException e) { throw new ISE(e, "Exception during serialization of lookups using file [%s]", 
persistFile.getAbsolutePath()); From 6cecb58524fb4a70826eac02d9075160c422aa20 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 28 Nov 2018 02:56:31 -0800 Subject: [PATCH 2/5] Remove unused import. --- .../cache/CoordinatorPollingBasicAuthenticatorCacheManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java index 2275d755478d..2641280ed9d9 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java @@ -52,7 +52,6 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.io.OutputStream; import java.util.HashSet; import java.util.Map; import java.util.Set; From fb8e1ac9ba5770c9597318515db6d378e3c88e80 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 28 Nov 2018 12:14:33 -0800 Subject: [PATCH 3/5] Avoid FileOutputStream. 
--- .../java/util/common/CompressionUtils.java | 12 +++++++++- .../druid/java/util/common/FileUtils.java | 24 ++++++++++--------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java index 726f5bf430f4..6d253bc109ce 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java @@ -41,7 +41,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; +import java.nio.file.StandardOpenOption; import java.util.Enumeration; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -81,7 +84,14 @@ public static long zip(File directory, File outputZipFile, boolean fsync) throws if (fsync) { return FileUtils.writeAtomically(outputZipFile, out -> zip(directory, out)); } else { - try (final FileOutputStream out = new FileOutputStream(outputZipFile)) { + try ( + final FileChannel fileChannel = FileChannel.open( + outputZipFile.toPath(), + StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW + ); + final OutputStream out = Channels.newOutputStream(fileChannel) + ) { return zip(directory, out); } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java index 8dbee4cdab35..b4b15ae62a01 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java @@ -26,13 +26,14 @@ import com.google.common.io.Files; import org.apache.druid.java.util.common.logger.Logger; +import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; -import 
java.io.FileOutputStream; import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.MappedByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; @@ -195,17 +196,25 @@ private static T writeAtomically(final File file, final File tmpDir, OutputS { final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID())); - try { + //noinspection unused + try (final Closeable deleter = () -> java.nio.file.Files.deleteIfExists(tmpFile.toPath())) { final T retVal; - try (final FileOutputStream out = new FileOutputStream(tmpFile)) { + try ( + final FileChannel fileChannel = FileChannel.open( + tmpFile.toPath(), + StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW + ); + final OutputStream out = Channels.newOutputStream(fileChannel) + ) { // Pass f an uncloseable stream so we can fsync before closing. retVal = f.apply(uncloseable(out)); out.flush(); // fsync to avoid write-then-rename-then-crash causing empty files on some filesystems. // See also https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984 - out.getChannel().force(true); + fileChannel.force(true); } // No exception thrown; do the move. @@ -223,13 +232,6 @@ private static T writeAtomically(final File file, final File tmpDir, OutputS return retVal; } - finally { - if (tmpFile.exists()) { - if (!tmpFile.delete()) { - log.warn("Could not delete tmpFile[%s]", tmpFile); - } - } - } } private static OutputStream uncloseable(final OutputStream out) From 3cabd984b9651f44977b11893a0b1863378e739d Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 4 Dec 2018 10:32:19 -0800 Subject: [PATCH 4/5] Allow non-atomic writes to overwrite. 
--- .../org/apache/druid/java/util/common/CompressionUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java index 6d253bc109ce..ce94a4a8c96f 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java @@ -88,7 +88,7 @@ public static long zip(File directory, File outputZipFile, boolean fsync) throws final FileChannel fileChannel = FileChannel.open( outputZipFile.toPath(), StandardOpenOption.WRITE, - StandardOpenOption.CREATE_NEW + StandardOpenOption.CREATE ); final OutputStream out = Channels.newOutputStream(fileChannel) ) { From 1a8447df33aa24748e8f59196c5555209e7acd1c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 7 Dec 2018 10:15:31 -0800 Subject: [PATCH 5/5] Add some comments. And no need to flush an unbuffered stream. --- .../main/java/org/apache/druid/java/util/common/FileUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java index b4b15ae62a01..1ba63c105991 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java @@ -210,9 +210,10 @@ private static T writeAtomically(final File file, final File tmpDir, OutputS ) { // Pass f an uncloseable stream so we can fsync before closing. retVal = f.apply(uncloseable(out)); - out.flush(); // fsync to avoid write-then-rename-then-crash causing empty files on some filesystems. + // Must do this before "out" or "fileChannel" is closed. No need to flush "out" first, since + // Channels.newOutputStream is unbuffered. 
// See also https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984 fileChannel.force(true); }