Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions src/main/java/com/metamx/common/io/smoosh/FileSmoosher.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,25 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.primitives.Ints;
import com.metamx.common.FileUtils;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.MappedByteBufferHandler;

import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -161,7 +160,19 @@ public int write(ByteBuffer in) throws IOException
return verifySize(currOut.write(in));
}

private int verifySize(int bytesWrittenInChunk) throws IOException
@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
{
return verifySize(currOut.write(srcs, offset, length));
}

@Override
public long write(ByteBuffer[] srcs) throws IOException
{
return verifySize(currOut.write(srcs));
}

private int verifySize(long bytesWrittenInChunk) throws IOException
{
bytesWritten += bytesWrittenInChunk;

Expand All @@ -172,7 +183,7 @@ private int verifySize(int bytesWrittenInChunk) throws IOException
throw new ISE("Wrote[%,d] bytes for something of size[%,d]. Liar!!!", bytesWritten, size);
}

return bytesWrittenInChunk;
return Ints.checkedCast(bytesWrittenInChunk);
}

@Override
Expand Down Expand Up @@ -232,7 +243,7 @@ private Outer getNewCurrOut() throws FileNotFoundException
final int fileNum = outFiles.size();
File outFile = makeChunkFile(baseDir, fileNum);
outFiles.add(outFile);
return new Outer(fileNum, new BufferedOutputStream(new FileOutputStream(outFile)), maxChunkSize);
return new Outer(fileNum, new FileOutputStream(outFile), maxChunkSize);
}

static File metaFile(File baseDir)
Expand All @@ -248,17 +259,19 @@ static File makeChunkFile(File baseDir, int i)
public static class Outer implements SmooshedWriter
{
private final int fileNum;
private final OutputStream out;
private final int maxLength;
private final GatheringByteChannel channel;

private boolean open = true;
private final Closer closer = Closer.create();
private int currOffset = 0;

Outer(int fileNum, OutputStream out, int maxLength)
Outer(int fileNum, FileOutputStream output, int maxLength)
{
this.fileNum = fileNum;
this.out = out;
this.channel = output.getChannel();
this.maxLength = maxLength;
closer.register(output);
closer.register(channel);
}

public int getFileNum()
Expand All @@ -279,38 +292,48 @@ public int bytesLeft()
@Override
public int write(ByteBuffer buffer) throws IOException
{
WritableByteChannel channel = Channels.newChannel(out);
return addToOffset(channel.write(buffer));
}

@Override
public int write(InputStream in) throws IOException
{
return addToOffset(Ints.checkedCast(ByteStreams.copy(in, out)));
return addToOffset(ByteStreams.copy(Channels.newChannel(in), channel));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double buffering here. Channels.newChannel() returns a buffered channel, and then ByteStreams.copy() uses a buffer internally. Made google/guava#2559 about this but it's faster and easier to fix internally in the meantime

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've neglected this because this is not used in Druid. But seemed good this to be fixed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, could be a separate PR. Also found this idiom (ByteStream.copy(Channels.newChannel(), ...) is used in Druid in several places, so it's more a Druid's issue than java-util's

}

@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
{
return addToOffset(channel.write(srcs, offset, length));
}

@Override
public long write(ByteBuffer[] srcs) throws IOException
{
return addToOffset(channel.write(srcs));
}

public int addToOffset(int numBytesWritten)
public int addToOffset(long numBytesWritten)
{
if (numBytesWritten > bytesLeft()) {
throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do that.", numBytesWritten, bytesLeft());
}

currOffset += numBytesWritten;

return numBytesWritten;
return Ints.checkedCast(numBytesWritten);
}

@Override
public boolean isOpen()
{
return open;
return channel.isOpen();
}

@Override
public void close() throws IOException
{
open = false;
out.close();
closer.close();
}
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/metamx/common/io/smoosh/SmooshedWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.GatheringByteChannel;

/**
*/
public interface SmooshedWriter extends Closeable, WritableByteChannel
public interface SmooshedWriter extends Closeable, GatheringByteChannel
{
public int write(InputStream in) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void testSanity() throws Exception
}

File[] files = baseDir.listFiles();
Assert.assertNotNull(files);
Arrays.sort(files);

Assert.assertEquals(5, files.length); // 4 smooshed files and 1 meta file
Expand Down Expand Up @@ -91,6 +92,7 @@ public void testBehaviorWhenReportedSizesLargeAndExceptionIgnored() throws Excep
}

File[] files = baseDir.listFiles();
Assert.assertNotNull(files);
Arrays.sort(files);

Assert.assertEquals(6, files.length); // 4 smoosh files and 1 meta file
Expand Down Expand Up @@ -126,8 +128,8 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception

Assert.assertTrue(exceptionThrown);
File[] files = baseDir.listFiles();
Assert.assertNotNull(files);
Assert.assertEquals(1, files.length);
Assert.assertEquals(0, files[0].length());
}
}

Expand Down