Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d203cac
Add PrefetcheableTextFilesFirehoseFactory
jihoonson Apr 19, 2017
3a45119
fix comment
jihoonson Apr 21, 2017
bbdba21
exception handling
jihoonson Apr 23, 2017
4e7ab6a
Fix wrong json property
jihoonson Apr 25, 2017
cb65cde
Remove ReplayableFirehoseFactory and fix misspelling
jihoonson Apr 25, 2017
aa5f28d
Defer object initialization
jihoonson Apr 26, 2017
7ea1304
Add a temporaryDirectory parameter to FirehoseFactory.connect()
jihoonson Apr 26, 2017
19662b3
fix when cache and fetch are disabled
jihoonson May 3, 2017
0c97820
Address comments
jihoonson May 4, 2017
2e8b6b0
Add more test
jihoonson May 4, 2017
f6bbb56
Increase timeout for test
jihoonson May 4, 2017
f598515
Add wrapObjectStream
jihoonson May 5, 2017
c5bd32d
Move methods to Firehose from PrefetchableFirehoseFactory
jihoonson May 5, 2017
d47abdd
Cleanup comment
jihoonson May 5, 2017
65892de
add directory listing to s3 firehose
jihoonson May 5, 2017
185d6e9
Rename a variable
jihoonson May 7, 2017
3dc991d
Addressing comments
jihoonson May 10, 2017
a8d9af3
Update document
jihoonson May 10, 2017
02d22c9
Support disabling prefetch
jihoonson May 10, 2017
4611652
Fix race condition
jihoonson May 11, 2017
2e172ef
Add fetchLock
jihoonson May 11, 2017
808f681
Merge branch 'master' of https://github.com/druid-io/druid into abstr…
jihoonson May 12, 2017
e8fad8f
Remove ReplayableFirehoseFactoryTest
jihoonson May 12, 2017
d08fa32
Merge branch 'master' of https://github.com/druid-io/druid into abstr…
jihoonson May 16, 2017
ac5f528
Fix compilation error
jihoonson May 16, 2017
d3ece3f
Fix test failure
jihoonson May 16, 2017
0933d1b
Address comments
jihoonson May 18, 2017
5c8aa62
Add default implementation for new method
jihoonson May 18, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions api/src/main/java/io/druid/data/input/FirehoseFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
package io.druid.data.input;

import com.fasterxml.jackson.annotation.JsonTypeInfo;

import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.parsers.ParseException;

import java.io.File;
import java.io.IOException;

/**
* FirehoseFactory creates a {@link Firehose} which is an interface holding onto the stream of incoming data.
* It currently provides two methods for creating a {@link Firehose} and their default implementations call each other
* for the backward compatibility. Implementations of this interface must implement one of these methods.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface FirehoseFactory<T extends InputRowParser>
{
Expand All @@ -36,7 +42,31 @@ public interface FirehoseFactory<T extends InputRowParser>
* If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null.
*
* @param parser an input row parser
*/
public Firehose connect(T parser) throws IOException, ParseException;
@Deprecated
default Firehose connect(T parser) throws IOException, ParseException
{
return connect(parser, null);
}

/**
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block).
* <p/>
* If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null.
* <p/>
* Some fire hoses like {@link PrefetchableTextFilesFirehoseFactory} may use a temporary
* directory to cache data in it.
*
* @param parser an input row parser
* @param temporaryDirectory a directory where temporary files are stored
*/
default Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException
{
return connect(parser);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.data.input.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
* This is an abstract class for firehose factory for making firehoses reading text files.
* It provides an unified {@link #connect(StringInputRowParser, File)} implementation for its subclasses.
*
* @param <ObjectType> object type representing input data
*/
public abstract class AbstractTextFilesFirehoseFactory<ObjectType>
implements FirehoseFactory<StringInputRowParser>
{
private static final Logger LOG = new Logger(AbstractTextFilesFirehoseFactory.class);

private List<ObjectType> objects;

@Override
public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException
{
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "initObjects"));
}
final Iterator<ObjectType> iterator = objects.iterator();
return new FileIteratingFirehose(
new Iterator<LineIterator>()
{
@Override
public boolean hasNext()
{
return iterator.hasNext();
}

@Override
public LineIterator next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final ObjectType object = iterator.next();
try {
return IOUtils.lineIterator(wrapObjectStream(object, openObjectStream(object)), Charsets.UTF_8);
}
catch (Exception e) {
LOG.error(
e,
"Exception reading object[%s]",
object
);
throw Throwables.propagate(e);
}
}
},
firehoseParser
);
}

/**
* Initialize objects to be read by this firehose. Since firehose factories are constructed whenever
* io.druid.indexing.common.task.Task objects are deserialized, actual initialization of objects is deferred
* until {@link #connect(StringInputRowParser, File)} is called.
*
* @return a collection of initialized objects.
*/
protected abstract Collection<ObjectType> initObjects() throws IOException;

/**
* Open an input stream from the given object. If the object is compressed, this method should return a byte stream
* as it is compressed. The object compression should be handled in {@link #wrapObjectStream(Object, InputStream)}.
*
* @param object an object to be read
*
* @return an input stream for the object
*
* @throws IOException
*/
protected abstract InputStream openObjectStream(ObjectType object) throws IOException;

/**
* Wrap the given input stream if needed. The decompression logic should be applied to the given stream if the object
* is compressed.
*
* @param object an input object
* @param stream a stream for the object
* @return
* @throws IOException
*/
protected abstract InputStream wrapObjectStream(ObjectType object, InputStream stream) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.druid.utils.Runnables;
import org.apache.commons.io.LineIterator;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
Expand All @@ -37,13 +38,25 @@ public class FileIteratingFirehose implements Firehose

private LineIterator lineIterator = null;

private final Closeable closer;

public FileIteratingFirehose(
Iterator<LineIterator> lineIterators,
StringInputRowParser parser
)
{
this(lineIterators, parser, null);
}

public FileIteratingFirehose(
Iterator<LineIterator> lineIterators,
StringInputRowParser parser,
Closeable closer
)
{
this.lineIterators = lineIterators;
this.parser = parser;
this.closer = closer;
}

@Override
Expand Down Expand Up @@ -86,8 +99,24 @@ public Runnable commit()
@Override
public void close() throws IOException
{
if (lineIterator != null) {
lineIterator.close();
try {
if (lineIterator != null) {
lineIterator.close();
}
}
catch (Throwable t) {
try {
if (closer != null) {
closer.close();
}
}
catch (Exception e) {
t.addSuppressed(e);
}
throw t;
}
if (closer != null) {
closer.close();
}
}
}
Loading