From 62080a08ea37bb676587b078c1eb2af8ad497432 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 5 Feb 2018 00:27:22 -0800 Subject: [PATCH 1/2] Fix races in LookupSnapshotTaker, CoordinatorPollingBasicAuthenticatorCacheManager. Both were susceptible to the following conditions: 1. Two JVMs on the same machine (perhaps two peons) could conflict by one reading while the other was writing, or by writing to the file at the same time. 2. One JVM could partially write a file, then crash, leaving a truncated file. --- ...PollingBasicAuthenticatorCacheManager.java | 4 +- .../io/druid/java/util/common/FileUtils.java | 71 ++++++++++++++++++- .../druid/java/util/common/FileUtilsTest.java | 26 +++++++ .../query/lookup/LookupSnapshotTaker.java | 10 ++- .../query/lookup/LookupSnapshotTakerTest.java | 12 +++- 5 files changed, 114 insertions(+), 9 deletions(-) diff --git a/extensions-core/druid-basic-security/src/main/java/io/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java b/extensions-core/druid-basic-security/src/main/java/io/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java index c7d6cad4d0f8..fe5dd7e679a2 100644 --- a/extensions-core/druid-basic-security/src/main/java/io/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java +++ b/extensions-core/druid-basic-security/src/main/java/io/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.io.Files; import com.google.inject.Inject; import com.google.inject.Injector; import io.druid.client.coordinator.Coordinator; @@ -29,6 +28,7 @@ import io.druid.discovery.DruidLeaderClient; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Smile; +import io.druid.java.util.common.FileUtils; import io.druid.java.util.common.ISE; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; @@ -236,7 +236,7 @@ private void writeUserMapToDisk(String prefix, byte[] userMapBytes) throws IOExc File cacheDir = new File(commonCacheConfig.getCacheDirectory()); cacheDir.mkdirs(); File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix)); - Files.write(userMapBytes, userMapFile); + FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes)); } private Map tryFetchUserMapFromCoordinator(String prefix) throws Exception diff --git a/java-util/src/main/java/io/druid/java/util/common/FileUtils.java b/java-util/src/main/java/io/druid/java/util/common/FileUtils.java index 72be3a57e18f..ee5a2a25aa6a 100644 --- a/java-util/src/main/java/io/druid/java/util/common/FileUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/FileUtils.java @@ -27,11 +27,16 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilterOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.StandardCopyOption; import java.util.Arrays; import java.util.Collection; +import java.util.UUID; public class FileUtils { @@ -46,6 +51,7 @@ public boolean apply(Throwable input) return input instanceof Exception; } }; + /** * Copy input byte source to outFile. If outFile exists, it is attempted to be deleted. * @@ -150,10 +156,11 @@ public void addFile(File file) * }} * * @param file the file to map + * * @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file} - * @throws FileNotFoundException if the {@code file} does not exist - * @throws IOException if an I/O error occurs * + * @throws FileNotFoundException if the {@code file} does not exist + * @throws IOException if an I/O error occurs * @see FileChannel#map(FileChannel.MapMode, long, long) */ public static MappedByteBufferHandler map(File file) throws IOException @@ -161,4 +168,64 @@ public static MappedByteBufferHandler map(File file) throws IOException MappedByteBuffer mappedByteBuffer = Files.map(file); return new MappedByteBufferHandler(mappedByteBuffer); } + + /** + * Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to + * the target location. This function attempts to clean up its temporary files when possible, but they may stick + * around (for example, if the JVM crashes partway through executing the function). In any case, the target file + * should be unharmed. + * + * The OutputStream passed to the consumer is uncloseable; calling close on it will do nothing. This is to ensure + * that the stream stays open so we can fsync it here before closing. Hopefully, this doesn't cause any problems + * for callers. + * + * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine. + */ + public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException + { + writeAtomically(file, file.getParentFile(), f); + } + + private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException + { + final File tmpFile = new File(tmpDir, String.format(".%s.%s", file.getName(), UUID.randomUUID())); + + try { + try (final FileOutputStream out = new FileOutputStream(tmpFile)) { + // Pass f an uncloseable stream so we can fsync before closing. + f.accept(uncloseable(out)); + + // fsync to avoid write-then-rename-then-crash causing empty files on some filesystems. + out.getChannel().force(true); + } + + // No exception thrown; do the move. + java.nio.file.Files.move( + tmpFile.toPath(), + file.toPath(), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING + ); + } + finally { + tmpFile.delete(); + } + } + + private static OutputStream uncloseable(final OutputStream out) throws IOException + { + return new FilterOutputStream(out) + { + @Override + public void close() throws IOException + { + // Do nothing. + } + }; + } + + public interface OutputStreamConsumer + { + void accept(OutputStream outputStream) throws IOException; + } } diff --git a/java-util/src/test/java/io/druid/java/util/common/FileUtilsTest.java b/java-util/src/test/java/io/druid/java/util/common/FileUtilsTest.java index a9594f7a02a1..3e4aea4eca8e 100644 --- a/java-util/src/test/java/io/druid/java/util/common/FileUtilsTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/FileUtilsTest.java @@ -27,6 +27,7 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.file.Files; public class FileUtilsTest { @@ -48,4 +49,29 @@ public void testMap() throws IOException long buffersMemoryAfter = BufferUtils.totalMemoryUsedByDirectAndMappedBuffers(); Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter); } + + @Test + public void testWriteAtomically() throws IOException + { + final File tmpDir = folder.newFolder(); + final File tmpFile = new File(tmpDir, "file1"); + FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo"))); + Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); + + // Try writing again, throw error partway through. + try { + FileUtils.writeAtomically(tmpFile, out -> { + out.write(StringUtils.toUtf8("bar")); + out.flush(); + throw new ISE("OMG!"); + }); + } + catch (IllegalStateException e) { + // Suppress + } + Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); + + FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz"))); + Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); + } } diff --git a/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java b/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java index 3d67124610ff..9162bf4b2035 100644 --- a/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java +++ b/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import io.druid.guice.annotations.Json; +import io.druid.java.util.common.FileUtils; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; @@ -50,7 +51,10 @@ public LookupSnapshotTaker( ) { this.objectMapper = jsonMapper; - Preconditions.checkArgument(!Strings.isNullOrEmpty(persistDirectory), "can not work without specifying persistDirectory"); + Preconditions.checkArgument( + !Strings.isNullOrEmpty(persistDirectory), + "can not work without specifying persistDirectory" + ); this.persistDirectory = new File(persistDirectory); if (!this.persistDirectory.exists()) { Preconditions.checkArgument(this.persistDirectory.mkdirs(), "Oups was not able to create persist directory"); @@ -72,7 +76,7 @@ public synchronized List pullExistingSnapshot() LOGGER.warn("found empty file no lookups to load from [%s]", persistFile.getAbsolutePath()); return Collections.emptyList(); } - lookupBeanList = objectMapper.readValue(persistFile, new TypeReference>(){}); + lookupBeanList = objectMapper.readValue(persistFile, new TypeReference>() {}); return lookupBeanList; } catch (IOException e) { @@ -83,7 +87,7 @@ public synchronized List pullExistingSnapshot() public synchronized void takeSnapshot(List lookups) { try { - objectMapper.writeValue(persistFile, lookups); + FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups)); } catch (IOException e) { throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath()); diff --git a/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java b/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java index 3fc085623ffa..ac63bbdf01e3 100644 --- a/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java +++ b/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java @@ -31,6 +31,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import java.io.File; @@ -43,6 +44,10 @@ public class LookupSnapshotTakerTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private final ObjectMapper mapper = TestHelper.makeJsonMapper(); @@ -112,7 +117,7 @@ public void testBackwardCompatibility() throws IOException Assert.assertEquals(lookupBeanList, actualList); } - @Test(expected = ISE.class) + @Test public void testIOExceptionDuringLookupPersist() throws IOException { File directory = temporaryFolder.newFolder(); @@ -120,6 +125,7 @@ public void testIOExceptionDuringLookupPersist() throws IOException Assert.assertFalse(snapshotFile.exists()); Assert.assertTrue(snapshotFile.createNewFile()); Assert.assertTrue(snapshotFile.setReadOnly()); + Assert.assertTrue(snapshotFile.getParentFile().setReadOnly()); LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath()); LookupBean lookupBean = new LookupBean( "name", @@ -135,10 +141,12 @@ public void testIOExceptionDuringLookupPersist() throws IOException ) ); List lookupBeanList = Lists.newArrayList(lookupBean); + + expectedException.expect(ISE.class); + expectedException.expectMessage("Exception during serialization of lookups"); lookupSnapshotTaker.takeSnapshot(lookupBeanList); } - @Test public void tesLookupPullingFromEmptyFile() throws IOException { From e07db23e1bdab46426e644f7e7dc6866ead1ca46 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 5 Feb 2018 09:03:23 -0800 Subject: [PATCH 2/2] Use StringUtils.format --- .../src/main/java/io/druid/java/util/common/FileUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/FileUtils.java b/java-util/src/main/java/io/druid/java/util/common/FileUtils.java index ee5a2a25aa6a..0a5d08063881 100644 --- a/java-util/src/main/java/io/druid/java/util/common/FileUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/FileUtils.java @@ -188,7 +188,7 @@ public static void writeAtomically(final File file, OutputStreamConsumer f) thro private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException { - final File tmpFile = new File(tmpDir, String.format(".%s.%s", file.getName(), UUID.randomUUID())); + final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID())); try { try (final FileOutputStream out = new FileOutputStream(tmpFile)) {