From fa8596913f61a8ccab2d8324040adcff5f141b42 Mon Sep 17 00:00:00 2001 From: akashdw Date: Fri, 12 Aug 2016 09:27:50 -0700 Subject: [PATCH 1/6] File smoosher changes. --- .../metamx/common/io/smoosh/FileSmoosher.java | 100 ++++++++++++++- .../io/smoosh/SmooshedFileMapperTest.java | 115 +++++++++++++++--- 2 files changed, 199 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 000d2ff8..85153e22 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -42,7 +42,9 @@ import java.io.Writer; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,8 +67,11 @@ public class FileSmoosher implements Closeable private final List outFiles = Lists.newArrayList(); private final Map internalFiles = Maps.newTreeMap(); + private List files = Lists.newArrayList(); + private List createFiles = Lists.newArrayList(); private Outer currOut = null; + private boolean writerCurrentlyInUse = false; public FileSmoosher( File baseDir @@ -148,6 +153,11 @@ public void add(String name, List bufferToAdd) throws IOException public SmooshedWriter addWithSmooshedWriter(final String name, final long size) throws IOException { + if(writerCurrentlyInUse) + { + return delegateSmooshedWriter (name , size); + } + if (size > maxChunkSize) { throw new IAE("Asked to add buffers[%,d] larger than configured max[%,d]", size, maxChunkSize); } @@ -160,7 +170,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size) } final int startOffset = currOut.getCurrOffset(); - + writerCurrentlyInUse = true; return new SmooshedWriter() { private boolean open = true; @@ -203,6 +213,7 @@ public void close() throws IOException { open = false; internalFiles.put(name, new Metadata(currOut.getFileNum(), startOffset, currOut.getCurrOffset())); + writerCurrentlyInUse = false; if (bytesWritten != currOut.getCurrOffset() - startOffset) { throw new ISE("WTF? Perhaps there is some concurrent modification going on?"); @@ -210,15 +221,100 @@ public void close() throws IOException if (bytesWritten != size) { throw new IOException( String.format("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten) - ); + ); + } + + if(!writerCurrentlyInUse) + { + mergeWithSmoosher(); } } }; } + private void mergeWithSmoosher() throws IOException + { + //get processed elements from the stack and write. + List fileToProcess = new ArrayList<>(files); + files = Lists.newArrayList(); + for(File file: fileToProcess) + { + add(file); + createFiles.remove(file); + file.delete(); + } + } + + private SmooshedWriter delegateSmooshedWriter(final String name,final long size) throws IOException + { + final File tmpFile = new File(baseDir, name); + createFiles.add(tmpFile); + return new SmooshedWriter () { + private int currOffset = 0; + private boolean open = true; + private final FileOutputStream out = new FileOutputStream(tmpFile); + + @Override + public void close() throws IOException + { + open = false; + out.close(); + files.add(tmpFile); + if(!writerCurrentlyInUse ) { + mergeWithSmoosher(); + } + } + public int bytesLeft() + { + return (int) (size - currOffset); + } + + @Override + public int write(ByteBuffer buffer) throws IOException + { + FileChannel channel = out.getChannel(); + int bytesWritten = channel.write(buffer); + return addToOffset(bytesWritten); + } + + @Override + public int write(InputStream in) throws IOException + { + return addToOffset(Ints.checkedCast(java.nio.file.Files.copy(in, tmpFile.toPath()))); + } + + public int addToOffset(int numBytesWritten) + { + if (numBytesWritten > bytesLeft()) { + throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft()); + } + + currOffset += numBytesWritten; + + return numBytesWritten; + } + + @Override + public boolean isOpen() { + return open; + } + }; + + } + @Override public void close() throws IOException { + //book keeping checks on created file. + if(!createFiles.isEmpty()) + { + for(File file: createFiles) + { + file.delete(); + } + throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", createFiles.size())); + } + Closeables.close(currOut, false); File metaFile = metaFile(baseDir); diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java index e01cfa59..90d5fea1 100644 --- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java +++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -46,24 +47,88 @@ public void testSanity() throws Exception } smoosher.close(); - File[] files = baseDir.listFiles(); - Arrays.sort(files); + validateOutput(baseDir); + } + finally { + for (File file : baseDir.listFiles()) { + file.delete(); + } + } + } - Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file - for (int i = 0; i < 4; ++i) { - Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]); + @Test + public void testWhenFirstWriterClosedInTheMiddle() throws Exception + { + File baseDir = Files.createTempDir(); + + try { + FileSmoosher smoosher = new FileSmoosher(baseDir, 21); + + final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); + + for (int i = 0; i < 19; ++i) { + File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); + Files.write(Ints.toByteArray(i), tmpFile); + smoosher.add(String.format("%d", i), tmpFile); + if(i==10) + { + writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); + CloseQuietly.close(writer); + } + tmpFile.delete(); + } + smoosher.close(); + validateOutput(baseDir); + } + finally { + for (File file : baseDir.listFiles()) { + file.delete(); } - Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]); + } + } - SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir); - for (int i = 0; i < 20; ++i) { - ByteBuffer buf = mapper.mapFile(String.format("%d", i)); - Assert.assertEquals(0, buf.position()); - Assert.assertEquals(4, buf.remaining()); - Assert.assertEquals(4, buf.capacity()); - Assert.assertEquals(i, buf.getInt()); + @Test(expected= ISE.class) + public void testExceptionForUnClosedFiles() throws Exception + { + File baseDir = Files.createTempDir(); + + try { + FileSmoosher smoosher = new FileSmoosher(baseDir, 21); + + for (int i = 0; i < 19; ++i) { + final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4); + writer.write(ByteBuffer.wrap(Ints.toByteArray(i))); } - mapper.close(); + smoosher.close(); + } + finally { + for (File file : baseDir.listFiles()) { + file.delete(); + } + } + } + + @Test + public void testWhenFirstWriterClosedInTheLast() throws Exception + { + File baseDir = Files.createTempDir(); + + try { + FileSmoosher smoosher = new FileSmoosher(baseDir, 21); + + final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); + writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); + + for (int i = 0; i < 19; ++i) { + File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); + Files.write(Ints.toByteArray(i), tmpFile); + smoosher.add(String.format("%d", i), tmpFile); + tmpFile.delete(); + } + CloseQuietly.close(writer); + smoosher.close(); + + validateOutput(baseDir); } finally { for (File file : baseDir.listFiles()) { @@ -140,4 +205,26 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception } } } + + private void validateOutput(File baseDir) throws IOException + { + File[] files = baseDir.listFiles(); + Arrays.sort(files); + + Assert.assertEquals(5, files.length); // 4 smoosh files and 1 meta file + for (int i = 0; i < 4; ++i) { + Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]); + } + Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]); + + SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir); + for (int i = 0; i < 20; ++i) { + ByteBuffer buf = mapper.mapFile(String.format("%d", i)); + Assert.assertEquals(0, buf.position()); + Assert.assertEquals(4, buf.remaining()); + Assert.assertEquals(4, buf.capacity()); + Assert.assertEquals(i, buf.getInt()); + } + mapper.close(); + } } From 43aec86b19da0cdccfe93112151aca3ae01d1c54 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Fri, 30 Sep 2016 11:02:36 -0700 Subject: [PATCH 2/6] addressed comment --- .../metamx/common/io/smoosh/FileSmoosher.java | 65 +++++---- .../io/smoosh/SmooshedFileMapperTest.java | 134 +++++++++--------- 2 files changed, 99 insertions(+), 100 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 669d600c..03398091 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -66,15 +66,15 @@ public class FileSmoosher implements Closeable private final List outFiles = Lists.newArrayList(); private final Map internalFiles = Maps.newTreeMap(); - private List files = Lists.newArrayList(); - private List createFiles = Lists.newArrayList(); + private List completedFiles = Lists.newArrayList(); + private List filesInProcess = Lists.newArrayList(); private Outer currOut = null; private boolean writerCurrentlyInUse = false; public FileSmoosher( File baseDir - ) + ) { this(baseDir, Integer.MAX_VALUE); } @@ -82,7 +82,7 @@ public FileSmoosher( public FileSmoosher( File baseDir, int maxChunkSize - ) + ) { this.baseDir = baseDir; this.maxChunkSize = maxChunkSize; @@ -204,13 +204,10 @@ public void close() throws IOException if (bytesWritten != size) { throw new IOException( String.format("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten) - ); + ); } - if(!writerCurrentlyInUse) - { - mergeWithSmoosher(); - } + mergeWithSmoosher(); } }; } @@ -218,12 +215,11 @@ public void close() throws IOException private void mergeWithSmoosher() throws IOException { //get processed elements from the stack and write. - List fileToProcess = new ArrayList<>(files); - files = Lists.newArrayList(); + List fileToProcess = new ArrayList<>(completedFiles); + completedFiles = Lists.newArrayList(); for(File file: fileToProcess) { add(file); - createFiles.remove(file); file.delete(); } } @@ -231,7 +227,7 @@ private void mergeWithSmoosher() throws IOException private SmooshedWriter delegateSmooshedWriter(final String name,final long size) throws IOException { final File tmpFile = new File(baseDir, name); - createFiles.add(tmpFile); + filesInProcess.add(tmpFile); return new SmooshedWriter () { private int currOffset = 0; private boolean open = true; @@ -242,7 +238,9 @@ public void close() throws IOException { open = false; out.close(); - files.add(tmpFile); + completedFiles.add(tmpFile); + filesInProcess.remove(tmpFile); + if(!writerCurrentlyInUse ) { mergeWithSmoosher(); } @@ -255,26 +253,26 @@ public int bytesLeft() @Override public int write(ByteBuffer buffer) throws IOException { - FileChannel channel = out.getChannel(); + WritableByteChannel channel = Channels.newChannel(out); int bytesWritten = channel.write(buffer); - return addToOffset(bytesWritten); + addToOffset(bytesWritten); + return bytesWritten; } @Override public int write(InputStream in) throws IOException { - return addToOffset(Ints.checkedCast(java.nio.file.Files.copy(in, tmpFile.toPath()))); + int bytesWritten = Ints.checkedCast(ByteStreams.copy(in, out)); + addToOffset(bytesWritten); + return bytesWritten; } - public int addToOffset(int numBytesWritten) + private void addToOffset(int numBytesWritten) { if (numBytesWritten > bytesLeft()) { throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft()); } - currOffset += numBytesWritten; - - return numBytesWritten; } @Override @@ -289,15 +287,15 @@ public boolean isOpen() { public void close() throws IOException { //book keeping checks on created file. - if(!createFiles.isEmpty()) + if(!completedFiles.isEmpty()) { - for(File file: createFiles) + for(File file: completedFiles) { file.delete(); } - throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", createFiles.size())); + throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size())); } - + if (currOut != null) { currOut.close(); } @@ -316,8 +314,8 @@ public void close() throws IOException metadata.getFileNum(), metadata.getStartOffset(), metadata.getEndOffset() - ) - ); + ) + ); out.write("\n"); } } @@ -376,24 +374,25 @@ public int bytesLeft() public int write(ByteBuffer buffer) throws IOException { WritableByteChannel channel = Channels.newChannel(out); - return addToOffset(channel.write(buffer)); + int bytesWritten = channel.write(buffer); + addToOffset(bytesWritten); + return bytesWritten; } @Override public int write(InputStream in) throws IOException { - return addToOffset(Ints.checkedCast(ByteStreams.copy(in, out))); + int bytesWritten = Ints.checkedCast(ByteStreams.copy(in, out)); + addToOffset(bytesWritten); + return bytesWritten; } - public int addToOffset(int numBytesWritten) + private void addToOffset(int numBytesWritten) { if (numBytesWritten > bytesLeft()) { throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft()); } - currOffset += numBytesWritten; - - return numBytesWritten; } @Override diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java index 103fbb6f..c737759a 100644 --- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java +++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java @@ -58,62 +58,62 @@ public void testSanity() throws Exception @Test public void testWhenFirstWriterClosedInTheMiddle() throws Exception { - File baseDir = Files.createTempDir(); - - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { - final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); - - for (int i = 0; i < 19; ++i) { - File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); - Files.write(Ints.toByteArray(i), tmpFile); - smoosher.add(String.format("%d", i), tmpFile); - if(i==10) - { - writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); - CloseQuietly.close(writer); - } - tmpFile.delete(); - } - } - validateOutput(baseDir); + File baseDir = Files.createTempDir(); + + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) + { + final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); + + for (int i = 0; i < 19; ++i) { + File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); + Files.write(Ints.toByteArray(i), tmpFile); + smoosher.add(String.format("%d", i), tmpFile); + if(i==10) + { + writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); + CloseQuietly.close(writer); + } + tmpFile.delete(); + } + } + validateOutput(baseDir); } - + @Test(expected= ISE.class) public void testExceptionForUnClosedFiles() throws Exception { - File baseDir = Files.createTempDir(); - - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { - for (int i = 0; i < 19; ++i) { - final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4); - writer.write(ByteBuffer.wrap(Ints.toByteArray(i))); - } - smoosher.close(); - } + File baseDir = Files.createTempDir(); + + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) + { + for (int i = 0; i < 19; ++i) { + final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4); + writer.write(ByteBuffer.wrap(Ints.toByteArray(i))); + } + smoosher.close(); + } } @Test - public void testWhenFirstWriterClosedInTheLast() throws Exception + public void testWhenFirstWriterClosedAtTheEnd() throws Exception { - File baseDir = Files.createTempDir(); - - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { - final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); - writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); - - for (int i = 0; i < 19; ++i) { - File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); - Files.write(Ints.toByteArray(i), tmpFile); - smoosher.add(String.format("%d", i), tmpFile); - tmpFile.delete(); - } - CloseQuietly.close(writer); - smoosher.close(); - } - validateOutput(baseDir); + File baseDir = Files.createTempDir(); + + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) + { + final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); + writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); + + for (int i = 0; i < 19; ++i) { + File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); + Files.write(Ints.toByteArray(i), tmpFile); + smoosher.add(String.format("%d", i), tmpFile); + tmpFile.delete(); + } + CloseQuietly.close(writer); + smoosher.close(); + } + validateOutput(baseDir); } @Test @@ -193,26 +193,26 @@ public void testDeterministicFileUnmapping() throws IOException // Assert no hanging file mappings left by either smoosher or smoosher.add(file) Assert.assertEquals(totalMemoryUsedBeforeAddingFile, totalMemoryUsedAfterAddingFile); } - + private void validateOutput(File baseDir) throws IOException { - File[] files = baseDir.listFiles(); - Arrays.sort(files); - - Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file - for (int i = 0; i < 4; ++i) { - Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]); - } - Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]); - - try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) { - for (int i = 0; i < 20; ++i) { - ByteBuffer buf = mapper.mapFile(String.format("%d", i)); - Assert.assertEquals(0, buf.position()); - Assert.assertEquals(4, buf.remaining()); - Assert.assertEquals(4, buf.capacity()); - Assert.assertEquals(i, buf.getInt()); - } - } + File[] files = baseDir.listFiles(); + Arrays.sort(files); + + Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file + for (int i = 0; i < 4; ++i) { + Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]); + } + Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]); + + try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) { + for (int i = 0; i < 20; ++i) { + ByteBuffer buf = mapper.mapFile(String.format("%d", i)); + Assert.assertEquals(0, buf.position()); + Assert.assertEquals(4, buf.remaining()); + Assert.assertEquals(4, buf.capacity()); + Assert.assertEquals(i, buf.getInt()); + } + } } } \ No newline at end of file From faf2aa48b8be5d2d67452d451dd7ac1c8b48afdf Mon Sep 17 00:00:00 2001 From: dwivedi Date: Fri, 30 Sep 2016 11:13:02 -0700 Subject: [PATCH 3/6] formatting. --- .../java/com/metamx/common/io/smoosh/FileSmoosher.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 03398091..039c26af 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -74,7 +74,7 @@ public class FileSmoosher implements Closeable public FileSmoosher( File baseDir - ) + ) { this(baseDir, Integer.MAX_VALUE); } @@ -82,7 +82,7 @@ public FileSmoosher( public FileSmoosher( File baseDir, int maxChunkSize - ) + ) { this.baseDir = baseDir; this.maxChunkSize = maxChunkSize; @@ -206,7 +206,6 @@ public void close() throws IOException String.format("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten) ); } - mergeWithSmoosher(); } }; @@ -314,8 +313,8 @@ public void close() throws IOException metadata.getFileNum(), metadata.getStartOffset(), metadata.getEndOffset() - ) - ); + ) + ); out.write("\n"); } } From 034a3746f2e34aab634ba765ca938f1d3c93fed1 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Fri, 30 Sep 2016 11:15:13 -0700 Subject: [PATCH 4/6] formatting. --- src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 039c26af..0cf9de37 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -138,7 +138,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size) { if(writerCurrentlyInUse) { - return delegateSmooshedWriter (name , size); + return delegateSmooshedWriter(name , size); } if (size > maxChunkSize) { From b060cd6917cbd28ac7516ab0150cb4e06d025e7c Mon Sep 17 00:00:00 2001 From: dwivedi Date: Mon, 3 Oct 2016 11:36:13 -0700 Subject: [PATCH 5/6] addressed comment and formatting. --- .../metamx/common/io/smoosh/FileSmoosher.java | 32 ++++++++++++------- .../io/smoosh/SmooshedFileMapperTest.java | 2 +- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 0cf9de37..d845fa5b 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -136,14 +136,16 @@ public void add(String name, List bufferToAdd) throws IOException public SmooshedWriter addWithSmooshedWriter(final String name, final long size) throws IOException { - if(writerCurrentlyInUse) - { - return delegateSmooshedWriter(name , size); - } if (size > maxChunkSize) { throw new IAE("Asked to add buffers[%,d] larger than configured max[%,d]", size, maxChunkSize); } + + if (writerCurrentlyInUse) + { + return delegateSmooshedWriter(name, size); + } + if (currOut == null) { currOut = getNewCurrOut(); } @@ -216,18 +218,20 @@ private void mergeWithSmoosher() throws IOException //get processed elements from the stack and write. List fileToProcess = new ArrayList<>(completedFiles); completedFiles = Lists.newArrayList(); - for(File file: fileToProcess) + for (File file: fileToProcess) { add(file); file.delete(); } } - private SmooshedWriter delegateSmooshedWriter(final String name,final long size) throws IOException - { + private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException + { final File tmpFile = new File(baseDir, name); filesInProcess.add(tmpFile); - return new SmooshedWriter () { + + return new SmooshedWriter() + { private int currOffset = 0; private boolean open = true; private final FileOutputStream out = new FileOutputStream(tmpFile); @@ -240,7 +244,7 @@ public void close() throws IOException completedFiles.add(tmpFile); filesInProcess.remove(tmpFile); - if(!writerCurrentlyInUse ) { + if (!writerCurrentlyInUse) { mergeWithSmoosher(); } } @@ -286,13 +290,17 @@ public boolean isOpen() { public void close() throws IOException { //book keeping checks on created file. - if(!completedFiles.isEmpty()) + if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) { - for(File file: completedFiles) + for (File file: completedFiles) + { + file.delete(); + } + for (File file: filesInProcess) { file.delete(); } - throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size())); + throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size() + completedFiles.size())); } if (currOut != null) { diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java index c737759a..5e149e16 100644 --- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java +++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java @@ -68,7 +68,7 @@ public void testWhenFirstWriterClosedInTheMiddle() throws Exception File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); Files.write(Ints.toByteArray(i), tmpFile); smoosher.add(String.format("%d", i), tmpFile); - if(i==10) + if (i==10) { writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); CloseQuietly.close(writer); From 72d6f2185896cb4613eb77801ab8fc4c810c6cfe Mon Sep 17 00:00:00 2001 From: dwivedi Date: Fri, 7 Oct 2016 17:21:09 -0700 Subject: [PATCH 6/6] Fixed comments. --- .../com/metamx/common/io/smoosh/FileSmoosher.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java index 8d2c4ec7..0f5159d7 100644 --- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java +++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java @@ -149,7 +149,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size) throw new IAE("Asked to add buffers[%,d] larger than configured max[%,d]", size, maxChunkSize); } - // if current writer is in use then create a new SmooshedWriter which + // If current writer is in use then create a new SmooshedWriter which // writes into temporary file which is later merged into original // FileSmoosher. if (writerCurrentlyInUse) @@ -231,22 +231,21 @@ public void close() throws IOException String.format("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten) ); } - // check if delegated smooshedWriter any file, merge with this - // FileSmoosher. + // Merge temporary files on to the main smoosh file. mergeWithSmoosher(); } }; } /** - * merges temporary files created by delegated SmooshedWriters into original - * FileSmoosher. + * Merges temporary files created by delegated SmooshedWriters on to the main + * smoosh file. * * @throws IOException */ private void mergeWithSmoosher() throws IOException { - //get processed elements from the stack and write. + // Get processed elements from the stack and write. List fileToProcess = new ArrayList<>(completedFiles); completedFiles = Lists.newArrayList(); for (File file: fileToProcess)