Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.google.inject.Injector;
import io.druid.client.coordinator.Coordinator;
import io.druid.concurrent.LifecycleLock;
import io.druid.discovery.DruidLeaderClient;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -236,7 +236,7 @@ private void writeUserMapToDisk(String prefix, byte[] userMapBytes) throws IOExc
File cacheDir = new File(commonCacheConfig.getCacheDirectory());
cacheDir.mkdirs();
File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix));
Files.write(userMapBytes, userMapFile);
FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes));
}

private Map<String, BasicAuthenticatorUser> tryFetchUserMapFromCoordinator(String prefix) throws Exception
Expand Down
71 changes: 69 additions & 2 deletions java-util/src/main/java/io/druid/java/util/common/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;

public class FileUtils
{
Expand All @@ -46,6 +51,7 @@ public boolean apply(Throwable input)
return input instanceof Exception;
}
};

/**
* Copy input byte source to outFile. If outFile exists, it is attempted to be deleted.
*
Expand Down Expand Up @@ -150,15 +156,76 @@ public void addFile(File file)
* }}</pre>
*
* @param file the file to map
*
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
* @throws FileNotFoundException if the {@code file} does not exist
* @throws IOException if an I/O error occurs
*
* @throws FileNotFoundException if the {@code file} does not exist
* @throws IOException if an I/O error occurs
* @see FileChannel#map(FileChannel.MapMode, long, long)
*/
public static MappedByteBufferHandler map(File file) throws IOException
{
MappedByteBuffer mappedByteBuffer = Files.map(file);
return new MappedByteBufferHandler(mappedByteBuffer);
}

/**
* Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to
* the target location. This function attempts to clean up its temporary files when possible, but they may stick
* around (for example, if the JVM crashes partway through executing the function). In any case, the target file
* should be unharmed.
*
* The OutputStream passed to the consumer is uncloseable; calling close on it will do nothing. This is to ensure
* that the stream stays open so we can fsync it here before closing. Hopefully, this doesn't cause any problems
* for callers.
*
* This method is not just thread-safe, but is also safe to use from multiple processes on the same machine.
*/
public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is no such utility already in Apache Commons IO, Guava, or JDK itself?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked (briefly) but did not find one.

{
writeAtomically(file, file.getParentFile(), f);
}

private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException
{
final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID()));

try {
try (final FileOutputStream out = new FileOutputStream(tmpFile)) {
// Pass f an uncloseable stream so we can fsync before closing.
f.accept(uncloseable(out));

// fsync to avoid write-then-rename-then-crash causing empty files on some filesystems.
out.getChannel().force(true);
}

// No exception thrown; do the move.
java.nio.file.Files.move(
tmpFile.toPath(),
file.toPath(),
StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING
);
}
finally {
tmpFile.delete();
}
}

private static OutputStream uncloseable(final OutputStream out) throws IOException
{
return new FilterOutputStream(out)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In unit tests locally with 15mb files, using a BufferedOutputStream here, takes 1 second vs. ~16 seconds. FilterOutputStream isn't buffered, is too slow for practical use?

{
@Override
public void close() throws IOException
{
// Do nothing.
}
};
}

public interface OutputStreamConsumer
{
void accept(OutputStream outputStream) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;

public class FileUtilsTest
{
Expand All @@ -48,4 +49,29 @@ public void testMap() throws IOException
long buffersMemoryAfter = BufferUtils.totalMemoryUsedByDirectAndMappedBuffers();
Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter);
}

@Test
public void testWriteAtomically() throws IOException
{
final File tmpDir = folder.newFolder();
final File tmpFile = new File(tmpDir, "file1");
FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo")));
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));

// Try writing again, throw error partway through.
try {
FileUtils.writeAtomically(tmpFile, out -> {
out.write(StringUtils.toUtf8("bar"));
out.flush();
throw new ISE("OMG!");
});
}
catch (IllegalStateException e) {
// Suppress
}
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));

FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz")));
Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;

Expand All @@ -50,7 +51,10 @@ public LookupSnapshotTaker(
)
{
this.objectMapper = jsonMapper;
Preconditions.checkArgument(!Strings.isNullOrEmpty(persistDirectory), "can not work without specifying persistDirectory");
Preconditions.checkArgument(
!Strings.isNullOrEmpty(persistDirectory),
"can not work without specifying persistDirectory"
);
this.persistDirectory = new File(persistDirectory);
if (!this.persistDirectory.exists()) {
Preconditions.checkArgument(this.persistDirectory.mkdirs(), "Oups was not able to create persist directory");
Expand All @@ -72,7 +76,7 @@ public synchronized List<LookupBean> pullExistingSnapshot()
LOGGER.warn("found empty file no lookups to load from [%s]", persistFile.getAbsolutePath());
return Collections.emptyList();
}
lookupBeanList = objectMapper.readValue(persistFile, new TypeReference<List<LookupBean>>(){});
lookupBeanList = objectMapper.readValue(persistFile, new TypeReference<List<LookupBean>>() {});
return lookupBeanList;
}
catch (IOException e) {
Expand All @@ -83,7 +87,7 @@ public synchronized List<LookupBean> pullExistingSnapshot()
public synchronized void takeSnapshot(List<LookupBean> lookups)
{
try {
objectMapper.writeValue(persistFile, lookups);
FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups));
}
catch (IOException e) {
throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

import java.io.File;
Expand All @@ -43,6 +44,10 @@ public class LookupSnapshotTakerTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Rule
public ExpectedException expectedException = ExpectedException.none();

private final ObjectMapper mapper = TestHelper.makeJsonMapper();


Expand Down Expand Up @@ -112,14 +117,15 @@ public void testBackwardCompatibility() throws IOException
Assert.assertEquals(lookupBeanList, actualList);
}

@Test(expected = ISE.class)
@Test
public void testIOExceptionDuringLookupPersist() throws IOException
{
File directory = temporaryFolder.newFolder();
File snapshotFile = new File(directory, LookupSnapshotTaker.PERSIST_FILE_NAME);
Assert.assertFalse(snapshotFile.exists());
Assert.assertTrue(snapshotFile.createNewFile());
Assert.assertTrue(snapshotFile.setReadOnly());
Assert.assertTrue(snapshotFile.getParentFile().setReadOnly());
LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath());
LookupBean lookupBean = new LookupBean(
"name",
Expand All @@ -135,10 +141,12 @@ public void testIOExceptionDuringLookupPersist() throws IOException
)
);
List<LookupBean> lookupBeanList = Lists.newArrayList(lookupBean);

expectedException.expect(ISE.class);
expectedException.expectMessage("Exception during serialization of lookups");
lookupSnapshotTaker.takeSnapshot(lookupBeanList);
}


@Test
public void tesLookupPullingFromEmptyFile() throws IOException
{
Expand Down