diff --git a/src/main/java/com/metamx/common/FileUtils.java b/src/main/java/com/metamx/common/FileUtils.java index f267ff9f..d4b29611 100644 --- a/src/main/java/com/metamx/common/FileUtils.java +++ b/src/main/java/com/metamx/common/FileUtils.java @@ -23,6 +23,10 @@ import com.google.common.io.Files; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.Collection; @@ -125,4 +129,33 @@ public void addFile(File file) this.addFiles(ImmutableList.of(file)); } } + + /** + * Fully maps a file read-only in to memory as per + * {@link FileChannel#map(java.nio.channels.FileChannel.MapMode, long, long)}. + * + *

Files are mapped from offset 0 to its length. + * + *

This only works for files <= {@link Integer#MAX_VALUE} bytes. + * + *

Similar to {@link Files#map(File)}, but returns {@link MappedByteBufferHandler}, that makes it easier to unmap + * the buffer within try-with-resources pattern: + *

{@code
+   * try (MappedByteBufferHandler fileMappingHandler = FileUtils.map(file)) {
+   *   ByteBuffer fileMapping = fileMappingHandler.get();
+   *   // use mapped buffer
+   * }}
+ * + * @param file the file to map + * @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file} + * @throws FileNotFoundException if the {@code file} does not exist + * @throws IOException if an I/O error occurs + * + * @see FileChannel#map(FileChannel.MapMode, long, long) + */ + public static MappedByteBufferHandler map(File file) throws IOException + { + MappedByteBuffer mappedByteBuffer = Files.map(file); + return new MappedByteBufferHandler(mappedByteBuffer); + } } diff --git a/src/main/java/com/metamx/common/MappedByteBufferHandler.java b/src/main/java/com/metamx/common/MappedByteBufferHandler.java new file mode 100644 index 00000000..32f54dcb --- /dev/null +++ b/src/main/java/com/metamx/common/MappedByteBufferHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright 2016 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.metamx.common; + +import java.io.File; +import java.nio.MappedByteBuffer; + +/** + * Facilitates using try-with-resources with {@link MappedByteBuffer}s which don't implement {@link AutoCloseable}. + * + *

This interface is a specialization of {@code io.druid.collections.ResourceHandler}. + * @see FileUtils#map(File) + */ +public final class MappedByteBufferHandler implements AutoCloseable +{ + private final MappedByteBuffer mappedByteBuffer; + + MappedByteBufferHandler(MappedByteBuffer mappedByteBuffer) + { + this.mappedByteBuffer = mappedByteBuffer; + } + + /** + * Returns the wrapped buffer. + */ + public MappedByteBuffer get() + { + return mappedByteBuffer; + } + + /** + * Unmaps the wrapped buffer. + */ + @Override + public void close() + { + ByteBufferUtils.unmap(mappedByteBuffer); + } +} diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index d741bbb3..68d89229 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -22,11 +22,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; -import com.google.common.io.Closeables; -import com.google.common.io.Files; import com.google.common.primitives.Ints; +import com.metamx.common.FileUtils; import com.metamx.common.IAE; import com.metamx.common.ISE; +import com.metamx.common.MappedByteBufferHandler; import java.io.BufferedOutputStream; import java.io.BufferedWriter; @@ -85,19 +85,6 @@ public FileSmoosher( Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value."); } - private FileSmoosher( - File baseDir, - int maxChunkSize, - List outFiles, - Map internalFiles - ) - { - this.baseDir = baseDir; - this.maxChunkSize = maxChunkSize; - this.outFiles.addAll(outFiles); - this.internalFiles.putAll(internalFiles); - } - public Set getInternalFilenames() { return internalFiles.keySet(); @@ -105,12 +92,14 @@ public Set getInternalFilenames() public void add(File fileToAdd) throws IOException { - add(fileToAdd.getName(), Files.map(fileToAdd)); + add(fileToAdd.getName(), fileToAdd); } public void add(String name, File fileToAdd) throws IOException { - add(name, Files.map(fileToAdd)); + try (MappedByteBufferHandler fileMappingHandler = FileUtils.map(fileToAdd)) { + add(name, fileMappingHandler.get()); + } } public void add(String name, ByteBuffer bufferToAdd) throws IOException @@ -149,7 +138,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size) currOut = getNewCurrOut(); } if (currOut.bytesLeft() < size) { - Closeables.close(currOut, false); + currOut.close(); currOut = getNewCurrOut(); } @@ -213,13 +202,13 @@ public void close() throws IOException @Override public void close() throws IOException { - Closeables.close(currOut, false); + if (currOut != null) { + currOut.close(); + } File metaFile = metaFile(baseDir); - Writer out = null; - try { - out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(metaFile), Charsets.UTF_8)); + try (Writer out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(metaFile), Charsets.UTF_8))) { out.write(String.format("v1,%d,%d", maxChunkSize, outFiles.size())); out.write("\n"); @@ -236,9 +225,6 @@ public void close() throws IOException out.write("\n"); } } - finally { - Closeables.close(out, false); - } } private Outer getNewCurrOut() throws FileNotFoundException diff --git a/src/main/java/com/metamx/common/io/smoosh/SmooshedFileMapper.java b/src/main/java/com/metamx/common/io/smoosh/SmooshedFileMapper.java index 23292af9..ecba8517 100644 --- a/src/main/java/com/metamx/common/io/smoosh/SmooshedFileMapper.java +++ b/src/main/java/com/metamx/common/io/smoosh/SmooshedFileMapper.java @@ -17,6 +17,7 @@ package com.metamx.common.io.smoosh; import com.google.common.base.Charsets; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; @@ -131,10 +132,20 @@ public ByteBuffer mapFile(String name) throws IOException } @Override - public void close() throws IOException + public void close() { + Throwable thrown = null; for (MappedByteBuffer mappedByteBuffer : buffersList) { - ByteBufferUtils.unmap(mappedByteBuffer); + try { + ByteBufferUtils.unmap(mappedByteBuffer); + } catch (Throwable t) { + if (thrown == null) { + thrown = t; + } else { + thrown.addSuppressed(t); + } + } } + Throwables.propagateIfPossible(thrown); } } diff --git a/src/test/java/com/metamx/common/BufferUtils.java b/src/test/java/com/metamx/common/BufferUtils.java new file mode 100644 index 00000000..28f05083 --- /dev/null +++ b/src/test/java/com/metamx/common/BufferUtils.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.metamx.common; + +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; + +public final class BufferUtils +{ + + public static long totalMemoryUsedByDirectAndMappedBuffers() + { + long totalMemoryUsed = 0L; + List pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + for (BufferPoolMXBean pool : pools) { + totalMemoryUsed += pool.getMemoryUsed(); + } + return totalMemoryUsed; + } + + private BufferUtils() {} +} diff --git a/src/test/java/com/metamx/common/FileUtilsTest.java b/src/test/java/com/metamx/common/FileUtilsTest.java new file mode 100644 index 00000000..c523fea1 --- /dev/null +++ b/src/test/java/com/metamx/common/FileUtilsTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2016 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.metamx.common; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +public class FileUtilsTest +{ + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testMap() throws IOException + { + File dataFile = folder.newFile("data"); + long buffersMemoryBefore = BufferUtils.totalMemoryUsedByDirectAndMappedBuffers(); + try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) { + raf.write(42); + raf.setLength(1 << 20); // 1 MB + } + try (MappedByteBufferHandler mappedByteBufferHandler = FileUtils.map(dataFile)) { + Assert.assertEquals(42, mappedByteBufferHandler.get().get(0)); + } + long buffersMemoryAfter = BufferUtils.totalMemoryUsedByDirectAndMappedBuffers(); + Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter); + } +} diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java index e01cfa59..482429cc 100644 --- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java +++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java @@ -18,12 +18,16 @@ import com.google.common.io.Files; import com.google.common.primitives.Ints; +import com.metamx.common.BufferUtils; import com.metamx.common.ISE; -import com.metamx.common.guava.CloseQuietly; import junit.framework.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.Arrays; @@ -31,31 +35,32 @@ */ public class SmooshedFileMapperTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + @Test public void testSanity() throws Exception { - File baseDir = Files.createTempDir(); + File baseDir = folder.newFolder("base"); - try { - FileSmoosher smoosher = new FileSmoosher(baseDir, 21); + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { for (int i = 0; i < 20; ++i) { - File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); + File tmpFile = folder.newFile(String.format("smoosh-%s.bin", i)); Files.write(Ints.toByteArray(i), tmpFile); smoosher.add(String.format("%d", i), tmpFile); - tmpFile.delete(); } - smoosher.close(); + } - File[] files = baseDir.listFiles(); - Arrays.sort(files); + File[] files = baseDir.listFiles(); + Arrays.sort(files); - Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file - for (int i = 0; i < 4; ++i) { - Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]); - } - Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]); + Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file + for (int i = 0; i < 4; ++i) { + Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]); + } + Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]); - SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir); + try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) { for (int i = 0; i < 20; ++i) { ByteBuffer buf = mapper.mapFile(String.format("%d", i)); Assert.assertEquals(0, buf.position()); @@ -63,39 +68,38 @@ public void testSanity() throws Exception Assert.assertEquals(4, buf.capacity()); Assert.assertEquals(i, buf.getInt()); } - mapper.close(); - } - finally { - for (File file : baseDir.listFiles()) { - file.delete(); - } } } @Test public void testBehaviorWhenReportedSizesLargeAndExceptionIgnored() throws Exception { - File baseDir = Files.createTempDir(); + File baseDir = folder.newFolder("base"); - try { - FileSmoosher smoosher = new FileSmoosher(baseDir, 21); + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { for (int i = 0; i < 20; ++i) { final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 7); writer.write(ByteBuffer.wrap(Ints.toByteArray(i))); - CloseQuietly.close(writer); + try { + writer.close(); + Assert.fail("IOException expected"); + } + catch (IOException ignored) { + // expected + } } - smoosher.close(); + } - File[] files = baseDir.listFiles(); - Arrays.sort(files); + File[] files = baseDir.listFiles(); + Arrays.sort(files); - Assert.assertEquals(6, files.length); // 4 smoosh files and 1 meta file - for (int i = 0; i < 4; ++i) { - Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]); - } - Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]); + Assert.assertEquals(6, files.length); // 4 smoosh files and 1 meta file + for (int i = 0; i < 4; ++i) { + Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]); + } + Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]); - SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir); + try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) { for (int i = 0; i < 20; ++i) { ByteBuffer buf = mapper.mapFile(String.format("%d", i)); Assert.assertEquals(0, buf.position()); @@ -103,28 +107,19 @@ public void testBehaviorWhenReportedSizesLargeAndExceptionIgnored() throws Excep Assert.assertEquals(4, buf.capacity()); Assert.assertEquals(i, buf.getInt()); } - mapper.close(); - } - finally { - for (File file : baseDir.listFiles()) { - file.delete(); - } } } @Test public void testBehaviorWhenReportedSizesSmall() throws Exception { - File baseDir = Files.createTempDir(); + File baseDir = folder.newFolder("base"); - try { - FileSmoosher smoosher = new FileSmoosher(baseDir, 21); - final SmooshedWriter writer = smoosher.addWithSmooshedWriter("1", 2); + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { boolean exceptionThrown = false; - try { + try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter("1", 2)) { writer.write(ByteBuffer.wrap(Ints.toByteArray(1))); - } - catch (ISE e) { + } catch (ISE e) { Assert.assertTrue(e.getMessage().contains("Liar!!!")); exceptionThrown = true; } @@ -134,10 +129,23 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception Assert.assertEquals(1, files.length); Assert.assertEquals(0, files[0].length()); } - finally { - for (File file : baseDir.listFiles()) { - file.delete(); + } + + @Test + public void testDeterministicFileUnmapping() throws IOException + { + File baseDir = folder.newFolder("base"); + + long totalMemoryUsedBeforeAddingFile = BufferUtils.totalMemoryUsedByDirectAndMappedBuffers(); + try (FileSmoosher smoosher = new FileSmoosher(baseDir)) { + File dataFile = folder.newFile("data.bin"); + try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) { + raf.setLength(1 << 20); // 1 MB } + smoosher.add(dataFile); } + long totalMemoryUsedAfterAddingFile = BufferUtils.totalMemoryUsedByDirectAndMappedBuffers(); + // Assert no hanging file mappings left by either smoosher or smoosher.add(file) + Assert.assertEquals(totalMemoryUsedBeforeAddingFile, totalMemoryUsedAfterAddingFile); } }