From c2b3c1c649b6001086333789ad4370436083f702 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Wed, 16 Oct 2019 17:44:36 -0700
Subject: [PATCH 1/2] Remove commit() method Firehose
---
.../apache/druid/data/input/Committer.java | 3 ++
.../org/apache/druid/data/input/Firehose.java | 38 ++-----------
.../druid/data/input/FirehoseFactory.java | 4 +-
.../input/impl/FileIteratingFirehose.java | 7 ---
.../AppenderatorDriverRealtimeIndexTask.java | 3 +-
.../common/task/RealtimeIndexTask.java | 3 +-
.../overlord/sampler/SamplerCache.java | 7 ---
.../SeekableStreamSamplerSpec.java | 7 ---
.../druid/indexing/common/TestFirehose.java | 7 ---
...penderatorDriverRealtimeIndexTaskTest.java | 7 ---
.../indexing/overlord/TaskLifecycleTest.java | 26 ---------
.../firehose/CombiningFirehoseFactory.java | 6 ---
.../EventReceiverFirehoseFactory.java | 7 ---
.../firehose/FixedCountFirehoseFactory.java | 6 ---
.../firehose/IngestSegmentFirehose.java | 7 ---
.../realtime/firehose/InlineFirehose.java | 7 ---
.../realtime/firehose/PredicateFirehose.java | 6 ---
.../realtime/firehose/SqlFirehose.java | 7 ---
.../firehose/TimedShutoffFirehoseFactory.java | 6 ---
.../segment/realtime/plumber/Committers.java | 53 +++----------------
.../CombiningFirehoseFactoryTest.java | 7 ---
.../realtime/firehose/InlineFirehoseTest.java | 9 ----
.../plumber/RealtimePlumberSchoolTest.java | 32 +++++++----
23 files changed, 43 insertions(+), 222 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/data/input/Committer.java b/core/src/main/java/org/apache/druid/data/input/Committer.java
index 0f8e31bd714d..b4410e185e93 100644
--- a/core/src/main/java/org/apache/druid/data/input/Committer.java
+++ b/core/src/main/java/org/apache/druid/data/input/Committer.java
@@ -21,6 +21,8 @@
import org.apache.druid.guice.annotations.ExtensionPoint;
+import javax.annotation.Nullable;
+
/**
* Committer includes a Runnable and a Jackson-serialized metadata object containing the offset
*/
@@ -32,5 +34,6 @@ public interface Committer extends Runnable
* which needs to be serialized and deserialized by Jackson.
* Commit metadata can be a complex type, but we recommend keeping it to List/Map/"Primitive JSON" types
*/
+ @Nullable
Object getMetadata();
}
diff --git a/core/src/main/java/org/apache/druid/data/input/Firehose.java b/core/src/main/java/org/apache/druid/data/input/Firehose.java
index a1af12fb6d7a..3f35a18fba31 100644
--- a/core/src/main/java/org/apache/druid/data/input/Firehose.java
+++ b/core/src/main/java/org/apache/druid/data/input/Firehose.java
@@ -38,10 +38,9 @@
* any) run out.
*
* Concurrency:
- * The three methods {@link #hasMore()}, {@link #nextRow()} and {@link #commit()} are all called from the same thread.
- * {@link #commit()}, however, returns a callback which will be called on another thread. {@link #close()} might be
- * called concurrently from a thread different from the thread calling {@link #hasMore()}, {@link #nextRow()} and {@link
- * #commit()}.
+ * The three methods {@link #hasMore()} and {@link #nextRow()} are all called from the same thread.
+ * {@link #close()} might be called concurrently from a thread different from the thread calling {@link #hasMore()}
+ * and {@link #nextRow()}.
*
*/
@ExtensionPoint
@@ -86,36 +85,9 @@ default InputRowPlusRaw nextRowWithRaw() throws IOException
}
/**
- * Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is
- * often equivalent to everything that has been read since the last commit() call (or instantiation of the object),
- * but doesn't necessarily have to be.
- *
- * This method is called when the main processing loop starts to persist its current batch of things to process.
- * The returned runnable will be run when the current batch has been successfully persisted, there is usually
- * some time lag between when this method is called and when the runnable is run. The Runnable is also run on
- * a separate thread so its operation should be thread-safe.
- *
- * The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
- * been committed on the writer side of this interface protocol.
- *
- * A simple implementation of this interface might do nothing when run() is called
- * (in which case the same do-nothing instance can be returned every time), or
- * a more complex implementation might clean up temporary resources that are no longer needed
- * because of InputRows delivered by prior calls to {@link #nextRow()}.
- *
- */
- Runnable commit();
-
- /**
- * Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()}, {@link
- * #nextRow()} and {@link #commit()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
+ * Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()} and {@link
+ * #nextRow()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
* continue to work after close(), but since the ingestion side is closed rows will eventually run out.
- *
- * The effects of calling run() on the {@link Runnable} object returned from {@link #commit()} (in other words,
- * doing the commit) concurrently or after close() are unspecified: commit may not be performed silently (that is,
- * run() call completes without an Exception, but the commit is not actually done), or a error may result. Note that
- * {@link #commit()} method itself can be called concurrently with close(), but it doesn't make much sense, because
- * run() on the returned Runnable then can't be called.
*/
@Override
void close() throws IOException;
diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
index e4c5d3893435..287f3d253c2b 100644
--- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
+++ b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
@@ -42,7 +42,7 @@ public interface FirehoseFactory
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block).
*
- * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
+ * If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null.
*
@@ -58,7 +58,7 @@ default Firehose connect(T parser) throws IOException, ParseException
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block).
*
- * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
+ * If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null.
*
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java b/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
index 2c2963c8c688..9c167b7bbc62 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
@@ -25,7 +25,6 @@
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import java.io.Closeable;
@@ -109,12 +108,6 @@ private LineIterator getNextLineIterator() throws IOException
return iterator;
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close() throws IOException
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index c48acea51b91..d6850fe30e0f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -272,7 +272,7 @@ public TaskStatus run(final TaskToolbox toolbox)
this.metrics = fireDepartmentForMetrics.getMetrics();
- Supplier committerSupplier = null;
+ final Supplier committerSupplier = Committers.nilSupplier();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
@@ -351,7 +351,6 @@ public TaskStatus run(final TaskToolbox toolbox)
synchronized (this) {
if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
- committerSupplier = Committers.supplierFromFirehose(firehose);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index 0dc3ee536334..5b8731edf581 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -351,7 +351,7 @@ public String getVersion(final Interval interval)
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);
- Supplier committerSupplier = null;
+ final Supplier committerSupplier = Committers.nilSupplier();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ?
@@ -387,7 +387,6 @@ public String getVersion(final Interval interval)
synchronized (this) {
if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
- committerSupplier = Committers.supplierFromFirehose(firehose);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java
index 3c3d6640b337..dc7e04d71420 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java
@@ -30,7 +30,6 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -172,12 +171,6 @@ public InputRowPlusRaw nextRowWithRaw()
}
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close()
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index c7303bc55cfa..119e516c1a37 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -39,7 +39,6 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import java.io.File;
@@ -168,12 +167,6 @@ public InputRowPlusRaw nextRowWithRaw()
}
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java
index 03cef9679900..f55c854d0e2c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java
@@ -30,7 +30,6 @@
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.Runnables;
import java.io.File;
import java.io.InputStream;
@@ -190,12 +189,6 @@ public InputRowPlusRaw nextRowWithRaw()
}
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index c53828364197..0c873dc578ca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -131,7 +131,6 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
-import org.apache.druid.utils.Runnables;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
@@ -224,12 +223,6 @@ public InputRow nextRow()
}
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index e8b0222ba650..74c61d563eef 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -290,19 +290,6 @@ public InputRow nextRow()
throw new RuntimeException("HA HA HA");
}
- @Override
- public Runnable commit()
- {
- return new Runnable()
- {
- @Override
- public void run()
- {
-
- }
- };
- }
-
@Override
public void close()
{
@@ -345,19 +332,6 @@ public InputRow nextRow()
return inputRowIterator.next();
}
- @Override
- public Runnable commit()
- {
- return new Runnable()
- {
- @Override
- public void run()
- {
-
- }
- };
- }
-
@Override
public void close()
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java
index 1b7931bd16be..fb640a2e765b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java
@@ -121,12 +121,6 @@ public InputRow nextRow() throws IOException
return rv;
}
- @Override
- public Runnable commit()
- {
- return currentFirehose.commit();
- }
-
@Override
public void close() throws IOException
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index 6842f8a26bcb..b3f40c74b99a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -49,7 +49,6 @@
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.utils.Runnables;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -448,12 +447,6 @@ public InputRow nextRow()
}
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public int getCurrentBufferSize()
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java
index 14b9ed3b0019..72a48b33a45a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java
@@ -83,12 +83,6 @@ public InputRow nextRow() throws IOException
return delegateFirehose.nextRow();
}
- @Override
- public Runnable commit()
- {
- return delegateFirehose.commit();
- }
-
@Override
public void close() throws IOException
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehose.java
index bcffe5a3c7b2..477a55037f10 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehose.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehose.java
@@ -44,7 +44,6 @@
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer;
-import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -202,12 +201,6 @@ public InputRow nextRow()
return transformer.transform(inputRow);
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close() throws IOException
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java
index 188e6050fb44..0c93d25d652e 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java
@@ -27,7 +27,6 @@
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.Runnables;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -85,12 +84,6 @@ public InputRowPlusRaw nextRowWithRaw()
}
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close() throws IOException
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
index aaecc0226387..d6aadf07adc6 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
@@ -81,12 +81,6 @@ public InputRow nextRow()
return row;
}
- @Override
- public Runnable commit()
- {
- return firehose.commit();
- }
-
@Override
public void close() throws IOException
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehose.java
index b77ee8c049fe..02c1dca387b8 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehose.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehose.java
@@ -27,7 +27,6 @@
import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer;
-import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import java.io.Closeable;
@@ -85,12 +84,6 @@ private JsonIterator