From 96a98f13214617ea3b908c3fcd46cbb37d0d74a0 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Wed, 9 Nov 2016 10:16:46 -0800 Subject: [PATCH 1/2] FileSmoosher requested changes from https://github.com/metamx/java-util/pull/55 --- .../util/common/io/smoosh/FileSmoosher.java | 68 +++++++++++-------- .../io/smoosh/SmooshedFileMapperTest.java | 37 ++++------ 2 files changed, 55 insertions(+), 50 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java index 753bddda1ea7..0b9b8adf9d9c 100644 --- a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java +++ b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java @@ -31,6 +31,7 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.MappedByteBufferHandler; +import io.druid.java.util.common.logger.Logger; import java.io.BufferedWriter; import java.io.Closeable; @@ -59,7 +60,9 @@ * various "chunk" files will be varying sizes and it is not possible to add a * file of size greater than Integer.MAX_VALUE. *

- * This class is not thread safe but allows writing multiple files even if main + * This class is not thread safe. + *

+ * This class allows writing multiple files even if main * smoosh file writer is open. If main smoosh file writer is already open, it * delegates the write into temporary file on the file system which is later * copied on to the main smoosh file and underlying temporary file will be @@ -69,6 +72,7 @@ public class FileSmoosher implements Closeable { private static final String FILE_EXTENSION = "smoosh"; private static final Joiner joiner = Joiner.on(","); + private static final Logger LOG = new Logger(FileSmoosher.class); private final File baseDir; private final int maxChunkSize; @@ -101,6 +105,16 @@ public FileSmoosher( Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value."); } + static File metaFile(File baseDir) + { + return new File(baseDir, String.format("meta.%s", FILE_EXTENSION)); + } + + static File makeChunkFile(File baseDir, int i) + { + return new File(baseDir, String.format("%05d.%s", i, FILE_EXTENSION)); + } + public Set getInternalFilenames() { return internalFiles.keySet(); @@ -155,8 +169,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size) // If current writer is in use then create a new SmooshedWriter which // writes into temporary file which is later merged into original // FileSmoosher. - if (writerCurrentlyInUse) - { + if (writerCurrentlyInUse) { return delegateSmooshedWriter(name, size); } @@ -251,10 +264,11 @@ private void mergeWithSmoosher() throws IOException // Get processed elements from the stack and write. List fileToProcess = new ArrayList<>(completedFiles); completedFiles = Lists.newArrayList(); - for (File file: fileToProcess) - { + for (File file : fileToProcess) { add(file); - file.delete(); + if (!file.delete()) { + LOG.warn("Unable to delete file [%s]", file.getPath()); + } } } @@ -265,7 +279,9 @@ private void mergeWithSmoosher() throws IOException * * @param name fileName * @param size size of the file. + * * @return + * * @throws IOException */ private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException @@ -275,14 +291,17 @@ private SmooshedWriter delegateSmooshedWriter(final String name, final long size return new SmooshedWriter() { - private int currOffset = 0; private final FileOutputStream out = new FileOutputStream(tmpFile); - private final GatheringByteChannel channel = out.getChannel();; + private final GatheringByteChannel channel = out.getChannel(); private final Closer closer = Closer.create(); + ; + private int currOffset = 0; + { closer.register(out); closer.register(channel); } + @Override public void close() throws IOException { @@ -294,6 +313,7 @@ public void close() throws IOException mergeWithSmoosher(); } } + public int bytesLeft() { return (int) (size - currOffset); @@ -347,17 +367,21 @@ public boolean isOpen() public void close() throws IOException { //book keeping checks on created file. - if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) - { - for (File file: completedFiles) - { - file.delete(); + if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) { + for (File file : completedFiles) { + if (!file.delete()) { + LOG.warn("Unable to delete file [%s]", file.getPath()); + } } - for (File file: filesInProcess) - { - file.delete(); + for (File file : filesInProcess) { + if (!file.delete()) { + LOG.warn("Unable to delete file [%s]", file.getPath()); + } } - throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size() + completedFiles.size())); + throw new ISE(String.format( + "%d writers needs to be closed before closing smoosher.", + filesInProcess.size() + completedFiles.size() + )); } if (currOut != null) { @@ -393,16 +417,6 @@ private Outer getNewCurrOut() throws FileNotFoundException return new Outer(fileNum, new FileOutputStream(outFile), maxChunkSize); } - static File metaFile(File baseDir) - { - return new File(baseDir, String.format("meta.%s", FILE_EXTENSION)); - } - - static File makeChunkFile(File baseDir, int i) - { - return new File(baseDir, String.format("%05d.%s", i, FILE_EXTENSION)); - } - public static class Outer implements SmooshedWriter { private final int fileNum; diff --git a/java-util/src/test/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java b/java-util/src/test/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java index a1c39813cacc..04cf1fe85a01 100644 --- a/java-util/src/test/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java @@ -23,7 +23,6 @@ import com.google.common.primitives.Ints; import io.druid.java.util.common.BufferUtils; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.guava.CloseQuietly; import junit.framework.Assert; import org.junit.Rule; import org.junit.Test; @@ -60,52 +59,44 @@ public void testSanity() throws Exception @Test public void testWhenFirstWriterClosedInTheMiddle() throws Exception { - File baseDir = Files.createTempDir(); - File[] files = baseDir.listFiles(); - Assert.assertNotNull(files); - Arrays.sort(files); + File baseDir = folder.newFolder("base"); - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); for (int i = 0; i < 19; ++i) { File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); Files.write(Ints.toByteArray(i), tmpFile); smoosher.add(String.format("%d", i), tmpFile); - if (i==10) - { + if (i == 10) { writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); - CloseQuietly.close(writer); + writer.close(); } tmpFile.delete(); - } + } } validateOutput(baseDir); } - @Test(expected= ISE.class) + @Test(expected = ISE.class) public void testExceptionForUnClosedFiles() throws Exception { - File baseDir = Files.createTempDir(); + File baseDir = folder.newFolder("base"); - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { for (int i = 0; i < 19; ++i) { final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4); writer.write(ByteBuffer.wrap(Ints.toByteArray(i))); } - smoosher.close(); - } + } } @Test public void testWhenFirstWriterClosedAtTheEnd() throws Exception { - File baseDir = Files.createTempDir(); + File baseDir = folder.newFolder("base"); - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); @@ -115,8 +106,7 @@ public void testWhenFirstWriterClosedAtTheEnd() throws Exception smoosher.add(String.format("%d", i), tmpFile); tmpFile.delete(); } - CloseQuietly.close(writer); - smoosher.close(); + writer.close(); } validateOutput(baseDir); } @@ -170,7 +160,8 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception boolean exceptionThrown = false; try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter("1", 2)) { writer.write(ByteBuffer.wrap(Ints.toByteArray(1))); - } catch (ISE e) { + } + catch (ISE e) { Assert.assertTrue(e.getMessage().contains("Liar!!!")); exceptionThrown = true; } From 40d39f0f2a9d5c0ddcf6370597059fd950cd1752 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Wed, 9 Nov 2016 11:08:39 -0800 Subject: [PATCH 2/2] Addressed code review requested changes. --- .../java/util/common/io/smoosh/FileSmoosher.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java index 0b9b8adf9d9c..7e2991e2d016 100644 --- a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java +++ b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java @@ -267,7 +267,7 @@ private void mergeWithSmoosher() throws IOException for (File file : fileToProcess) { add(file); if (!file.delete()) { - LOG.warn("Unable to delete file [%s]", file.getPath()); + LOG.warn("Unable to delete file [%s]", file); } } } @@ -294,7 +294,7 @@ private SmooshedWriter delegateSmooshedWriter(final String name, final long size private final FileOutputStream out = new FileOutputStream(tmpFile); private final GatheringByteChannel channel = out.getChannel(); private final Closer closer = Closer.create(); - ; + private int currOffset = 0; { @@ -370,18 +370,18 @@ public void close() throws IOException if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) { for (File file : completedFiles) { if (!file.delete()) { - LOG.warn("Unable to delete file [%s]", file.getPath()); + LOG.warn("Unable to delete file [%s]", file); } } for (File file : filesInProcess) { if (!file.delete()) { - LOG.warn("Unable to delete file [%s]", file.getPath()); + LOG.warn("Unable to delete file [%s]", file); } } - throw new ISE(String.format( - "%d writers needs to be closed before closing smoosher.", - filesInProcess.size() + completedFiles.size() - )); + throw new ISE( + "[%d] writers in progress and [%d] completed writers needs to be closed before closing smoosher.", + filesInProcess.size(), completedFiles.size() + ); } if (currOut != null) {