Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/druid/data/input/Committer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.druid.guice.annotations.ExtensionPoint;

import javax.annotation.Nullable;

/**
* Committer includes a Runnable and a Jackson-serialized metadata object containing the offset
*/
Expand All @@ -32,5 +34,6 @@ public interface Committer extends Runnable
* which needs to be serialized and deserialized by Jackson.
* Commit metadata can be a complex type, but we recommend keeping it to List/Map/"Primitive JSON" types
*/
@Nullable
Object getMetadata();
}
38 changes: 5 additions & 33 deletions core/src/main/java/org/apache/druid/data/input/Firehose.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@
* any) run out.
*
* Concurrency:
* The three methods {@link #hasMore()}, {@link #nextRow()} and {@link #commit()} are all called from the same thread.
* {@link #commit()}, however, returns a callback which will be called on another thread. {@link #close()} might be
* called concurrently from a thread different from the thread calling {@link #hasMore()}, {@link #nextRow()} and {@link
* #commit()}.
* The two methods {@link #hasMore()} and {@link #nextRow()} are all called from the same thread.
* {@link #close()} might be called concurrently from a thread different from the thread calling {@link #hasMore()}
* and {@link #nextRow()}.
* </p>
*/
@ExtensionPoint
Expand Down Expand Up @@ -86,36 +85,9 @@ default InputRowPlusRaw nextRowWithRaw() throws IOException
}

/**
* Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is
* often equivalent to everything that has been read since the last commit() call (or instantiation of the object),
* but doesn't necessarily have to be.
*
* This method is called when the main processing loop starts to persist its current batch of things to process.
* The returned runnable will be run when the current batch has been successfully persisted, there is usually
* some time lag between when this method is called and when the runnable is run. The Runnable is also run on
* a separate thread so its operation should be thread-safe.
*
* The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
* been committed on the writer side of this interface protocol.
* <p>
* A simple implementation of this interface might do nothing when run() is called
* (in which case the same do-nothing instance can be returned every time), or
* a more complex implementation might clean up temporary resources that are no longer needed
* because of InputRows delivered by prior calls to {@link #nextRow()}.
* </p>
*/
Runnable commit();

/**
* Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()}, {@link
* #nextRow()} and {@link #commit()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
* Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()} and {@link
* #nextRow()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
* continue to work after close(), but since the ingestion side is closed rows will eventually run out.
*
* The effects of calling run() on the {@link Runnable} object returned from {@link #commit()} (in other words,
* doing the commit) concurrently or after close() are unspecified: commit may not be performed silently (that is,
* run() call completes without an Exception, but the commit is not actually done), or a error may result. Note that
* {@link #commit()} method itself can be called concurrently with close(), but it doesn't make much sense, because
* run() on the returned Runnable then can't be called.
*/
@Override
void close() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface FirehoseFactory<T extends InputRowParser>
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block).
* <p/>
* If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
* If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null.
*
Expand All @@ -58,7 +58,7 @@ default Firehose connect(T parser) throws IOException, ParseException
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block).
* <p/>
* If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
* If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;

import javax.annotation.Nullable;
import java.io.Closeable;
Expand Down Expand Up @@ -109,12 +108,6 @@ private LineIterator getNextLineIterator() throws IOException
return iterator;
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public TaskStatus run(final TaskToolbox toolbox)

this.metrics = fireDepartmentForMetrics.getMetrics();

Supplier<Committer> committerSupplier = null;
final Supplier<Committer> committerSupplier = Committers.nilSupplier();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();

DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
Expand Down Expand Up @@ -351,7 +351,6 @@ public TaskStatus run(final TaskToolbox toolbox)
synchronized (this) {
if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
committerSupplier = Committers.supplierFromFirehose(firehose);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public String getVersion(final Interval interval)

this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);

Supplier<Committer> committerSupplier = null;
final Supplier<Committer> committerSupplier = Committers.nilSupplier();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();

LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ?
Expand Down Expand Up @@ -387,7 +387,6 @@ public String getVersion(final Interval interval)
synchronized (this) {
if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
committerSupplier = Committers.supplierFromFirehose(firehose);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.utils.Runnables;

import javax.annotation.Nullable;
import javax.inject.Inject;
Expand Down Expand Up @@ -172,12 +171,6 @@ public InputRowPlusRaw nextRowWithRaw()
}
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.utils.Runnables;

import javax.annotation.Nullable;
import java.io.File;
Expand Down Expand Up @@ -168,12 +167,6 @@ public InputRowPlusRaw nextRowWithRaw()
}
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;

import java.io.File;
import java.io.InputStream;
Expand Down Expand Up @@ -190,12 +189,6 @@ public InputRowPlusRaw nextRowWithRaw()
}
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.utils.Runnables;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
Expand Down Expand Up @@ -224,12 +223,6 @@ public InputRow nextRow()
}
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,19 +290,6 @@ public InputRow nextRow()
throw new RuntimeException("HA HA HA");
}

@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{

}
};
}

@Override
public void close()
{
Expand Down Expand Up @@ -345,19 +332,6 @@ public InputRow nextRow()
return inputRowIterator.next();
}

@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{

}
};
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,6 @@ public InputRow nextRow() throws IOException
return rv;
}

@Override
public Runnable commit()
{
return currentFirehose.commit();
}

@Override
public void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.utils.Runnables;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -448,12 +447,6 @@ public InputRow nextRow()
}
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public int getCurrentBufferSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,6 @@ public InputRow nextRow() throws IOException
return delegateFirehose.nextRow();
}

@Override
public Runnable commit()
{
return delegateFirehose.commit();
}

@Override
public void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer;
import org.apache.druid.utils.Runnables;

import javax.annotation.Nullable;
import java.io.IOException;
Expand Down Expand Up @@ -202,12 +201,6 @@ public InputRow nextRow()
return transformer.transform(inputRow);
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -85,12 +84,6 @@ public InputRowPlusRaw nextRowWithRaw()
}
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,6 @@ public InputRow nextRow()
return row;
}

@Override
public Runnable commit()
{
return firehose.commit();
}

@Override
public void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer;
import org.apache.druid.utils.Runnables;

import javax.annotation.Nullable;
import java.io.Closeable;
Expand Down Expand Up @@ -85,12 +84,6 @@ private JsonIterator<Map<String, Object>> getNextLineIterator()
return resultIterator.next();
}

@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}

@Override
public void close() throws IOException
{
Expand Down
Loading