From bf2652b8960c34f852fcd89ab3830d4a3ca82a46 Mon Sep 17 00:00:00 2001 From: Zhihui Jiao Date: Fri, 17 Mar 2017 01:02:13 +0800 Subject: [PATCH] Fix IngestSegmentFirehoseFactory --- .../druid/indexing/common/task/IndexTask.java | 19 ++++++++++++++-- .../IngestSegmentFirehoseFactory.java | 22 +++++++++++++------ 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 48d0ea3df76d..75a86d8b84bb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -55,6 +55,7 @@ import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; import io.druid.java.util.common.ISE; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Comparators; @@ -168,6 +169,12 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception .isPresent(); final FirehoseFactory delegateFirehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); + + if (delegateFirehoseFactory instanceof IngestSegmentFirehoseFactory) { + // pass toolbox to Firehose + ((IngestSegmentFirehoseFactory) delegateFirehoseFactory).setTaskToolbox(toolbox); + } + final FirehoseFactory firehoseFactory; if (ingestionSchema.getIOConfig().isSkipFirehoseCaching() || delegateFirehoseFactory instanceof ReplayableFirehoseFactory) { @@ -290,7 +297,10 @@ private Map> determineShardSpecs( hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector())); } - List groupKey = Rows.toGroupKey(queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(), inputRow); + List groupKey = Rows.toGroupKey( + queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(), + inputRow + ); hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes()); } } @@ -385,7 +395,12 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin try ( final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema); - final FiniteAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator, fireDepartmentMetrics); + final FiniteAppenderatorDriver driver = newDriver( + appenderator, + toolbox, + segmentAllocator, + fireDepartmentMetrics + ); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser()) ) { final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 351fa1b9b655..8897e47af3b9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -68,6 +68,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory metrics; private final Injector injector; private final IndexIO indexIO; + private TaskToolbox taskToolbox; @JsonCreator public IngestSegmentFirehoseFactory( @@ -121,21 +122,28 @@ public List getMetrics() return metrics; } + public void setTaskToolbox(TaskToolbox taskToolbox) + { + this.taskToolbox = taskToolbox; + } + @Override public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException { log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval); - // better way to achieve this is to pass toolbox to Firehose, The instance is initialized Lazily on connect method. - // Noop Task is just used to create the toolbox and list segments. - final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build( - new NoopTask("reingest", 0, 0, null, null, null) - ); + + if (taskToolbox == null) { + // Noop Task is just used to create the toolbox and list segments. + taskToolbox = injector.getInstance(TaskToolboxFactory.class).build( + new NoopTask("reingest", 0, 0, null, null, null) + ); + } try { - final List usedSegments = toolbox + final List usedSegments = taskToolbox .getTaskActionClient() .submit(new SegmentListUsedAction(dataSource, interval, null)); - final Map segmentFileMap = toolbox.fetchSegments(usedSegments); + final Map segmentFileMap = taskToolbox.fetchSegments(usedSegments); VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( Ordering.natural().nullsFirst() );