Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 149 additions & 6 deletions src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,27 @@
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* A class that concatenates files together into configurable sized chunks, works in conjunction
* with the SmooshedFileMapper to provide access to the individual files.
* A class that concatenates files together into configurable sized chunks,
* works in conjunction with the SmooshedFileMapper to provide access to the
* individual files.
* <p/>
* It does not split input files among separate output files, instead the various "chunk" files will
* be varying sizes and it is not possible to add a file of size greater than Integer.MAX_VALUE
* It does not split input files among separate output files, instead the
* various "chunk" files will be varying sizes and it is not possible to add a
* file of size greater than Integer.MAX_VALUE.
* <p/>
* This class is not thread safe but allows writing multiple files even if main
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change to a straight forward declaration like This class is no thread safe on its own line.

You can describe the handling of multiple files in another paragraph but such description should in no way mangled with explanations of thread safett.

* smoosh file writer is open. If main smoosh file writer is already open, it
* delegates the write into temporary file on the file system which is later
* copied on to the main smoosh file and underlying temporary file will be
* cleaned up.
*/
public class FileSmoosher implements Closeable
{
Expand All @@ -63,8 +72,13 @@ public class FileSmoosher implements Closeable

private final List<File> outFiles = Lists.newArrayList();
private final Map<String, Metadata> internalFiles = Maps.newTreeMap();
// list of files completed writing content using delegated smooshedWriter.
Copy link
Copy Markdown
Contributor

@leventov leventov Oct 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to write such comments as Javadoc /** ... */, then on automatic refactorings and reformattings IDEs will move comments along with fields. Javadoc comments are allowed for private fields.

private List<File> completedFiles = Lists.newArrayList();
// list of files in process writing content using delegated smooshedWriter.
private List<File> filesInProcess = Lists.newArrayList();

private Outer currOut = null;
private boolean writerCurrentlyInUse = false;

public FileSmoosher(
File baseDir
Expand Down Expand Up @@ -130,9 +144,19 @@ public void add(String name, List<ByteBuffer> bufferToAdd) throws IOException

public SmooshedWriter addWithSmooshedWriter(final String name, final long size) throws IOException
{

if (size > maxChunkSize) {
throw new IAE("Asked to add buffers[%,d] larger than configured max[%,d]", size, maxChunkSize);
}

// If current writer is in use then create a new SmooshedWriter which
// writes into temporary file which is later merged into original
// FileSmoosher.
if (writerCurrentlyInUse)
{
return delegateSmooshedWriter(name, size);
}

if (currOut == null) {
currOut = getNewCurrOut();
}
Expand All @@ -142,7 +166,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size)
}

final int startOffset = currOut.getCurrOffset();

writerCurrentlyInUse = true;
return new SmooshedWriter()
{
private boolean open = true;
Expand Down Expand Up @@ -197,6 +221,7 @@ public void close() throws IOException
{
open = false;
internalFiles.put(name, new Metadata(currOut.getFileNum(), startOffset, currOut.getCurrOffset()));
writerCurrentlyInUse = false;

if (bytesWritten != currOut.getCurrOffset() - startOffset) {
throw new ISE("WTF? Perhaps there is some concurrent modification going on?");
Expand All @@ -206,13 +231,132 @@ public void close() throws IOException
String.format("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten)
);
}
// Merge temporary files on to the main smoosh file.
mergeWithSmoosher();
}
};
}

/**
* Merges temporary files created by delegated SmooshedWriters on to the main
* smoosh file.
*
* @throws IOException
*/
private void mergeWithSmoosher() throws IOException
{
// Get processed elements from the stack and write.
List<File> fileToProcess = new ArrayList<>(completedFiles);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it can be just iteration over completedFiles (currently lines 251-256), then completedFiles.clear().

completedFiles = Lists.newArrayList();
for (File file: fileToProcess)
{
add(file);
file.delete();
}
}

/**
* Returns a new SmooshedWriter which writes into temporary file and close
* method on returned SmooshedWriter tries to merge temporary file into
* original FileSmoosher object(if not open).
*
* @param name fileName
* @param size size of the file.
* @return
* @throws IOException
*/
private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException
{
final File tmpFile = new File(baseDir, name);
filesInProcess.add(tmpFile);

return new SmooshedWriter()
{
private int currOffset = 0;
private final FileOutputStream out = new FileOutputStream(tmpFile);
private final GatheringByteChannel channel = out.getChannel();;
private final Closer closer = Closer.create();
{
closer.register(out);
closer.register(channel);
}
@Override
public void close() throws IOException
{
closer.close();
completedFiles.add(tmpFile);
filesInProcess.remove(tmpFile);

if (!writerCurrentlyInUse) {
mergeWithSmoosher();
}
}
public int bytesLeft()
{
return (int) (size - currOffset);
}

@Override
public int write(ByteBuffer buffer) throws IOException
{
return addToOffset(channel.write(buffer));
}

@Override
public int write(InputStream in) throws IOException
{
return addToOffset(ByteStreams.copy(Channels.newChannel(in), channel));
}

@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
{
return addToOffset(channel.write(srcs, offset, length));
}

@Override
public long write(ByteBuffer[] srcs) throws IOException
{
return addToOffset(channel.write(srcs));
}

public int addToOffset(long numBytesWritten)
{
if (numBytesWritten > bytesLeft()) {
throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft());
}
currOffset += numBytesWritten;

return Ints.checkedCast(numBytesWritten);
}

@Override
public boolean isOpen()
{
return channel.isOpen();
}

};

}

@Override
public void close() throws IOException
{
//book keeping checks on created file.
if (!completedFiles.isEmpty() || !filesInProcess.isEmpty())
{
for (File file: completedFiles)
{
file.delete();
Copy link
Copy Markdown
Contributor

@leventov leventov Oct 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe merge completedFiles here using mergeWithSmoosher(), rather than silently delete?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entry into the if statement bounding these for loops causes an exception to be thrown, the deletion is to clean up state before that exception is thrown.

}
for (File file: filesInProcess)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe throw some exception here, if filesToProcess is not empty? Because presence of files in filesInProcess means somebody opened a smoosherWriter but didn't close it before closing FileSmoosher. It seems like a bug.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an exception thrown just as soon as this for loop exits for exactly that reason. It's a bug if any files haven't yet been merged by the time the larger Smoosher gets closed (each of the individual Writers should have been closed and those should have caused everything to get merged).

{
file.delete();
}
throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size() + completedFiles.size()));
}

if (currOut != null) {
currOut.close();
}
Expand Down Expand Up @@ -318,7 +462,6 @@ public int addToOffset(long numBytesWritten)
if (numBytesWritten > bytesLeft()) {
throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft());
}

currOffset += numBytesWritten;

return Ints.checkedCast(numBytesWritten);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.primitives.Ints;
import com.metamx.common.BufferUtils;
import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;

import junit.framework.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -50,26 +52,71 @@ public void testSanity() throws Exception
smoosher.add(String.format("%d", i), tmpFile);
}
}
validateOutput(baseDir);
}

@Test
public void testWhenFirstWriterClosedInTheMiddle() throws Exception
{
File baseDir = Files.createTempDir();
File[] files = baseDir.listFiles();
Assert.assertNotNull(files);
Arrays.sort(files);

Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file
for (int i = 0; i < 4; ++i) {
Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]);
try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
{
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4);

for (int i = 0; i < 19; ++i) {
File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin");
Files.write(Ints.toByteArray(i), tmpFile);
smoosher.add(String.format("%d", i), tmpFile);
if (i==10)
{
writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));
CloseQuietly.close(writer);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why quietly?

}
tmpFile.delete();
}
}
Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]);
validateOutput(baseDir);
}

try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) {
for (int i = 0; i < 20; ++i) {
ByteBuffer buf = mapper.mapFile(String.format("%d", i));
Assert.assertEquals(0, buf.position());
Assert.assertEquals(4, buf.remaining());
Assert.assertEquals(4, buf.capacity());
Assert.assertEquals(i, buf.getInt());
@Test(expected= ISE.class)
public void testExceptionForUnClosedFiles() throws Exception
{
File baseDir = Files.createTempDir();

try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
{
for (int i = 0; i < 19; ++i) {
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4);
writer.write(ByteBuffer.wrap(Ints.toByteArray(i)));
}
smoosher.close();
}
}

@Test
public void testWhenFirstWriterClosedAtTheEnd() throws Exception
{
File baseDir = Files.createTempDir();

try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
{
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4);
writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));

for (int i = 0; i < 19; ++i) {
File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin");
Files.write(Ints.toByteArray(i), tmpFile);
smoosher.add(String.format("%d", i), tmpFile);
tmpFile.delete();
}
CloseQuietly.close(writer);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why quietly?

smoosher.close();
}
validateOutput(baseDir);
}

@Test
Expand Down Expand Up @@ -150,4 +197,26 @@ public void testDeterministicFileUnmapping() throws IOException
// Assert no hanging file mappings left by either smoosher or smoosher.add(file)
Assert.assertEquals(totalMemoryUsedBeforeAddingFile, totalMemoryUsedAfterAddingFile);
}
}

private void validateOutput(File baseDir) throws IOException
{
File[] files = baseDir.listFiles();
Arrays.sort(files);

Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file
for (int i = 0; i < 4; ++i) {
Assert.assertEquals(FileSmoosher.makeChunkFile(baseDir, i), files[i]);
}
Assert.assertEquals(FileSmoosher.metaFile(baseDir), files[files.length - 1]);

try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) {
for (int i = 0; i < 20; ++i) {
ByteBuffer buf = mapper.mapFile(String.format("%d", i));
Assert.assertEquals(0, buf.position());
Assert.assertEquals(4, buf.remaining());
Assert.assertEquals(4, buf.capacity());
Assert.assertEquals(i, buf.getInt());
}
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing newline