Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.MappedByteBufferHandler;
import io.druid.java.util.common.logger.Logger;

import java.io.BufferedWriter;
import java.io.Closeable;
Expand Down Expand Up @@ -59,7 +60,9 @@
* various "chunk" files will be varying sizes and it is not possible to add a
* file of size greater than Integer.MAX_VALUE.
* <p/>
* This class is not thread safe but allows writing multiple files even if main
* This class is not thread safe.
* <p/>
* This class allows writing multiple files even if main
* smoosh file writer is open. If main smoosh file writer is already open, it
* delegates the write into temporary file on the file system which is later
* copied on to the main smoosh file and underlying temporary file will be
Expand All @@ -69,6 +72,7 @@ public class FileSmoosher implements Closeable
{
private static final String FILE_EXTENSION = "smoosh";
private static final Joiner joiner = Joiner.on(",");
private static final Logger LOG = new Logger(FileSmoosher.class);

private final File baseDir;
private final int maxChunkSize;
Expand Down Expand Up @@ -101,6 +105,16 @@ public FileSmoosher(
Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value.");
}

static File metaFile(File baseDir)
{
return new File(baseDir, String.format("meta.%s", FILE_EXTENSION));
}

static File makeChunkFile(File baseDir, int i)
{
return new File(baseDir, String.format("%05d.%s", i, FILE_EXTENSION));
}

public Set<String> getInternalFilenames()
{
return internalFiles.keySet();
Expand Down Expand Up @@ -155,8 +169,7 @@ public SmooshedWriter addWithSmooshedWriter(final String name, final long size)
// If current writer is in use then create a new SmooshedWriter which
// writes into temporary file which is later merged into original
// FileSmoosher.
if (writerCurrentlyInUse)
{
if (writerCurrentlyInUse) {
return delegateSmooshedWriter(name, size);
}

Expand Down Expand Up @@ -251,10 +264,11 @@ private void mergeWithSmoosher() throws IOException
// Get processed elements from the stack and write.
List<File> fileToProcess = new ArrayList<>(completedFiles);
completedFiles = Lists.newArrayList();
for (File file: fileToProcess)
{
for (File file : fileToProcess) {
add(file);
file.delete();
if (!file.delete()) {
LOG.warn("Unable to delete file [%s]", file);
}
}
}

Expand All @@ -265,7 +279,9 @@ private void mergeWithSmoosher() throws IOException
*
* @param name fileName
* @param size size of the file.
*
* @return
*
* @throws IOException
*/
private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException
Expand All @@ -275,14 +291,17 @@ private SmooshedWriter delegateSmooshedWriter(final String name, final long size

return new SmooshedWriter()
{
private int currOffset = 0;
private final FileOutputStream out = new FileOutputStream(tmpFile);
private final GatheringByteChannel channel = out.getChannel();;
private final GatheringByteChannel channel = out.getChannel();
private final Closer closer = Closer.create();

private int currOffset = 0;

{
closer.register(out);
closer.register(channel);
}

@Override
public void close() throws IOException
{
Expand All @@ -294,6 +313,7 @@ public void close() throws IOException
mergeWithSmoosher();
}
}

public int bytesLeft()
{
return (int) (size - currOffset);
Expand Down Expand Up @@ -347,17 +367,21 @@ public boolean isOpen()
public void close() throws IOException
{
//book keeping checks on created file.
if (!completedFiles.isEmpty() || !filesInProcess.isEmpty())
{
for (File file: completedFiles)
{
file.delete();
if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) {
for (File file : completedFiles) {
if (!file.delete()) {
LOG.warn("Unable to delete file [%s]", file);
}
}
for (File file: filesInProcess)
{
file.delete();
for (File file : filesInProcess) {
if (!file.delete()) {
LOG.warn("Unable to delete file [%s]", file);
}
}
throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size() + completedFiles.size()));
throw new ISE(
"[%d] writers in progress and [%d] completed writers needs to be closed before closing smoosher.",
filesInProcess.size(), completedFiles.size()
);
}

if (currOut != null) {
Expand Down Expand Up @@ -393,16 +417,6 @@ private Outer getNewCurrOut() throws FileNotFoundException
return new Outer(fileNum, new FileOutputStream(outFile), maxChunkSize);
}

static File metaFile(File baseDir)
{
return new File(baseDir, String.format("meta.%s", FILE_EXTENSION));
}

static File makeChunkFile(File baseDir, int i)
{
return new File(baseDir, String.format("%05d.%s", i, FILE_EXTENSION));
}

public static class Outer implements SmooshedWriter
{
private final int fileNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.primitives.Ints;
import io.druid.java.util.common.BufferUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.CloseQuietly;
import junit.framework.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -60,52 +59,44 @@ public void testSanity() throws Exception
@Test
public void testWhenFirstWriterClosedInTheMiddle() throws Exception
{
File baseDir = Files.createTempDir();
File[] files = baseDir.listFiles();
Assert.assertNotNull(files);
Arrays.sort(files);
File baseDir = folder.newFolder("base");

try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
{
try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) {
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4);

for (int i = 0; i < 19; ++i) {
File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin");
Files.write(Ints.toByteArray(i), tmpFile);
smoosher.add(String.format("%d", i), tmpFile);
if (i==10)
{
if (i == 10) {
writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));
CloseQuietly.close(writer);
writer.close();
}
tmpFile.delete();
}
}
}
validateOutput(baseDir);
}

@Test(expected= ISE.class)
@Test(expected = ISE.class)
public void testExceptionForUnClosedFiles() throws Exception
{
File baseDir = Files.createTempDir();
File baseDir = folder.newFolder("base");

try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
{
try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) {
for (int i = 0; i < 19; ++i) {
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4);
writer.write(ByteBuffer.wrap(Ints.toByteArray(i)));
}
smoosher.close();
}
}
}

@Test
public void testWhenFirstWriterClosedAtTheEnd() throws Exception
{
File baseDir = Files.createTempDir();
File baseDir = folder.newFolder("base");

try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21))
{
try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) {
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4);
writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));

Expand All @@ -115,8 +106,7 @@ public void testWhenFirstWriterClosedAtTheEnd() throws Exception
smoosher.add(String.format("%d", i), tmpFile);
tmpFile.delete();
}
CloseQuietly.close(writer);
smoosher.close();
writer.close();
}
validateOutput(baseDir);
}
Expand Down Expand Up @@ -170,7 +160,8 @@ public void testBehaviorWhenReportedSizesSmall() throws Exception
boolean exceptionThrown = false;
try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter("1", 2)) {
writer.write(ByteBuffer.wrap(Ints.toByteArray(1)));
} catch (ISE e) {
}
catch (ISE e) {
Assert.assertTrue(e.getMessage().contains("Liar!!!"));
exceptionThrown = true;
}
Expand Down