diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java index 64cb3687caef..76c67a61d64b 100644 --- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java +++ b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java @@ -77,4 +77,14 @@ default boolean isSplittable() { return false; } + + /** + * Some FirehoseFactory implementations require the indexing service's TaskToolbox to function. + * Use this method to provide it to them. The argument should always be a TaskToolbox; the + * method signature uses Object so that FirehoseFactories don't all have to be inside the + * indexing-service module. + */ + default void setTaskToolbox(Object taskToolbox) + { + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 6d5a0d8e56ad..a50279ef1fb2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -54,7 +54,6 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -84,7 +83,6 @@ import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; @@ -419,7 +417,7 @@ public TaskStatus run(final TaskToolbox toolbox) final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - setFirehoseFactoryToolbox(firehoseFactory, toolbox); + firehoseFactory.setTaskToolbox(toolbox); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. @@ -489,25 +487,6 @@ public TaskStatus run(final TaskToolbox toolbox) } } - // pass toolbox to any IngestSegmentFirehoseFactory - private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox) - { - if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { - ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); - return; - } - - if (firehoseFactory instanceof CombiningFirehoseFactory) { - for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) { - if (delegateFactory instanceof IngestSegmentFirehoseFactory) { - ((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox); - } else if (delegateFactory instanceof CombiningFirehoseFactory) { - setFirehoseFactoryToolbox(delegateFactory, toolbox); - } - } - } - } - private Map getTaskCompletionReports() { return TaskReport.buildTaskReports( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index 18e87d758b22..1ecdc414b8f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -44,7 +44,6 @@ import org.apache.druid.indexing.common.task.IndexTaskClientFactory; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; @@ -184,11 +183,7 @@ public String getSupervisorTaskId() public TaskStatus run(final TaskToolbox toolbox) throws Exception { final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - - if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { - // pass toolbox to Firehose - ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); - } + firehoseFactory.setTaskToolbox(toolbox); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 8087582ecb55..09ab22fed5ee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -120,9 +120,10 @@ public List getMetrics() return metrics; } - public void setTaskToolbox(TaskToolbox taskToolbox) + @Override + public void setTaskToolbox(Object taskToolbox) { - this.taskToolbox = taskToolbox; + this.taskToolbox = (TaskToolbox) taskToolbox; } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 3329419cf337..867bd93a404e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -29,6 +29,8 @@ import com.google.common.io.Files; import com.google.inject.Binder; import com.google.inject.Module; +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; @@ -81,7 +83,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; -import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose; +import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; @@ -342,27 +344,33 @@ public DataSegment restore(DataSegment segment) null, ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME) )) { - final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory( - TASK.getDataSource(), - Intervals.ETERNITY, - new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), - dim_names, - metric_names, - INDEX_IO - ); - factory.setTaskToolbox(taskToolboxFactory.build(TASK)); - values.add( - new Object[]{ - StringUtils.format( - "DimNames[%s]MetricNames[%s]ParserDimNames[%s]", - dim_names == null ? "null" : "dims", - metric_names == null ? "null" : "metrics", - parser == ROW_PARSER ? "dims" : "null" - ), - factory, - parser - } - ); + for (Boolean wrapInCombining : Arrays.asList(false, true)) { + final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory( + TASK.getDataSource(), + Intervals.ETERNITY, + new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), + dim_names, + metric_names, + INDEX_IO + ); + final FirehoseFactory factory = wrapInCombining + ? new CombiningFirehoseFactory(ImmutableList.of(isfFactory)) + : isfFactory; + factory.setTaskToolbox(taskToolboxFactory.build(TASK)); + values.add( + new Object[]{ + StringUtils.format( + "DimNames[%s]MetricNames[%s]ParserDimNames[%s]WrapInCombining[%s]", + dim_names == null ? "null" : "dims", + metric_names == null ? "null" : "metrics", + parser == ROW_PARSER ? "dims" : "null", + wrapInCombining + ), + factory, + parser + } + ); + } } } } @@ -407,7 +415,7 @@ public void configure(Binder binder) public IngestSegmentFirehoseFactoryTest( String testName, - IngestSegmentFirehoseFactory factory, + FirehoseFactory factory, InputRowParser rowParser ) { @@ -436,7 +444,7 @@ public IngestSegmentFirehoseFactoryTest( private static final File persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile(); private static final List segmentSet = new ArrayList<>(MAX_SHARD_NUMBER); - private final IngestSegmentFirehoseFactory factory; + private final FirehoseFactory factory; private final InputRowParser rowParser; private static final InputRowParser> ROW_PARSER = new MapInputRowParser( @@ -518,15 +526,20 @@ private static void recursivelyDelete(final File dir) @Test public void sanityTest() { - Assert.assertEquals(TASK.getDataSource(), factory.getDataSource()); - if (factory.getDimensions() != null) { - Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray()); + if (factory instanceof CombiningFirehoseFactory) { + // This method tests IngestSegmentFirehoseFactory-specific methods. + return; + } + final IngestSegmentFirehoseFactory isfFactory = (IngestSegmentFirehoseFactory) factory; + Assert.assertEquals(TASK.getDataSource(), isfFactory.getDataSource()); + if (isfFactory.getDimensions() != null) { + Assert.assertArrayEquals(new String[]{DIM_NAME}, isfFactory.getDimensions().toArray()); } - Assert.assertEquals(Intervals.ETERNITY, factory.getInterval()); - if (factory.getMetrics() != null) { + Assert.assertEquals(Intervals.ETERNITY, isfFactory.getInterval()); + if (isfFactory.getMetrics() != null) { Assert.assertEquals( ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME), - ImmutableSet.copyOf(factory.getMetrics()) + ImmutableSet.copyOf(isfFactory.getMetrics()) ); } } @@ -536,15 +549,17 @@ public void simpleFirehoseReadingTest() throws IOException { Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); Integer rowcount = 0; - try (final IngestSegmentFirehose firehose = - (IngestSegmentFirehose) - factory.connect(rowParser, null)) { + try (final Firehose firehose = factory.connect(rowParser, null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray()); Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME)); - Assert.assertEquals(METRIC_FLOAT_VALUE, row.getMetric(METRIC_FLOAT_NAME).floatValue(), METRIC_FLOAT_VALUE * 0.0001); + Assert.assertEquals( + METRIC_FLOAT_VALUE, + row.getMetric(METRIC_FLOAT_NAME).floatValue(), + METRIC_FLOAT_VALUE * 0.0001 + ); ++rowcount; } } @@ -563,9 +578,8 @@ public void testTransformSpec() throws IOException ) ); int skipped = 0; - try (final IngestSegmentFirehose firehose = - (IngestSegmentFirehose) - factory.connect(transformSpec.decorate(rowParser), null)) { + try (final Firehose firehose = + factory.connect(transformSpec.decorate(rowParser), null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); if (row == null) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java index 9654d1f11b52..52a839490072 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Predicate; import org.apache.druid.data.input.Firehose; @@ -61,6 +62,13 @@ public Interval getInterval() return interval; } + @Override + @JsonIgnore + public void setTaskToolbox(Object taskToolbox) + { + delegate.setTaskToolbox(taskToolbox); + } + @Override public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java index 6e059e249820..1c64922810c3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -65,6 +66,15 @@ public List getDelegateFactoryList() return delegateFactoryList; } + @Override + @JsonIgnore + public void setTaskToolbox(Object taskToolbox) + { + for (FirehoseFactory delegateFactory : delegateFactoryList) { + delegateFactory.setTaskToolbox(taskToolbox); + } + } + class CombiningFirehose implements Firehose { private final InputRowParser parser; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java index e8a7dca1a8a0..1426e26ea177 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.Firehose; @@ -61,6 +62,13 @@ public int getCount() return count; } + @Override + @JsonIgnore + public void setTaskToolbox(Object taskToolbox) + { + delegate.setTaskToolbox(taskToolbox); + } + @Override public Firehose connect(final InputRowParser parser, File temporaryDirectory) throws IOException { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index d37265b6b850..f8933a7e0100 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; @@ -139,4 +140,11 @@ public DateTime getShutoffTime() { return shutoffTime; } + + @Override + @JsonIgnore + public void setTaskToolbox(Object taskToolbox) + { + delegateFactory.setTaskToolbox(taskToolbox); + } }