Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.CompressionUtils;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
Expand All @@ -31,12 +33,18 @@
import io.druid.data.input.impl.FileIteratingFirehose;
import io.druid.data.input.impl.StringInputRowParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -45,93 +53,122 @@
*/
public class LocalFirehoseFactory implements FirehoseFactory<StringInputRowParser>
{
// Logger used by connect() for its progress messages.
private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class);

// Root directory that connect() searches, including sub-directories; connect() rejects null.
private final File baseDir;
// Wildcard pattern handed to WildcardFileFilter in connect() to select the files to read.
private final String filter;
// Parser received through the JSON constructor ("Backwards compatible"); connect() does not
// read this field, it uses the parser passed to it as an argument.
private final StringInputRowParser parser;

/**
 * Builds the factory from its JSON properties. The values are stored as given;
 * no validation happens here (a null {@code baseDir} is rejected later, in
 * {@code connect}).
 *
 * @param baseDir directory to search for input files
 * @param filter  wildcard pattern used to select files
 * @param parser  parser accepted for backwards compatibility
 */
@JsonCreator
public LocalFirehoseFactory(
    @JsonProperty("baseDir") File baseDir,
    @JsonProperty("filter") String filter,
    // Backwards compatible
    @JsonProperty("parser") StringInputRowParser parser
)
{
  this.parser = parser;
  this.filter = filter;
  this.baseDir = baseDir;
}

/**
 * @return the base directory this factory was constructed with (may be null)
 */
@JsonProperty
public File getBaseDir()
{
  return this.baseDir;
}

/**
 * @return the wildcard file filter this factory was constructed with
 */
@JsonProperty
public String getFilter()
{
  return this.filter;
}

/**
 * @return the parser this factory was constructed with
 */
@JsonProperty
public StringInputRowParser getParser()
{
  return this.parser;
}

/**
 * Finds every file in and beneath {@code baseDir} whose name matches
 * {@code filter} and returns a firehose that reads those files line by line.
 *
 * @param firehoseParser parser handed to the returned {@link FileIteratingFirehose}
 * @return a firehose iterating over the lines of all matching files
 * @throws IAE if {@code baseDir} or {@code filter} is null
 * @throws ISE if no file matches
 * @throws IOException declared by the signature; failures while opening an
 *                     individual file surface later, from the iterator, as
 *                     unchecked exceptions via {@code Throwables.propagate}
 */
@Override
public Firehose connect(StringInputRowParser firehoseParser) throws IOException
{
  if (baseDir == null) {
    throw new IAE("baseDir is null");
  }
  // Fail with a message naming the property instead of letting WildcardFileFilter
  // reject the null pattern on its own.
  if (filter == null) {
    throw new IAE("filter is null");
  }
  log.info("Searching for all [%s] in and beneath [%s]", filter, baseDir.getAbsoluteFile());

  // TrueFileFilter as the directory filter makes the search descend into every sub-directory.
  Collection<File> foundFiles = FileUtils.listFiles(
      baseDir.getAbsoluteFile(),
      new WildcardFileFilter(filter),
      TrueFileFilter.INSTANCE
  );

  if (foundFiles == null || foundFiles.isEmpty()) {
    throw new ISE("Found no files to ingest! Check your schema.");
  }
  // Pass the collection as an argument so file names are never interpreted as a format string.
  log.info("Found files: %s", foundFiles);

  final LinkedList<File> files = Lists.newLinkedList(foundFiles);

  return new FileIteratingFirehose(
      new Iterator<LineIterator>()
      {
        @Override
        public boolean hasNext()
        {
          return !files.isEmpty();
        }

        @Override
        public LineIterator next()
        {
          final File file = files.poll();
          if (file == null) {
            // Honour the Iterator contract instead of passing null on to lineIterator.
            throw new java.util.NoSuchElementException();
          }
          try {
            // Explicit encoding: the single-argument overload uses the JVM default charset,
            // which makes ingestion results depend on the host configuration.
            return FileUtils.lineIterator(file, "UTF-8");
          }
          catch (Exception e) {
            throw Throwables.propagate(e);
          }
        }

        @Override
        public void remove()
        {
          throw new UnsupportedOperationException();
        }
      },
      firehoseParser
  );
}
// Logger used by connect() for progress messages and for the warning emitted when a file cannot be read.
private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears you changed spaces to tabs. Druid uses 2 spaces for indentation. Please use the appropriate formatter, as mentioned in https://github.com/druid-io/druid/blob/master/CONTRIBUTING.md


// Root directory that connect() searches, including sub-directories; connect() rejects null.
private final File baseDir;
// Wildcard pattern handed to WildcardFileFilter in connect() to select the files to read.
private final String filter;
// Parser received through the JSON constructor ("Backwards compatible"); connect() does not
// read this field, it uses the parser passed to it as an argument.
private final StringInputRowParser parser;

/**
 * JSON-driven constructor. Each argument is stored unchanged; validation of
 * {@code baseDir} is deferred to {@code connect}.
 *
 * @param baseDir directory to search for input files
 * @param filter  wildcard pattern used to select files
 * @param parser  parser accepted for backwards compatibility
 */
@JsonCreator
public LocalFirehoseFactory(
    @JsonProperty("baseDir") File baseDir,
    @JsonProperty("filter") String filter,
    // Backwards compatible
    @JsonProperty("parser") StringInputRowParser parser
)
{
  this.parser = parser;
  this.filter = filter;
  this.baseDir = baseDir;
}

/**
 * @return the configured base directory, exactly as passed to the constructor
 */
@JsonProperty
public File getBaseDir()
{
  return this.baseDir;
}

/**
 * @return the configured wildcard file filter, exactly as passed to the constructor
 */
@JsonProperty
public String getFilter()
{
  return this.filter;
}

/**
 * @return the configured parser, exactly as passed to the constructor
 */
@JsonProperty
public StringInputRowParser getParser()
{
  return this.parser;
}

@Override
public Firehose connect(StringInputRowParser firehoseParser) throws IOException
{
if (baseDir == null)
{
throw new IAE("baseDir is null");
}
log.info("Searching for all [%s] in and beneath [%s]", filter, baseDir.getAbsoluteFile());

Collection<File> foundFiles = FileUtils.listFiles(
baseDir.getAbsoluteFile(),
new WildcardFileFilter(filter),
TrueFileFilter.INSTANCE);

if (foundFiles == null || foundFiles.isEmpty())
{
throw new ISE("Found no files to ingest! Check your schema.");
}
log.info("Found files: " + foundFiles);

final LinkedList<File> files = Lists.newLinkedList(
foundFiles);

return new FileIteratingFirehose(
new Iterator<LineIterator>()
{
@Override
public boolean hasNext()
{
return !files.isEmpty();
}

@Override
public LineIterator next()
{
final File f = files.poll();
InputStream rawInputStream = null;
try
{
rawInputStream = new FileInputStream(f);
final InputStream inputStream;
String logMessage;
if (CompressionUtils.isGz(f.getName()))
{
logMessage = "Reading gzipped file [%s]";
inputStream = CompressionUtils.gzipInputStream(rawInputStream);
} else
{
logMessage = "Reading file [%s]";
inputStream = rawInputStream;
}

log.info(logMessage, f.getName());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just put the log.info in both places above instead?


return IOUtils.lineIterator(
new BufferedReader(
new InputStreamReader(inputStream, Charsets.UTF_8)));
} catch (Exception e)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do any of the checked exceptions include InterruptedException? If so it should be handled in a way that does not loose the interrupted status

{
log.warn(e, "Failed to read file [%s]", f.getName());
if (rawInputStream != null)
{
try
{
rawInputStream.close();
} catch (IOException ioe)
{
Throwables.propagate(ioe);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest e.addSuppressed(ioe); throw Throwables.propagate(e);

}
}
throw Throwables.propagate(e);
}
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}
},
firehoseParser);
}
}