diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 68d89229..5ab1ffd0 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -22,13 +22,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; +import com.google.common.io.Closer; import com.google.common.primitives.Ints; import com.metamx.common.FileUtils; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.MappedByteBufferHandler; -import java.io.BufferedOutputStream; import java.io.BufferedWriter; import java.io.Closeable; import java.io.File; @@ -36,12 +36,11 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.ByteBuffer; import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; +import java.nio.channels.GatheringByteChannel; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -161,7 +160,19 @@ public int write(ByteBuffer in) throws IOException return verifySize(currOut.write(in)); } - private int verifySize(int bytesWrittenInChunk) throws IOException + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + return verifySize(currOut.write(srcs, offset, length)); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return verifySize(currOut.write(srcs)); + } + + private int verifySize(long bytesWrittenInChunk) throws IOException { bytesWritten += bytesWrittenInChunk; @@ -172,7 +183,7 @@ private int verifySize(int bytesWrittenInChunk) throws IOException throw new ISE("Wrote[%,d] bytes for something of size[%,d]. Liar!!!", bytesWritten, size); } - return bytesWrittenInChunk; + return Ints.checkedCast(bytesWrittenInChunk); } @Override @@ -232,7 +243,7 @@ private Outer getNewCurrOut() throws FileNotFoundException final int fileNum = outFiles.size(); File outFile = makeChunkFile(baseDir, fileNum); outFiles.add(outFile); - return new Outer(fileNum, new BufferedOutputStream(new FileOutputStream(outFile)), maxChunkSize); + return new Outer(fileNum, new FileOutputStream(outFile), maxChunkSize); } static File metaFile(File baseDir) @@ -248,17 +259,19 @@ static File makeChunkFile(File baseDir, int i) public static class Outer implements SmooshedWriter { private final int fileNum; - private final OutputStream out; private final int maxLength; + private final GatheringByteChannel channel; - private boolean open = true; + private final Closer closer = Closer.create(); private int currOffset = 0; - Outer(int fileNum, OutputStream out, int maxLength) + Outer(int fileNum, FileOutputStream output, int maxLength) { this.fileNum = fileNum; - this.out = out; + this.channel = output.getChannel(); this.maxLength = maxLength; + closer.register(output); + closer.register(channel); } public int getFileNum() @@ -279,17 +292,28 @@ public int bytesLeft() @Override public int write(ByteBuffer buffer) throws IOException { - WritableByteChannel channel = Channels.newChannel(out); return addToOffset(channel.write(buffer)); } @Override public int write(InputStream in) throws IOException { - return addToOffset(Ints.checkedCast(ByteStreams.copy(in, out))); + return addToOffset(ByteStreams.copy(Channels.newChannel(in), channel)); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + return addToOffset(channel.write(srcs, offset, length)); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return addToOffset(channel.write(srcs)); } - public int addToOffset(int numBytesWritten) + public int addToOffset(long numBytesWritten) { if (numBytesWritten > bytesLeft()) { throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft()); @@ -297,20 +321,19 @@ public int addToOffset(int numBytesWritten) currOffset += numBytesWritten; - return numBytesWritten; + return Ints.checkedCast(numBytesWritten); } @Override public boolean isOpen() { - return open; + return channel.isOpen(); } @Override public void close() throws IOException { - open = false; - out.close(); + closer.close(); } } } diff --git a/src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java b/src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java index d0c8d84d..a5bf9ed0 100644 --- a/src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java +++ b/src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java @@ -19,11 +19,11 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.nio.channels.WritableByteChannel; +import java.nio.channels.GatheringByteChannel; /** */ -public interface SmooshedWriter extends Closeable, WritableByteChannel +public interface SmooshedWriter extends Closeable, GatheringByteChannel { public int write(InputStream in) throws IOException; } diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java index 482429cc..74f87731 100644 --- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java +++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java @@ -52,6 +52,7 @@ public void testSanity() throws Exception } File[] files = baseDir.listFiles(); + Assert.assertNotNull(files); Arrays.sort(files); Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file @@ -91,6 +92,7 @@ public void testBehaviorWhenReportedSizesLargeAndExceptionIgnored() throws Excep } File[] files = baseDir.listFiles(); + Assert.assertNotNull(files); Arrays.sort(files); Assert.assertEquals(6, files.length); // 4 smoosh files and 1 meta file @@ -126,8 +128,8 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception Assert.assertTrue(exceptionThrown); File[] files = baseDir.listFiles(); + Assert.assertNotNull(files); Assert.assertEquals(1, files.length); - Assert.assertEquals(0, files[0].length()); } }