diff --git a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java
index 5ab1ffd0..0f5159d7 100644
--- a/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java
+++ b/src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java
@@ -40,6 +40,7 @@
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
+import java.util.ArrayList;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;
import java.util.List;
@@ -47,11 +48,19 @@
import java.util.Set;
/**
- * A class that concatenates files together into configurable sized chunks, works in conjunction
- * with the SmooshedFileMapper to provide access to the individual files.
+ * A class that concatenates files together into configurable sized chunks,
+ * works in conjunction with the SmooshedFileMapper to provide access to the
+ * individual files.
*
- * It does not split input files among separate output files, instead the various "chunk" files will
- * be varying sizes and it is not possible to add a file of size greater than Integer.MAX_VALUE
+ * It does not split input files among separate output files, instead the
+ * various "chunk" files will be varying sizes and it is not possible to add a
+ * file of size greater than Integer.MAX_VALUE.
+ *
+ * This class is not thread safe but allows writing multiple files even if main
+ * smoosh file writer is open. If main smoosh file writer is already open, it
+ * delegates the write into temporary file on the file system which is later
+ * copied on to the main smoosh file and underlying temporary file will be
+ * cleaned up.
*/
public class FileSmoosher implements Closeable
{
@@ -63,8 +72,13 @@ public class FileSmoosher implements Closeable
private final List outFiles = Lists.newArrayList();
private final Map internalFiles = Maps.newTreeMap();
+ // list of files completed writing content using delegated smooshedWriter.
+ private List completedFiles = Lists.newArrayList();
+ // list of files in process writing content using delegated smooshedWriter.
+ private List filesInProcess = Lists.newArrayList();
private Outer currOut = null;
+ private boolean writerCurrentlyInUse = false;
public FileSmoosher(
File baseDir
@@ -130,9 +144,19 @@ public void add(String name, List bufferToAdd) throws IOException
public SmooshedWriter addWithSmooshedWriter(final String name, final long size) throws IOException
{
+
if (size > maxChunkSize) {
throw new IAE("Asked to add buffers[%,d] larger than configured max[%,d]", size, maxChunkSize);
}
+
+ // If current writer is in use then create a new SmooshedWriter which
+ // writes into temporary file which is later merged into original
+ // FileSmoosher.
+ if (writerCurrentlyInUse)
+ {
+ return delegateSmooshedWriter(name, size);
+ }
+
if (currOut == null) {
currOut = getNewCurrOut();
}
@@ -142,7 +166,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size)
}
final int startOffset = currOut.getCurrOffset();
-
+ writerCurrentlyInUse = true;
return new SmooshedWriter()
{
private boolean open = true;
@@ -197,6 +221,7 @@ public void close() throws IOException
{
open = false;
internalFiles.put(name, new Metadata(currOut.getFileNum(), startOffset, currOut.getCurrOffset()));
+ writerCurrentlyInUse = false;
if (bytesWritten != currOut.getCurrOffset() - startOffset) {
throw new ISE("WTF? Perhaps there is some concurrent modification going on?");
@@ -206,13 +231,132 @@ public void close() throws IOException
String.format("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten)
);
}
+ // Merge temporary files on to the main smoosh file.
+ mergeWithSmoosher();
+ }
+ };
+ }
+
+ /**
+ * Merges temporary files created by delegated SmooshedWriters on to the main
+ * smoosh file.
+ *
+ * @throws IOException
+ */
+ private void mergeWithSmoosher() throws IOException
+ {
+ // Get processed elements from the stack and write.
+ List fileToProcess = new ArrayList<>(completedFiles);
+ completedFiles = Lists.newArrayList();
+ for (File file: fileToProcess)
+ {
+ add(file);
+ file.delete();
+ }
+ }
+
+ /**
+ * Returns a new SmooshedWriter which writes into temporary file and close
+ * method on returned SmooshedWriter tries to merge temporary file into
+ * original FileSmoosher object(if not open).
+ *
+ * @param name fileName
+ * @param size size of the file.
+ * @return
+ * @throws IOException
+ */
+ private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException
+ {
+ final File tmpFile = new File(baseDir, name);
+ filesInProcess.add(tmpFile);
+
+ return new SmooshedWriter()
+ {
+ private int currOffset = 0;
+ private final FileOutputStream out = new FileOutputStream(tmpFile);
+ private final GatheringByteChannel channel = out.getChannel();;
+ private final Closer closer = Closer.create();
+ {
+ closer.register(out);
+ closer.register(channel);
+ }
+ @Override
+ public void close() throws IOException
+ {
+ closer.close();
+ completedFiles.add(tmpFile);
+ filesInProcess.remove(tmpFile);
+
+ if (!writerCurrentlyInUse) {
+ mergeWithSmoosher();
+ }
+ }
+ public int bytesLeft()
+ {
+ return (int) (size - currOffset);
+ }
+
+ @Override
+ public int write(ByteBuffer buffer) throws IOException
+ {
+ return addToOffset(channel.write(buffer));
+ }
+
+ @Override
+ public int write(InputStream in) throws IOException
+ {
+ return addToOffset(ByteStreams.copy(Channels.newChannel(in), channel));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
+ {
+ return addToOffset(channel.write(srcs, offset, length));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs) throws IOException
+ {
+ return addToOffset(channel.write(srcs));
+ }
+
+ public int addToOffset(long numBytesWritten)
+ {
+ if (numBytesWritten > bytesLeft()) {
+ throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft());
+ }
+ currOffset += numBytesWritten;
+
+ return Ints.checkedCast(numBytesWritten);
}
+
+ @Override
+ public boolean isOpen()
+ {
+ return channel.isOpen();
+ }
+
};
+
}
@Override
public void close() throws IOException
{
+ //book keeping checks on created file.
+ if (!completedFiles.isEmpty() || !filesInProcess.isEmpty())
+ {
+ for (File file: completedFiles)
+ {
+ file.delete();
+ }
+ for (File file: filesInProcess)
+ {
+ file.delete();
+ }
+ throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size() + completedFiles.size()));
+ }
+
if (currOut != null) {
currOut.close();
}
@@ -318,7 +462,6 @@ public int addToOffset(long numBytesWritten)
if (numBytesWritten > bytesLeft()) {
throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft());
}
-
currOffset += numBytesWritten;
return Ints.checkedCast(numBytesWritten);
diff --git a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java
index 74f87731..033a7461 100644
--- a/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java
+++ b/src/test/java/com/metamx/common/io/smoosh/SmooshedFileMapperTest.java
@@ -20,6 +20,8 @@
import com.google.common.primitives.Ints;
import com.metamx.common.BufferUtils;
import com.metamx.common.ISE;
+import com.metamx.common.guava.CloseQuietly;
+
import junit.framework.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -50,26 +52,71 @@ public void testSanity() throws Exception
smoosher.add(String.format("%d", i), tmpFile);
}
}
+ validateOutput(baseDir);
+ }
+ @Test
+ public void testWhenFirstWriterClosedInTheMiddle() throws Exception
+ {
+ File baseDir = Files.createTempDir();
File[] files = baseDir.listFiles();
Assert.assertNotNull(files);
Arrays.sort(files);
- Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file
- for (int i = 0; i < 4; ++i) {
- Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]);
+ try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
+ {
+ final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4);
+
+ for (int i = 0; i < 19; ++i) {
+ File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin");
+ Files.write(Ints.toByteArray(i), tmpFile);
+ smoosher.add(String.format("%d", i), tmpFile);
+ if (i==10)
+ {
+ writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));
+ CloseQuietly.close(writer);
+ }
+ tmpFile.delete();
+ }
}
- Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]);
+ validateOutput(baseDir);
+ }
- try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) {
- for (int i = 0; i < 20; ++i) {
- ByteBuffer buf = mapper.mapFile(String.format("%d", i));
- Assert.assertEquals(0, buf.position());
- Assert.assertEquals(4, buf.remaining());
- Assert.assertEquals(4, buf.capacity());
- Assert.assertEquals(i, buf.getInt());
+ @Test(expected= ISE.class)
+ public void testExceptionForUnClosedFiles() throws Exception
+ {
+ File baseDir = Files.createTempDir();
+
+ try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
+ {
+ for (int i = 0; i < 19; ++i) {
+ final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4);
+ writer.write(ByteBuffer.wrap(Ints.toByteArray(i)));
}
+ smoosher.close();
+ }
+ }
+
+ @Test
+ public void testWhenFirstWriterClosedAtTheEnd() throws Exception
+ {
+ File baseDir = Files.createTempDir();
+
+ try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
+ {
+ final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4);
+ writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));
+
+ for (int i = 0; i < 19; ++i) {
+ File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin");
+ Files.write(Ints.toByteArray(i), tmpFile);
+ smoosher.add(String.format("%d", i), tmpFile);
+ tmpFile.delete();
+ }
+ CloseQuietly.close(writer);
+ smoosher.close();
}
+ validateOutput(baseDir);
}
@Test
@@ -150,4 +197,26 @@ public void testDeterministicFileUnmapping() throws IOException
// Assert no hanging file mappings left by either smoosher or smoosher.add(file)
Assert.assertEquals(totalMemoryUsedBeforeAddingFile, totalMemoryUsedAfterAddingFile);
}
-}
+
+ private void validateOutput(File baseDir) throws IOException
+ {
+ File[] files = baseDir.listFiles();
+ Arrays.sort(files);
+
+ Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file
+ for (int i = 0; i < 4; ++i) {
+ Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]);
+ }
+ Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]);
+
+ try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) {
+ for (int i = 0; i < 20; ++i) {
+ ByteBuffer buf = mapper.mapFile(String.format("%d", i));
+ Assert.assertEquals(0, buf.position());
+ Assert.assertEquals(4, buf.remaining());
+ Assert.assertEquals(4, buf.capacity());
+ Assert.assertEquals(i, buf.getInt());
+ }
+ }
+ }
+}
\ No newline at end of file