diff --git a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java index 753bddda1ea7..7e2991e2d016 100644 --- a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java +++ b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/FileSmoosher.java @@ -31,6 +31,7 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.MappedByteBufferHandler; +import io.druid.java.util.common.logger.Logger; import java.io.BufferedWriter; import java.io.Closeable; @@ -59,7 +60,9 @@ * various "chunk" files will be varying sizes and it is not possible to add a * file of size greater than Integer.MAX_VALUE. *

- * This class is not thread safe but allows writing multiple files even if main + * This class is not thread safe. + *

+ * This class allows writing multiple files even if main * smoosh file writer is open. If main smoosh file writer is already open, it * delegates the write into temporary file on the file system which is later * copied on to the main smoosh file and underlying temporary file will be @@ -69,6 +72,7 @@ public class FileSmoosher implements Closeable { private static final String FILE_EXTENSION = "smoosh"; private static final Joiner joiner = Joiner.on(","); + private static final Logger LOG = new Logger(FileSmoosher.class); private final File baseDir; private final int maxChunkSize; @@ -101,6 +105,16 @@ public FileSmoosher( Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value."); } + static File metaFile(File baseDir) + { + return new File(baseDir, String.format("meta.%s", FILE_EXTENSION)); + } + + static File makeChunkFile(File baseDir, int i) + { + return new File(baseDir, String.format("%05d.%s", i, FILE_EXTENSION)); + } + public Set getInternalFilenames() { return internalFiles.keySet(); @@ -155,8 +169,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size) // If current writer is in use then create a new SmooshedWriter which // writes into temporary file which is later merged into original // FileSmoosher. - if (writerCurrentlyInUse) - { + if (writerCurrentlyInUse) { return delegateSmooshedWriter(name, size); } @@ -251,10 +264,11 @@ private void mergeWithSmoosher() throws IOException // Get processed elements from the stack and write. List fileToProcess = new ArrayList<>(completedFiles); completedFiles = Lists.newArrayList(); - for (File file: fileToProcess) - { + for (File file : fileToProcess) { add(file); - file.delete(); + if (!file.delete()) { + LOG.warn("Unable to delete file [%s]", file); + } } } @@ -265,7 +279,9 @@ private void mergeWithSmoosher() throws IOException * * @param name fileName * @param size size of the file. + * * @return + * * @throws IOException */ private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException @@ -275,14 +291,17 @@ private SmooshedWriter delegateSmooshedWriter(final String name, final long size return new SmooshedWriter() { - private int currOffset = 0; private final FileOutputStream out = new FileOutputStream(tmpFile); - private final GatheringByteChannel channel = out.getChannel();; + private final GatheringByteChannel channel = out.getChannel(); private final Closer closer = Closer.create(); + + private int currOffset = 0; + { closer.register(out); closer.register(channel); } + @Override public void close() throws IOException { @@ -294,6 +313,7 @@ public void close() throws IOException mergeWithSmoosher(); } } + public int bytesLeft() { return (int) (size - currOffset); @@ -347,17 +367,21 @@ public boolean isOpen() public void close() throws IOException { //book keeping checks on created file. - if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) - { - for (File file: completedFiles) - { - file.delete(); + if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) { + for (File file : completedFiles) { + if (!file.delete()) { + LOG.warn("Unable to delete file [%s]", file); + } } - for (File file: filesInProcess) - { - file.delete(); + for (File file : filesInProcess) { + if (!file.delete()) { + LOG.warn("Unable to delete file [%s]", file); + } } - throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size() + completedFiles.size())); + throw new ISE( + "[%d] writers in progress and [%d] completed writers needs to be closed before closing smoosher.", + filesInProcess.size(), completedFiles.size() + ); } if (currOut != null) { @@ -393,16 +417,6 @@ private Outer getNewCurrOut() throws FileNotFoundException return new Outer(fileNum, new FileOutputStream(outFile), maxChunkSize); } - static File metaFile(File baseDir) - { - return new File(baseDir, String.format("meta.%s", FILE_EXTENSION)); - } - - static File makeChunkFile(File baseDir, int i) - { - return new File(baseDir, String.format("%05d.%s", i, FILE_EXTENSION)); - } - public static class Outer implements SmooshedWriter { private final int fileNum; diff --git a/java-util/src/test/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java b/java-util/src/test/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java index a1c39813cacc..04cf1fe85a01 100644 --- a/java-util/src/test/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java @@ -23,7 +23,6 @@ import com.google.common.primitives.Ints; import io.druid.java.util.common.BufferUtils; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.guava.CloseQuietly; import junit.framework.Assert; import org.junit.Rule; import org.junit.Test; @@ -60,52 +59,44 @@ public void testSanity() throws Exception @Test public void testWhenFirstWriterClosedInTheMiddle() throws Exception { - File baseDir = Files.createTempDir(); - File[] files = baseDir.listFiles(); - Assert.assertNotNull(files); - Arrays.sort(files); + File baseDir = folder.newFolder("base"); - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); for (int i = 0; i < 19; ++i) { File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); Files.write(Ints.toByteArray(i), tmpFile); smoosher.add(String.format("%d", i), tmpFile); - if (i==10) - { + if (i == 10) { writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); - CloseQuietly.close(writer); + writer.close(); } tmpFile.delete(); - } + } } validateOutput(baseDir); } - @Test(expected= ISE.class) + @Test(expected = ISE.class) public void testExceptionForUnClosedFiles() throws Exception { - File baseDir = Files.createTempDir(); + File baseDir = folder.newFolder("base"); - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { for (int i = 0; i < 19; ++i) { final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4); writer.write(ByteBuffer.wrap(Ints.toByteArray(i))); } - smoosher.close(); - } + } } @Test public void testWhenFirstWriterClosedAtTheEnd() throws Exception { - File baseDir = Files.createTempDir(); + File baseDir = folder.newFolder("base"); - try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) - { + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); @@ -115,8 +106,7 @@ public void testWhenFirstWriterClosedAtTheEnd() throws Exception smoosher.add(String.format("%d", i), tmpFile); tmpFile.delete(); } - CloseQuietly.close(writer); - smoosher.close(); + writer.close(); } validateOutput(baseDir); } @@ -170,7 +160,8 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception boolean exceptionThrown = false; try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter("1", 2)) { writer.write(ByteBuffer.wrap(Ints.toByteArray(1))); - } catch (ISE e) { + } + catch (ISE e) { Assert.assertTrue(e.getMessage().contains("Liar!!!")); exceptionThrown = true; }