From 52ec8d3058fe2223d010174d59b9192615c2cbcd Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Thu, 7 Apr 2016 15:19:12 +0900 Subject: [PATCH 1/6] Use file channel in smoosher to minimize copy overheads --- .../metamx/common/io/smoosh/FileSmoosher.java | 21 +++++++------------ .../io/smoosh/SmooshedFileMapperTest.java | 2 +- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 68d89229..a88b6f59 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -28,7 +28,6 @@ import com.metamx.common.ISE; import com.metamx.common.MappedByteBufferHandler; -import java.io.BufferedOutputStream; import java.io.BufferedWriter; import java.io.Closeable; import java.io.File; @@ -36,12 +35,11 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.ByteBuffer; import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; +import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -232,7 +230,7 @@ private Outer getNewCurrOut() throws FileNotFoundException final int fileNum = outFiles.size(); File outFile = makeChunkFile(baseDir, fileNum); outFiles.add(outFile); - return new Outer(fileNum, new BufferedOutputStream(new FileOutputStream(outFile)), maxChunkSize); + return new Outer(fileNum, new FileOutputStream(outFile).getChannel(), maxChunkSize); } static File metaFile(File baseDir) @@ -248,16 +246,15 @@ static File makeChunkFile(File baseDir, int i) public static class Outer implements SmooshedWriter { private final int fileNum; - private final OutputStream out; + private final FileChannel channel; private final int maxLength; - private boolean open = true; private int currOffset = 0; - Outer(int fileNum, OutputStream out, int maxLength) + Outer(int fileNum, FileChannel channel, int maxLength) { this.fileNum = fileNum; - this.out = out; + this.channel = channel; this.maxLength = maxLength; } @@ -279,14 +276,13 @@ public int bytesLeft() @Override public int write(ByteBuffer buffer) throws IOException { - WritableByteChannel channel = Channels.newChannel(out); return addToOffset(channel.write(buffer)); } @Override public int write(InputStream in) throws IOException { - return addToOffset(Ints.checkedCast(ByteStreams.copy(in, out))); + return addToOffset(Ints.checkedCast(ByteStreams.copy(Channels.newChannel(in), channel))); } public int addToOffset(int numBytesWritten) @@ -303,14 +299,13 @@ public int addToOffset(int numBytesWritten) @Override public boolean isOpen() { - return open; + return channel.isOpen(); } @Override public void close() throws IOException { - open = false; - out.close(); + channel.close(); } } } diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java index 482429cc..3d2fac41 100644 --- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java +++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java @@ -127,7 +127,7 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception Assert.assertTrue(exceptionThrown); File[] files = baseDir.listFiles(); Assert.assertEquals(1, files.length); - Assert.assertEquals(0, files[0].length()); + Assert.assertEquals(4, files[0].length()); } } From 92684fb4978950908d73a097001d3287a42ba6ff Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 8 Apr 2016 08:45:59 +0900 Subject: [PATCH 2/6] added buffered writer --- .../metamx/common/io/smoosh/FileSmoosher.java | 71 ++++++++++++++++++- .../io/smoosh/SmooshedFileMapperTest.java | 2 +- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index a88b6f59..0a1594e0 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -40,6 +40,7 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -246,7 +247,7 @@ static File makeChunkFile(File baseDir, int i) public static class Outer implements SmooshedWriter { private final int fileNum; - private final FileChannel channel; + private final WritableByteChannel channel; private final int maxLength; private int currOffset = 0; @@ -254,7 +255,7 @@ public static class Outer implements SmooshedWriter Outer(int fileNum, FileChannel channel, int maxLength) { this.fileNum = fileNum; - this.channel = channel; + this.channel = new BufferedWritableByteChannel(channel); this.maxLength = maxLength; } @@ -308,4 +309,70 @@ public void close() throws IOException channel.close(); } } + + private static class BufferedWritableByteChannel + implements WritableByteChannel + { + private static final int MAX_BUFFER_SIZE = 65536; + private static final int DEFAULT_BUFFER_SIZE = 8192; // default of buffered output stream + + private final WritableByteChannel channel; + private final ByteBuffer buffer; + + BufferedWritableByteChannel(WritableByteChannel channel) + { + this(channel, DEFAULT_BUFFER_SIZE); + } + + BufferedWritableByteChannel(WritableByteChannel channel, int buffer) + { + this.channel = channel; + this.buffer = ByteBuffer.allocateDirect(Math.min(buffer, MAX_BUFFER_SIZE)); + } + + public int write(ByteBuffer src) throws IOException + { + int position = src.position(); + while (src.hasRemaining()) { + ByteBuffer toWrite = src; + int bytesToWrite = Math.min(buffer.remaining(), src.remaining()); + if (bytesToWrite != src.remaining()) { + toWrite = src.duplicate(); + toWrite.limit(toWrite.position() + bytesToWrite); + } + buffer.put(toWrite); + src.position(toWrite.position()); + if (!buffer.hasRemaining()) { + flush(); + } + } + return src.position() - position; + } + + private void flush() throws IOException + { + buffer.flip(); + channel.write(buffer); + if (buffer.hasRemaining()) { + buffer.compact(); + } else { + buffer.clear(); + } + } + + @Override + public boolean isOpen() + { + return channel.isOpen(); + } + + @Override + public void close() throws IOException + { + if (buffer.position() != 0) { + flush(); + } + channel.close(); + } + } } diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java index 3d2fac41..482429cc 100644 --- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java +++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java @@ -127,7 +127,7 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception Assert.assertTrue(exceptionThrown); File[] files = baseDir.listFiles(); Assert.assertEquals(1, files.length); - Assert.assertEquals(4, files[0].length()); + Assert.assertEquals(0, files[0].length()); } } From 893e6f2a3fcac9f9ed30cd0405b5cccac50fbc5e Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 8 Apr 2016 16:23:54 +0900 Subject: [PATCH 3/6] support gather-write --- .../metamx/common/io/smoosh/FileSmoosher.java | 55 ++++++++++++++++--- .../common/io/smoosh/SmooshedWriter.java | 4 +- 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 0a1594e0..11cd716c 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -40,6 +40,7 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; import java.nio.channels.WritableByteChannel; import java.util.Arrays; import java.util.List; @@ -160,7 +161,19 @@ public int write(ByteBuffer in) throws IOException return verifySize(currOut.write(in)); } - private int verifySize(int bytesWrittenInChunk) throws IOException + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + return verifySize(currOut.write(srcs, offset, length)); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return verifySize(currOut.write(srcs)); + } + + private int verifySize(long bytesWrittenInChunk) throws IOException { bytesWritten += bytesWrittenInChunk; @@ -171,7 +184,7 @@ private int verifySize(int bytesWrittenInChunk) throws IOException throw new ISE("Wrote[%,d] bytes for something of size[%,d]. Liar!!!", bytesWritten, size); } - return bytesWrittenInChunk; + return Ints.checkedCast(bytesWrittenInChunk); } @Override @@ -247,8 +260,8 @@ static File makeChunkFile(File baseDir, int i) public static class Outer implements SmooshedWriter { private final int fileNum; - private final WritableByteChannel channel; private final int maxLength; + private final GatheringByteChannel channel; private int currOffset = 0; @@ -283,10 +296,22 @@ public int write(ByteBuffer buffer) throws IOException @Override public int write(InputStream in) throws IOException { - return addToOffset(Ints.checkedCast(ByteStreams.copy(Channels.newChannel(in), channel))); + return addToOffset(ByteStreams.copy(Channels.newChannel(in), channel)); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + return addToOffset(channel.write(srcs, offset, length)); } - public int addToOffset(int numBytesWritten) + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return addToOffset(channel.write(srcs)); + } + + public int addToOffset(long numBytesWritten) { if (numBytesWritten > bytesLeft()) { throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft()); @@ -294,7 +319,7 @@ public int addToOffset(int numBytesWritten) currOffset += numBytesWritten; - return numBytesWritten; + return Ints.checkedCast(numBytesWritten); } @Override @@ -311,7 +336,7 @@ public void close() throws IOException } private static class BufferedWritableByteChannel - implements WritableByteChannel + implements GatheringByteChannel { private static final int MAX_BUFFER_SIZE = 65536; private static final int DEFAULT_BUFFER_SIZE = 8192; // default of buffered output stream @@ -349,6 +374,22 @@ public int write(ByteBuffer src) throws IOException return src.position() - position; } + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + long written = 0; + for (int i = offset; i < length; i++) { + written += write(srcs[i]); + } + return written; + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return write(srcs, 0, srcs.length); + } + private void flush() throws IOException { buffer.flip(); diff --git a/src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java b/src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java index d0c8d84d..a5bf9ed0 100644 --- a/src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java +++ b/src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java @@ -19,11 +19,11 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.nio.channels.WritableByteChannel; +import java.nio.channels.GatheringByteChannel; /** */ -public interface SmooshedWriter extends Closeable, WritableByteChannel +public interface SmooshedWriter extends Closeable, GatheringByteChannel { public int write(InputStream in) throws IOException; } From 6a70df1976e8a1fe119618544bc806444cab8bd5 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Sat, 21 May 2016 14:02:09 +0900 Subject: [PATCH 4/6] addressed comment --- .../metamx/common/io/smoosh/FileSmoosher.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 11cd716c..f31dca03 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -368,7 +368,7 @@ public int write(ByteBuffer src) throws IOException buffer.put(toWrite); src.position(toWrite.position()); if (!buffer.hasRemaining()) { - flush(); + flush(false); } } return src.position() - position; @@ -390,10 +390,13 @@ public long write(ByteBuffer[] srcs) throws IOException return write(srcs, 0, srcs.length); } - private void flush() throws IOException + private void flush(boolean flushAll) throws IOException { buffer.flip(); - channel.write(buffer); + do { + channel.write(buffer); + } while (flushAll && buffer.hasRemaining()); + if (buffer.hasRemaining()) { buffer.compact(); } else { @@ -410,10 +413,14 @@ public boolean isOpen() @Override public void close() throws IOException { - if (buffer.position() != 0) { - flush(); + try { + if (buffer.position() != 0) { + flush(true); + } + } + finally { + channel.close(); } - channel.close(); } } } From f11aaf23bf4cbb6136f6e2927dd472a89bad44f9 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Mon, 26 Sep 2016 09:28:05 +0900 Subject: [PATCH 5/6] use TWR, as commented --- src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index f31dca03..30c50c36 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -413,14 +413,11 @@ public boolean isOpen() @Override public void close() throws IOException { - try { + try (Closeable toClose = channel) { if (buffer.position() != 0) { flush(true); } } - finally { - channel.close(); - } } } } From 76d5428cdaf5f3ca02413dd248abc178085841e1 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 5 Oct 2016 11:20:10 +0900 Subject: [PATCH 6/6] Addressed comments --- .../metamx/common/io/smoosh/FileSmoosher.java | 100 ++---------------- .../io/smoosh/SmooshedFileMapperTest.java | 4 +- 2 files changed, 11 insertions(+), 93 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 30c50c36..5ab1ffd0 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; +import com.google.common.io.Closer; import com.google.common.primitives.Ints; import com.metamx.common.FileUtils; import com.metamx.common.IAE; @@ -39,9 +40,7 @@ import java.io.Writer; import java.nio.ByteBuffer; import java.nio.channels.Channels; -import java.nio.channels.FileChannel; import java.nio.channels.GatheringByteChannel; -import java.nio.channels.WritableByteChannel; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -244,7 +243,7 @@ private Outer getNewCurrOut() throws FileNotFoundException final int fileNum = outFiles.size(); File outFile = makeChunkFile(baseDir, fileNum); outFiles.add(outFile); - return new Outer(fileNum, new FileOutputStream(outFile).getChannel(), maxChunkSize); + return new Outer(fileNum, new FileOutputStream(outFile), maxChunkSize); } static File metaFile(File baseDir) @@ -263,13 +262,16 @@ public static class Outer implements SmooshedWriter private final int maxLength; private final GatheringByteChannel channel; + private final Closer closer = Closer.create(); private int currOffset = 0; - Outer(int fileNum, FileChannel channel, int maxLength) + Outer(int fileNum, FileOutputStream output, int maxLength) { this.fileNum = fileNum; - this.channel = new BufferedWritableByteChannel(channel); + this.channel = output.getChannel(); this.maxLength = maxLength; + closer.register(output); + closer.register(channel); } public int getFileNum() @@ -331,93 +333,7 @@ public boolean isOpen() @Override public void close() throws IOException { - channel.close(); - } - } - - private static class BufferedWritableByteChannel - implements GatheringByteChannel - { - private static final int MAX_BUFFER_SIZE = 65536; - private static final int DEFAULT_BUFFER_SIZE = 8192; // default of buffered output stream - - private final WritableByteChannel channel; - private final ByteBuffer buffer; - - BufferedWritableByteChannel(WritableByteChannel channel) - { - this(channel, DEFAULT_BUFFER_SIZE); - } - - BufferedWritableByteChannel(WritableByteChannel channel, int buffer) - { - this.channel = channel; - this.buffer = ByteBuffer.allocateDirect(Math.min(buffer, MAX_BUFFER_SIZE)); - } - - public int write(ByteBuffer src) throws IOException - { - int position = src.position(); - while (src.hasRemaining()) { - ByteBuffer toWrite = src; - int bytesToWrite = Math.min(buffer.remaining(), src.remaining()); - if (bytesToWrite != src.remaining()) { - toWrite = src.duplicate(); - toWrite.limit(toWrite.position() + bytesToWrite); - } - buffer.put(toWrite); - src.position(toWrite.position()); - if (!buffer.hasRemaining()) { - flush(false); - } - } - return src.position() - position; - } - - @Override - public long write(ByteBuffer[] srcs, int offset, int length) throws IOException - { - long written = 0; - for (int i = offset; i < length; i++) { - written += write(srcs[i]); - } - return written; - } - - @Override - public long write(ByteBuffer[] srcs) throws IOException - { - return write(srcs, 0, srcs.length); - } - - private void flush(boolean flushAll) throws IOException - { - buffer.flip(); - do { - channel.write(buffer); - } while (flushAll && buffer.hasRemaining()); - - if (buffer.hasRemaining()) { - buffer.compact(); - } else { - buffer.clear(); - } - } - - @Override - public boolean isOpen() - { - return channel.isOpen(); - } - - @Override - public void close() throws IOException - { - try (Closeable toClose = channel) { - if (buffer.position() != 0) { - flush(true); - } - } + closer.close(); } } } diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java index 482429cc..74f87731 100644 --- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java +++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java @@ -52,6 +52,7 @@ public void testSanity() throws Exception } File[] files = baseDir.listFiles(); + Assert.assertNotNull(files); Arrays.sort(files); Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file @@ -91,6 +92,7 @@ public void testBehaviorWhenReportedSizesLargeAndExceptionIgnored() throws Excep } File[] files = baseDir.listFiles(); + Assert.assertNotNull(files); Arrays.sort(files); Assert.assertEquals(6, files.length); // 4 smoosh files and 1 meta file @@ -126,8 +128,8 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception Assert.assertTrue(exceptionThrown); File[] files = baseDir.listFiles(); + Assert.assertNotNull(files); Assert.assertEquals(1, files.length); - Assert.assertEquals(0, files[0].length()); } }