From 208f2b75122558aa3a8a152f1221e6a5bf9604e0 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Tue, 12 Feb 2019 16:20:07 -0800 Subject: [PATCH 1/2] ParallelIndexSubTask: support ingestSegment in delegating factories IndexTask had special-cased code to properly send a TaskToolbox to a IngestSegmentFirehoseFactory that's nested inside a CombiningFirehoseFactory, but ParallelIndexSubTask didn't. This commit generalizes the concept to an optional setContext method on FirehoseFactory that CombiningFirehoseFactory and IngestSegmentFirehoseFactory implement. Also pass the context through for ClippedFirehoseFactory, FixedCountFirehoseFactory, and TimedShutoffFirehoseFactory --- ie, allow IngestSegmentFirehoseFactory to be used from within these wrappers inside both IndexTask and ParallelIndexSubTask. --- .../druid/data/input/FirehoseFactory.java | 10 +++ .../druid/indexing/common/task/IndexTask.java | 22 +---- .../batch/parallel/ParallelIndexSubTask.java | 6 +- .../IngestSegmentFirehoseFactory.java | 11 +++ .../IngestSegmentFirehoseFactoryTest.java | 88 +++++++++++-------- ...estSegmentFirehoseFactoryTimelineTest.java | 5 +- .../firehose/ClippedFirehoseFactory.java | 8 ++ .../firehose/CombiningFirehoseFactory.java | 10 +++ .../firehose/FixedCountFirehoseFactory.java | 8 ++ .../firehose/TimedShutoffFirehoseFactory.java | 8 ++ 10 files changed, 112 insertions(+), 64 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java index 64cb3687caef..092e520e824a 100644 --- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java +++ b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java @@ -77,4 +77,14 @@ default boolean isSplittable() { return false; } + + /** + * Some FirehoseFactory implementations require additional context from the task that uses them. + * For example, IngestSegmentFirehoseFactory needs a TaskToolbox from the indexing-service task. + * This method is also implemented by implementations such as CombiningFirehoseFactory which wrap + * other implementations. + */ + default void setContext(String key, Object value) + { + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 6d5a0d8e56ad..9ec407d2c73a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -84,7 +84,6 @@ import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; @@ -419,7 +418,7 @@ public TaskStatus run(final TaskToolbox toolbox) final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - setFirehoseFactoryToolbox(firehoseFactory, toolbox); + firehoseFactory.setContext(IngestSegmentFirehoseFactory.CONTEXT_TASK_TOOLBOX, toolbox); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. @@ -489,25 +488,6 @@ public TaskStatus run(final TaskToolbox toolbox) } } - // pass toolbox to any IngestSegmentFirehoseFactory - private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox) - { - if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { - ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); - return; - } - - if (firehoseFactory instanceof CombiningFirehoseFactory) { - for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) { - if (delegateFactory instanceof IngestSegmentFirehoseFactory) { - ((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox); - } else if (delegateFactory instanceof CombiningFirehoseFactory) { - setFirehoseFactoryToolbox(delegateFactory, toolbox); - } - } - } - } - private Map getTaskCompletionReports() { return TaskReport.buildTaskReports( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index 18e87d758b22..0be033725281 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -184,11 +184,7 @@ public String getSupervisorTaskId() public TaskStatus run(final TaskToolbox toolbox) throws Exception { final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - - if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { - // pass toolbox to Firehose - ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); - } + firehoseFactory.setContext(IngestSegmentFirehoseFactory.CONTEXT_TASK_TOOLBOX, toolbox); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 8087582ecb55..bbf8ddc244d2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -61,6 +61,8 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory { + public static final String CONTEXT_TASK_TOOLBOX = "TaskToolbox"; + private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class); private final String dataSource; private final Interval interval; @@ -120,6 +122,15 @@ public List getMetrics() return metrics; } + @Override + public void setContext(String key, Object value) + { + if (CONTEXT_TASK_TOOLBOX.equals(key)) { + this.taskToolbox = (TaskToolbox) value; + } + } + + @Deprecated public void setTaskToolbox(TaskToolbox taskToolbox) { this.taskToolbox = taskToolbox; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 3329419cf337..0e7aed9614b5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -29,6 +29,8 @@ import com.google.common.io.Files; import com.google.inject.Binder; import com.google.inject.Module; +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; @@ -81,7 +83,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; -import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose; +import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; @@ -342,27 +344,33 @@ public DataSegment restore(DataSegment segment) null, ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME) )) { - final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory( - TASK.getDataSource(), - Intervals.ETERNITY, - new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), - dim_names, - metric_names, - INDEX_IO - ); - factory.setTaskToolbox(taskToolboxFactory.build(TASK)); - values.add( - new Object[]{ - StringUtils.format( - "DimNames[%s]MetricNames[%s]ParserDimNames[%s]", - dim_names == null ? "null" : "dims", - metric_names == null ? "null" : "metrics", - parser == ROW_PARSER ? "dims" : "null" - ), - factory, - parser - } - ); + for (Boolean wrapInCombining : Arrays.asList(false, true)) { + final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory( + TASK.getDataSource(), + Intervals.ETERNITY, + new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), + dim_names, + metric_names, + INDEX_IO + ); + final FirehoseFactory factory = wrapInCombining + ? new CombiningFirehoseFactory(ImmutableList.of(isfFactory)) + : isfFactory; + factory.setContext(IngestSegmentFirehoseFactory.CONTEXT_TASK_TOOLBOX, taskToolboxFactory.build(TASK)); + values.add( + new Object[]{ + StringUtils.format( + "DimNames[%s]MetricNames[%s]ParserDimNames[%s]WrapInCombining[%s]", + dim_names == null ? "null" : "dims", + metric_names == null ? "null" : "metrics", + parser == ROW_PARSER ? "dims" : "null", + wrapInCombining + ), + factory, + parser + } + ); + } } } } @@ -407,7 +415,7 @@ public void configure(Binder binder) public IngestSegmentFirehoseFactoryTest( String testName, - IngestSegmentFirehoseFactory factory, + FirehoseFactory factory, InputRowParser rowParser ) { @@ -436,7 +444,7 @@ public IngestSegmentFirehoseFactoryTest( private static final File persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile(); private static final List segmentSet = new ArrayList<>(MAX_SHARD_NUMBER); - private final IngestSegmentFirehoseFactory factory; + private final FirehoseFactory factory; private final InputRowParser rowParser; private static final InputRowParser> ROW_PARSER = new MapInputRowParser( @@ -518,15 +526,20 @@ private static void recursivelyDelete(final File dir) @Test public void sanityTest() { - Assert.assertEquals(TASK.getDataSource(), factory.getDataSource()); - if (factory.getDimensions() != null) { - Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray()); + if (factory instanceof CombiningFirehoseFactory) { + // This method tests IngestSegmentFirehoseFactory-specific methods. + return; + } + final IngestSegmentFirehoseFactory isfFactory = (IngestSegmentFirehoseFactory) factory; + Assert.assertEquals(TASK.getDataSource(), isfFactory.getDataSource()); + if (isfFactory.getDimensions() != null) { + Assert.assertArrayEquals(new String[]{DIM_NAME}, isfFactory.getDimensions().toArray()); } - Assert.assertEquals(Intervals.ETERNITY, factory.getInterval()); - if (factory.getMetrics() != null) { + Assert.assertEquals(Intervals.ETERNITY, isfFactory.getInterval()); + if (isfFactory.getMetrics() != null) { Assert.assertEquals( ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME), - ImmutableSet.copyOf(factory.getMetrics()) + ImmutableSet.copyOf(isfFactory.getMetrics()) ); } } @@ -536,15 +549,17 @@ public void simpleFirehoseReadingTest() throws IOException { Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); Integer rowcount = 0; - try (final IngestSegmentFirehose firehose = - (IngestSegmentFirehose) - factory.connect(rowParser, null)) { + try (final Firehose firehose = factory.connect(rowParser, null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray()); Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME)); - Assert.assertEquals(METRIC_FLOAT_VALUE, row.getMetric(METRIC_FLOAT_NAME).floatValue(), METRIC_FLOAT_VALUE * 0.0001); + Assert.assertEquals( + METRIC_FLOAT_VALUE, + row.getMetric(METRIC_FLOAT_NAME).floatValue(), + METRIC_FLOAT_VALUE * 0.0001 + ); ++rowcount; } } @@ -563,9 +578,8 @@ public void testTransformSpec() throws IOException ) ); int skipped = 0; - try (final IngestSegmentFirehose firehose = - (IngestSegmentFirehose) - factory.connect(transformSpec.decorate(rowParser), null)) { + try (final Firehose firehose = + factory.connect(transformSpec.decorate(rowParser), null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); if (row == null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 70e877bad45b..a13f146117dd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -357,7 +357,10 @@ public TaskActionClient create(Task task) Arrays.asList(METRICS), INDEX_IO ); - factory.setTaskToolbox(taskToolboxFactory.build(NoopTask.create(DATA_SOURCE))); + factory.setContext( + IngestSegmentFirehoseFactory.CONTEXT_TASK_TOOLBOX, + taskToolboxFactory.build(NoopTask.create(DATA_SOURCE)) + ); constructors.add( new Object[]{ diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java index 9654d1f11b52..fedc52846abc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Predicate; import org.apache.druid.data.input.Firehose; @@ -61,6 +62,13 @@ public Interval getInterval() return interval; } + @Override + @JsonIgnore + public void setContext(String key, Object value) + { + delegate.setContext(key, value); + } + @Override public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java index 6e059e249820..26c397fc0c2d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -65,6 +66,15 @@ public List getDelegateFactoryList() return delegateFactoryList; } + @Override + @JsonIgnore + public void setContext(String key, Object value) + { + for (FirehoseFactory delegateFactory : delegateFactoryList) { + delegateFactory.setContext(key, value); + } + } + class CombiningFirehose implements Firehose { private final InputRowParser parser; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java index e8a7dca1a8a0..d982c47aff63 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.Firehose; @@ -61,6 +62,13 @@ public int getCount() return count; } + @Override + @JsonIgnore + public void setContext(String key, Object value) + { + delegate.setContext(key, value); + } + @Override public Firehose connect(final InputRowParser parser, File temporaryDirectory) throws IOException { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index d37265b6b850..b7b2a8e48826 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; @@ -139,4 +140,11 @@ public DateTime getShutoffTime() { return shutoffTime; } + + @Override + @JsonIgnore + public void setContext(String key, Object value) + { + delegateFactory.setContext(key, value); + } } From a9a426f51f33fc8494f283d00620a4425659c80e Mon Sep 17 00:00:00 2001 From: David Glasser Date: Wed, 13 Feb 2019 21:45:26 -0800 Subject: [PATCH 2/2] De-generalize to setTaskToolbox --- .../apache/druid/data/input/FirehoseFactory.java | 10 +++++----- .../druid/indexing/common/task/IndexTask.java | 3 +-- .../task/batch/parallel/ParallelIndexSubTask.java | 3 +-- .../firehose/IngestSegmentFirehoseFactory.java | 14 ++------------ .../firehose/IngestSegmentFirehoseFactoryTest.java | 2 +- .../IngestSegmentFirehoseFactoryTimelineTest.java | 5 +---- .../realtime/firehose/ClippedFirehoseFactory.java | 4 ++-- .../firehose/CombiningFirehoseFactory.java | 4 ++-- .../firehose/FixedCountFirehoseFactory.java | 4 ++-- .../firehose/TimedShutoffFirehoseFactory.java | 4 ++-- 10 files changed, 19 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java index 092e520e824a..76c67a61d64b 100644 --- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java +++ b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java @@ -79,12 +79,12 @@ default boolean isSplittable() } /** - * Some FirehoseFactory implementations require additional context from the task that uses them. - * For example, IngestSegmentFirehoseFactory needs a TaskToolbox from the indexing-service task. - * This method is also implemented by implementations such as CombiningFirehoseFactory which wrap - * other implementations. + * Some FirehoseFactory implementations require the indexing service's TaskToolbox to function. + * Use this method to provide it to them. The argument should always be a TaskToolbox; the + * method signature uses Object so that FirehoseFactories don't all have to be inside the + * indexing-service module. */ - default void setContext(String key, Object value) + default void setTaskToolbox(Object taskToolbox) { } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 9ec407d2c73a..a50279ef1fb2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -54,7 +54,6 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -418,7 +417,7 @@ public TaskStatus run(final TaskToolbox toolbox) final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - firehoseFactory.setContext(IngestSegmentFirehoseFactory.CONTEXT_TASK_TOOLBOX, toolbox); + firehoseFactory.setTaskToolbox(toolbox); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index 0be033725281..1ecdc414b8f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -44,7 +44,6 @@ import org.apache.druid.indexing.common.task.IndexTaskClientFactory; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; @@ -184,7 +183,7 @@ public String getSupervisorTaskId() public TaskStatus run(final TaskToolbox toolbox) throws Exception { final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - firehoseFactory.setContext(IngestSegmentFirehoseFactory.CONTEXT_TASK_TOOLBOX, toolbox); + firehoseFactory.setTaskToolbox(toolbox); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index bbf8ddc244d2..09ab22fed5ee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -61,8 +61,6 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory { - public static final String CONTEXT_TASK_TOOLBOX = "TaskToolbox"; - private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class); private final String dataSource; private final Interval interval; @@ -123,17 +121,9 @@ public List getMetrics() } @Override - public void setContext(String key, Object value) - { - if (CONTEXT_TASK_TOOLBOX.equals(key)) { - this.taskToolbox = (TaskToolbox) value; - } - } - - @Deprecated - public void setTaskToolbox(TaskToolbox taskToolbox) + public void setTaskToolbox(Object taskToolbox) { - this.taskToolbox = taskToolbox; + this.taskToolbox = (TaskToolbox) taskToolbox; } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 0e7aed9614b5..867bd93a404e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -356,7 +356,7 @@ public DataSegment restore(DataSegment segment) final FirehoseFactory factory = wrapInCombining ? new CombiningFirehoseFactory(ImmutableList.of(isfFactory)) : isfFactory; - factory.setContext(IngestSegmentFirehoseFactory.CONTEXT_TASK_TOOLBOX, taskToolboxFactory.build(TASK)); + factory.setTaskToolbox(taskToolboxFactory.build(TASK)); values.add( new Object[]{ StringUtils.format( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index a13f146117dd..70e877bad45b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -357,10 +357,7 @@ public TaskActionClient create(Task task) Arrays.asList(METRICS), INDEX_IO ); - factory.setContext( - IngestSegmentFirehoseFactory.CONTEXT_TASK_TOOLBOX, - taskToolboxFactory.build(NoopTask.create(DATA_SOURCE)) - ); + factory.setTaskToolbox(taskToolboxFactory.build(NoopTask.create(DATA_SOURCE))); constructors.add( new Object[]{ diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java index fedc52846abc..52a839490072 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java @@ -64,9 +64,9 @@ public Interval getInterval() @Override @JsonIgnore - public void setContext(String key, Object value) + public void setTaskToolbox(Object taskToolbox) { - delegate.setContext(key, value); + delegate.setTaskToolbox(taskToolbox); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java index 26c397fc0c2d..1c64922810c3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java @@ -68,10 +68,10 @@ public List getDelegateFactoryList() @Override @JsonIgnore - public void setContext(String key, Object value) + public void setTaskToolbox(Object taskToolbox) { for (FirehoseFactory delegateFactory : delegateFactoryList) { - delegateFactory.setContext(key, value); + delegateFactory.setTaskToolbox(taskToolbox); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java index d982c47aff63..1426e26ea177 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java @@ -64,9 +64,9 @@ public int getCount() @Override @JsonIgnore - public void setContext(String key, Object value) + public void setTaskToolbox(Object taskToolbox) { - delegate.setContext(key, value); + delegate.setTaskToolbox(taskToolbox); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index b7b2a8e48826..f8933a7e0100 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -143,8 +143,8 @@ public DateTime getShutoffTime() @Override @JsonIgnore - public void setContext(String key, Object value) + public void setTaskToolbox(Object taskToolbox) { - delegateFactory.setContext(key, value); + delegateFactory.setTaskToolbox(taskToolbox); } }