Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.Enumeration;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
Expand Down Expand Up @@ -78,16 +81,19 @@ public static long zip(File directory, File outputZipFile, boolean fsync) throws
log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
}

try (final FileOutputStream out = new FileOutputStream(outputZipFile)) {
long bytes = zip(directory, out);

// For explanation of why fsyncing here is a good practice:
// https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984
if (fsync) {
out.getChannel().force(true);
if (fsync) {
return FileUtils.writeAtomically(outputZipFile, out -> zip(directory, out));
} else {
try (
final FileChannel fileChannel = FileChannel.open(
outputZipFile.toPath(),
StandardOpenOption.WRITE,
StandardOpenOption.CREATE
);
final OutputStream out = Channels.newOutputStream(fileChannel)
) {
return zip(directory, out);
}

return bytes;
}
}

Expand Down
48 changes: 35 additions & 13 deletions core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,28 @@
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;

public class FileUtils
{
private static final Logger log = new Logger(FileUtils.class);

/**
* Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions
*/
Expand Down Expand Up @@ -182,22 +187,35 @@ public static MappedByteBufferHandler map(File file) throws IOException
*
* This method is not just thread-safe, but is also safe to use from multiple processes on the same machine.
*/
public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException
public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException
{
writeAtomically(file, file.getParentFile(), f);
return writeAtomically(file, file.getParentFile(), f);
}

private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException
private static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
{
final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID()));

try {
try (final FileOutputStream out = new FileOutputStream(tmpFile)) {
//noinspection unused
try (final Closeable deleter = () -> java.nio.file.Files.deleteIfExists(tmpFile.toPath())) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now IOExceptions that could be thrown from deleteIfExists() are propagated

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine. It would be a shock to have it fail, and maybe it's best to propagate in that case.

final T retVal;

try (
final FileChannel fileChannel = FileChannel.open(
tmpFile.toPath(),
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW
);
final OutputStream out = Channels.newOutputStream(fileChannel)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a bit clearer to factor this as a nested try-with-resource block and omit out.flush()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fileChannel.force (fsync) has to be done while the stream is still open, so we can't close the OutputStream before that happens. So that's why the flush was first. I just pushed a change with some more comments describing what's going on.

But, I also removed the out.flush completely in this commit -- Channels.newOutputStream is specced as unbuffered, so a flush isn't necessary.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, thanks for researching that.

) {
// Pass f an uncloseable stream so we can fsync before closing.
f.accept(uncloseable(out));
retVal = f.apply(uncloseable(out));

// fsync to avoid write-then-rename-then-crash causing empty files on some filesystems.
out.getChannel().force(true);
// Must do this before "out" or "fileChannel" is closed. No need to flush "out" first, since
// Channels.newOutputStream is unbuffered.
// See also https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984
fileChannel.force(true);
}

// No exception thrown; do the move.
Expand All @@ -207,9 +225,13 @@ private static void writeAtomically(final File file, final File tmpDir, OutputSt
StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING
);
}
finally {
tmpFile.delete();

// fsync the directory entry to ensure the new file will be visible after a crash.
try (final FileChannel directory = FileChannel.open(file.getParentFile().toPath(), StandardOpenOption.READ)) {
directory.force(true);
}

return retVal;
}
}

Expand All @@ -225,8 +247,8 @@ public void close()
};
}

public interface OutputStreamConsumer
public interface OutputStreamConsumer<T>
{
void accept(OutputStream outputStream) throws IOException;
T apply(OutputStream outputStream) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public void testWriteAtomically() throws IOException
{
final File tmpDir = folder.newFolder();
final File tmpFile = new File(tmpDir, "file1");
FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo")));
FileUtils.writeAtomically(tmpFile, out -> {
out.write(StringUtils.toUtf8("foo"));
return null;
});
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));

// Try writing again, throw error partway through.
Expand All @@ -71,7 +74,10 @@ public void testWriteAtomically() throws IOException
}
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));

FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz")));
FileUtils.writeAtomically(tmpFile, out -> {
out.write(StringUtils.toUtf8("baz"));
return null;
});
Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,13 @@ private void writeUserMapToDisk(String prefix, byte[] userMapBytes) throws IOExc
File cacheDir = new File(commonCacheConfig.getCacheDirectory());
cacheDir.mkdirs();
File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix));
FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes));
FileUtils.writeAtomically(
userMapFile,
out -> {
out.write(userMapBytes);
return null;
}
);
}

private Map<String, BasicAuthenticatorUser> tryFetchUserMapFromCoordinator(String prefix) throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,13 @@ private void writeMapToDisk(String prefix, byte[] userMapBytes) throws IOExcepti
File cacheDir = new File(commonCacheConfig.getCacheDirectory());
cacheDir.mkdirs();
File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserRoleMapFilename(prefix));
FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes));
FileUtils.writeAtomically(
userMapFile,
out -> {
out.write(userMapBytes);
return null;
}
);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,13 @@ public synchronized void takeSnapshot(String tier, List<LookupBean> lookups)
final File persistFile = getPersistFile(tier);

try {
FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups));
FileUtils.writeAtomically(
persistFile,
out -> {
objectMapper.writeValue(out, lookups);
return null;
}
);
}
catch (IOException e) {
throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath());
Expand Down