From ba8a758a1a3ca8e1910af87d56a6287e6b10670c Mon Sep 17 00:00:00 2001 From: David Glasser Date: Tue, 26 Feb 2019 23:37:23 -0800 Subject: [PATCH 1/7] Make IngestSegmentFirehoseFactory splittable for parallel ingestion --- docs/content/ingestion/firehose.md | 7 +- .../indexing/common/task/CompactionTask.java | 4 + .../IngestSegmentFirehoseFactory.java | 304 ++++++++++++++---- .../indexing/firehose/WindowedSegment.java | 73 +++++ .../indexing/firehose/WindowedSegmentId.java | 66 ++++ .../IngestSegmentFirehoseFactoryTest.java | 28 +- ...estSegmentFirehoseFactoryTimelineTest.java | 2 + .../tests/indexer/ITParallelIndexTest.java | 14 +- ...ia_parallel_ingest_segment_index_task.json | 73 +++++ .../client/coordinator/CoordinatorClient.java | 33 ++ 10 files changed, 516 insertions(+), 88 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegment.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index 322a1f29d2ca..9698425dadd2 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -87,7 +87,8 @@ The below configurations can be optionally used for tuning the firehose performa ### IngestSegmentFirehose This Firehose can be used to read the data from existing druid segments. -It can be used ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment. +It can be used to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment. +This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task). A sample ingest firehose spec is shown below - ```json @@ -102,10 +103,12 @@ A sample ingest firehose spec is shown below - |--------|-----------|---------| |type|This should be "ingestSegment".|yes| |dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes| -|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes| +|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over. You must specify exactly one of this and `segments`.|no| +|segments|A list of JSON objects describing specific segments to include. Each object must have a `segmentId` field naming a segment to ingest, and may have an `intervals` field which is a list of ISO-8601 Interval strings telling what intervals on the segment to read. You must specify exactly one of this and `interval`. |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no| |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| |filter| See [Filters](../querying/filters.html)|no| +|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). 
Defaults to 150MB.|no| #### SqlFirehose diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 53c3641916fd..88303304562a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -506,10 +506,14 @@ private static IndexIOConfig createIoConfig( new IngestSegmentFirehoseFactory( dataSchema.getDataSource(), interval, + // FIXME It might be possible to pass in segmentIds instead of interval here (because the caller + // does have a timeline) but I don't know if it would actually improve anything. + null, null, // no filter // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(), Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()), + null, // FIXME allow maxInputSegmentBytesPerTask to be tweaked? toolbox.getIndexIO(), coordinatorClient, segmentLoaderFactory, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index bae2946bbdc9..4c808b48e843 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -32,12 +32,15 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.indexing.common.RetryPolicy; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.filter.DimFilter; @@ -58,22 +61,29 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; -public class IngestSegmentFirehoseFactory implements FirehoseFactory +public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory> { private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class); + private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024; private final String dataSource; private final Interval interval; + private final List segmentIds; private final DimFilter dimFilter; private final List dimensions; private final List metrics; + private final long maxInputSegmentBytesPerTask; + private 
List>> splits; private final IndexIO indexIO; private final CoordinatorClient coordinatorClient; private final SegmentLoaderFactory segmentLoaderFactory; @@ -83,9 +93,11 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory segmentIds, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("dimensions") List dimensions, @JsonProperty("metrics") List metrics, + @JsonProperty("maxInputSegmentBytesPerTask") Long maxInputSegmentBytesPerTask, @JacksonInject IndexIO indexIO, @JacksonInject CoordinatorClient coordinatorClient, @JacksonInject SegmentLoaderFactory segmentLoaderFactory, @@ -93,18 +105,42 @@ public IngestSegmentFirehoseFactory( ) { Preconditions.checkNotNull(dataSource, "dataSource"); - Preconditions.checkNotNull(interval, "interval"); + if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) { + throw new IAE("Specify exactly one of 'interval' and 'segments'"); + } this.dataSource = dataSource; this.interval = interval; + this.segmentIds = segmentIds; this.dimFilter = dimFilter; this.dimensions = dimensions; this.metrics = metrics; + this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null + ? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK + : maxInputSegmentBytesPerTask; this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); } + @Override + public FiniteFirehoseFactory> withSplit(InputSplit> split) + { + return new IngestSegmentFirehoseFactory( + dataSource, + null, + split.get(), + dimFilter, + dimensions, + metrics, + maxInputSegmentBytesPerTask, + indexIO, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ); + } + @JsonProperty public String getDataSource() { @@ -135,50 +171,32 @@ public List getMetrics() return metrics; } + @JsonProperty + public List getSegments() + { + return segmentIds; + } + @Override public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws ParseException { - log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval); + log.info( + "Connecting firehose: dataSource[%s], interval[%s], segmentIds[%s]", + dataSource, + interval, + segmentIds + ); try { - // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration - // as TaskActionClient. 
- final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); - List usedSegments; - while (true) { - try { - usedSegments = - coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); - break; - } - catch (Throwable e) { - log.warn(e, "Exception getting database segments"); - final Duration delay = retryPolicy.getAndIncrementRetryDelay(); - if (delay == null) { - throw e; - } else { - final long sleepTime = jitter(delay.getMillis()); - log.info("Will try again in [%s].", new Duration(sleepTime).toString()); - try { - Thread.sleep(sleepTime); - } - catch (InterruptedException e2) { - throw new RuntimeException(e2); - } - } - } - } + final List windowedSegments = getWindowedSegments(); final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); Map segmentFileMap = Maps.newLinkedHashMap(); - for (DataSegment segment : usedSegments) { + for (WindowedSegment windowedSegment : windowedSegments) { + final DataSegment segment = windowedSegment.getSegment(); segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); } - final List> timeLineSegments = VersionedIntervalTimeline - .forSegments(usedSegments) - .lookup(interval); - final List dims; if (dimensions != null) { dims = dimensions; @@ -186,31 +204,35 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); } else { dims = getUniqueDimensions( - timeLineSegments, + windowedSegments, inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() ); } - final List metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics; + final List metricsList = metrics == null ? getUniqueMetrics(windowedSegments) : metrics; + // FIXME If we have a segment A that goes from 1 to 4, and a segment B from 2 to 3 with a later + // version, then this list of adapters will be: A[1-2], A[3-4], B[2-3]. Before this PR it would + // be A[1-2], B[2-3], A[3-4]. Is this change OK, or should we endeavor to keep things in order? + // Can we just do a sort on the list at the end to fix this? final List adapters = Lists.newArrayList( Iterables.concat( Iterables.transform( - timeLineSegments, - new Function, Iterable>() + windowedSegments, + new Function>() { @Override - public Iterable apply(final TimelineObjectHolder holder) + public Iterable apply(final WindowedSegment windowedSegment) { return Iterables.transform( - holder.getObject(), - new Function, WindowedStorageAdapter>() + windowedSegment.getIntervals(), + new Function() { @Override - public WindowedStorageAdapter apply(final PartitionChunk input) + public WindowedStorageAdapter apply(final Interval interval) { - final DataSegment segment = input.getObject(); + final DataSegment segment = windowedSegment.getSegment(); try { return new WindowedStorageAdapter( new QueryableIndexStorageAdapter( @@ -221,7 +243,7 @@ public WindowedStorageAdapter apply(final PartitionChunk input) ) ) ), - holder.getInterval() + interval ); } catch (IOException e) { @@ -251,9 +273,169 @@ private long jitter(long input) return retval < 0 ? 0 : retval; } + // Returns each segment in the interval, along with the sub-intervals from that segment which contribute + // to the timeline. Segments are sorted by the first instant which contributes to the timeline. 
+ private List getWindowedSegmentsForInterval(Interval interval) + { + // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration + // as TaskActionClient. + final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); + List usedSegments; + while (true) { + try { + usedSegments = + coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); + break; + } + catch (Throwable e) { + log.warn(e, "Exception getting database segments"); + final Duration delay = retryPolicy.getAndIncrementRetryDelay(); + if (delay == null) { + throw e; + } else { + final long sleepTime = jitter(delay.getMillis()); + log.info("Will try again in [%s].", new Duration(sleepTime).toString()); + try { + Thread.sleep(sleepTime); + } + catch (InterruptedException e2) { + throw new RuntimeException(e2); + } + } + } + } + + final List> timeLineSegments = VersionedIntervalTimeline + .forSegments(usedSegments) + .lookup(interval); + + final List windowedSegments = new ArrayList<>(); + final Map windowedSegmentsBySegment = new HashMap<>(); + + for (TimelineObjectHolder timelineHolder : timeLineSegments) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + final DataSegment segment = chunk.getObject(); + if (timelineHolder.getInterval().isEqual(timelineHolder.getTrueInterval())) { + // This is the simple case: the entirety of the segment is used: no overshadowing or clipping + // from the beginning or end of the full interval. We aren't going to see it again. + if (windowedSegmentsBySegment.containsKey(segment)) { + throw new ISE("unclipped segment[%s] seen more than once on timeline", segment); + } + final WindowedSegment windowedSegment = new WindowedSegment(segment, null); + windowedSegments.add(windowedSegment); + windowedSegmentsBySegment.put(segment, windowedSegment); + } else { + // Some part of the segment is overshadowed or clipped. This WindowedSegment will need + // to have explicit intervals. + final WindowedSegment existingWindowedSegment = windowedSegmentsBySegment.get(segment); + if (existingWindowedSegment == null) { + final List intervals = new ArrayList<>(); + intervals.add(timelineHolder.getInterval()); + final WindowedSegment newWindowedSegment = new WindowedSegment(segment, intervals); + windowedSegments.add(newWindowedSegment); + windowedSegmentsBySegment.put(segment, newWindowedSegment); + } else { + if (!existingWindowedSegment.hasExplicitIntervals()) { + throw new ISE("unclipped segment[%s] seen later clipped on timeline", segment); + } + existingWindowedSegment.getIntervals().add(timelineHolder.getInterval()); + } + } + } + } + + return windowedSegments; + } + + private List getWindowedSegmentsForIds(List ids) + { + // FIXME This is doing a series of single HTTP calls for each segment. It could instead do a single + // GET /metadata/datasources/DATASOURCE/segments call to download all DataSegments. I'm erring on the + // side of avoiding single unboundedly-large calls, in favor of more, smaller roundtrips --- after all, + // we're going to later fetch each of these segments in series, so the additional coordinator calls + // in series here don't feel that bad. And adding a new "get multiple segments by ID" HTTP call seems + // like overkill. 
+ // + // Also: If the only use of WindowedSegments and the "segments" field on this FirehoseFactory is for internal + // use by parallel ingestion, then it might make sense to get rid of WindowedSegmentId entirely and just put + // WindowedSegments directly into this class's 'segments' parameter. Then we wouldn't even need to do this fetch + // at all. But if we think people might actually appreciate being able to write ingestSegment firehoses specifying + // specific segments by hand, then WindowedSegmentIds still makes sense. Plus would it be safe to trust the + // DataSegment written in a spec instead of one fetched from the DB? I note that CompactionTask has an undocumented + // "segments" parameter which has similar semantics, though I'm not really sure in what context it is used. + return ids.stream() + .map(wsi -> new WindowedSegment( + coordinatorClient.getDatabaseSegmentDataSourceSegment(dataSource, wsi.getSegmentId()), + wsi.getIntervals() + )) + .collect(Collectors.toList()); + } + + private List getWindowedSegments() + { + if (segmentIds == null) { + return getWindowedSegmentsForInterval(Preconditions.checkNotNull(interval)); + } else { + return getWindowedSegmentsForIds(segmentIds); + } + } + + private void initializeSplitsIfNeeded() + { + if (splits != null) { + return; + } + + List windowedSegments = getWindowedSegments(); + + // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing + // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their + // data can combine with each other anyway. + + List>> newSplits = new ArrayList<>(); + List currentSplit = new ArrayList<>(); + long bytesInCurrentSplit = 0; + for (WindowedSegment windowedSegment : windowedSegments) { + final long segmentBytes = windowedSegment.getSegment().getSize(); + if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) { + // This segment won't fit in the current (non-empty) split, so this split is done. + newSplits.add(new InputSplit<>(currentSplit)); + currentSplit = new ArrayList<>(); + bytesInCurrentSplit = 0; + } + if (segmentBytes > maxInputSegmentBytesPerTask) { + // If this segment is itself bigger than our max, just put it in its own split. + Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0); + newSplits.add(new InputSplit<>(Collections.singletonList(windowedSegment.toWindowedSegmentId()))); + } else { + currentSplit.add(windowedSegment.toWindowedSegmentId()); + bytesInCurrentSplit += segmentBytes; + } + } + if (!currentSplit.isEmpty()) { + newSplits.add(new InputSplit<>(currentSplit)); + } + + splits = newSplits; + } + + @Override + public Stream>> getSplits() + { + initializeSplitsIfNeeded(); + return splits.stream(); + } + + @Override + public int getNumSplits() + { + initializeSplitsIfNeeded(); + return splits.size(); + } + @VisibleForTesting static List getUniqueDimensions( - List> timelineSegments, + List windowedSegments, @Nullable Set excludeDimensions ) { @@ -264,15 +446,13 @@ static List getUniqueDimensions( // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more // frequently, and thus the performance should be optimized for recent ones rather than old ones. 
- // timelineSegments are sorted in order of interval + // windowedSegments are sorted in order of their first interval int index = 0; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String dimension : chunk.getObject().getDimensions()) { - if (!uniqueDims.containsKey(dimension) && - (excludeDimensions == null || !excludeDimensions.contains(dimension))) { - uniqueDims.put(dimension, index++); - } + for (WindowedSegment windowedSegment : Lists.reverse(windowedSegments)) { + for (String dimension : windowedSegment.getSegment().getDimensions()) { + if (!uniqueDims.containsKey(dimension) && + (excludeDimensions == null || !excludeDimensions.contains(dimension))) { + uniqueDims.put(dimension, index++); } } } @@ -284,21 +464,19 @@ static List getUniqueDimensions( } @VisibleForTesting - static List getUniqueMetrics(List> timelineSegments) + static List getUniqueMetrics(List windowedSegments) { final BiMap uniqueMetrics = HashBiMap.create(); // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent // segments to olders. - // timelineSegments are sorted in order of interval + // windowedSegments are sorted in order of their first interval int index = 0; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String metric : chunk.getObject().getMetrics()) { - if (!uniqueMetrics.containsKey(metric)) { - uniqueMetrics.put(metric, index++); - } + for (WindowedSegment windowedSegment : Lists.reverse(windowedSegments)) { + for (String metric : windowedSegment.getSegment().getMetrics()) { + if (!uniqueMetrics.containsKey(metric)) { + uniqueMetrics.put(metric, index++); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegment.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegment.java new file mode 100644 index 000000000000..5aa0a48a0ce3 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegment.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.firehose; + +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; + +/** + * A WindowedSegment represents a segment plus the list of intervals inside it which contribute to a timeline. + * If the list of intervals is null, the entire segment contributes to the timeline. + * + * This class is intended for in-memory use; WindowedSegmentId is better for serializing in specs. 
+ */ +public class WindowedSegment +{ + private final DataSegment segment; + @Nullable + private final List intervals; + + public WindowedSegment( + DataSegment segment, + @Nullable List intervals + ) + { + this.segment = segment; + this.intervals = intervals; + // FIXME validate that intervals are sorted, or sort them now? + } + + public DataSegment getSegment() + { + return segment; + } + + public List getIntervals() + { + if (intervals != null) { + return intervals; + } + return Collections.singletonList(segment.getInterval()); + } + + public boolean hasExplicitIntervals() + { + return intervals != null; + } + + public WindowedSegmentId toWindowedSegmentId() + { + return new WindowedSegmentId(segment.getId().toString(), intervals); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java new file mode 100644 index 000000000000..53657d1b9d24 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * A WindowedSegment represents a segment plus the list of intervals inside it which contribute to a timeline. + * If the list of intervals is null, the entire segment contributes to the timeline. + * + * This class is intended for serialization in specs. The class WindowedSegment is similar but contains + * a full DataSegment instead of just an ID. + */ +public class WindowedSegmentId +{ + // This is of the form used by SegmentId. + private final String segmentId; + @Nullable + private final List intervals; + + @JsonCreator + public WindowedSegmentId( + @JsonProperty("segmentId") String segmentId, + @JsonProperty("intervals") @Nullable List intervals + ) + { + this.segmentId = segmentId; + this.intervals = intervals; + // FIXME validate that intervals are sorted, or sort them now? 
+ } + + @JsonProperty + public String getSegmentId() + { + return segmentId; + } + + @JsonProperty + @Nullable + public List getIntervals() + { + return intervals; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 7f44ad6a528e..b62f2a6158d8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -77,11 +77,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.partition.NumberedPartitionChunk; import org.apache.druid.timeline.partition.NumberedShardSpec; -import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.AfterClass; @@ -197,9 +193,11 @@ public List getDatabaseSegmentDataSourceSegments(String dataSource, final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory( TASK.getDataSource(), Intervals.ETERNITY, + null, new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), dim_names, metric_names, + null, INDEX_IO, cc, slf, @@ -457,13 +455,11 @@ public void testGetUniqueDimensionsAndMetrics() { final int numSegmentsPerPartitionChunk = 5; final int numPartitionChunksPerTimelineObject = 10; - final int numSegments = numSegmentsPerPartitionChunk * numPartitionChunksPerTimelineObject; final Interval interval = Intervals.of("2017-01-01/2017-01-02"); final String version = "1"; - final List> timelineSegments = new ArrayList<>(); + final List windowedSegments = new ArrayList<>(); for (int i = 0; i < numPartitionChunksPerTimelineObject; i++) { - final List> chunks = new ArrayList<>(); for (int j = 0; j < numSegmentsPerPartitionChunk; j++) { final List dims = IntStream.range(i, i + numSegmentsPerPartitionChunk) .mapToObj(suffix -> "dim" + suffix) @@ -482,20 +478,8 @@ public void testGetUniqueDimensionsAndMetrics() 1, 1 ); - - final PartitionChunk partitionChunk = new NumberedPartitionChunk<>( - i, - numPartitionChunksPerTimelineObject, - segment - ); - chunks.add(partitionChunk); + windowedSegments.add(new WindowedSegment(segment, null)); } - final TimelineObjectHolder timelineHolder = new TimelineObjectHolder<>( - interval, - version, - new PartitionHolder<>(chunks) - ); - timelineSegments.add(timelineHolder); } final String[] expectedDims = new String[]{ @@ -532,11 +516,11 @@ public void testGetUniqueDimensionsAndMetrics() }; Assert.assertEquals( Arrays.asList(expectedDims), - IngestSegmentFirehoseFactory.getUniqueDimensions(timelineSegments, null) + IngestSegmentFirehoseFactory.getUniqueDimensions(windowedSegments, null) ); Assert.assertEquals( Arrays.asList(expectedMetrics), - IngestSegmentFirehoseFactory.getUniqueMetrics(timelineSegments) + IngestSegmentFirehoseFactory.getUniqueMetrics(windowedSegments) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 8ca24d95d727..db9873d3ea61 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -290,9 +290,11 @@ public List getDatabaseSegmentDataSourceSegments(String dataSource, final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory( DATA_SOURCE, testCase.interval, + null, new TrueDimFilter(), Arrays.asList(DIMENSIONS), Arrays.asList(METRICS), + null, INDEX_IO, cc, slf, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java index e457a1c6d656..42ba81a9350e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java @@ -33,11 +33,16 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest private static String REINDEX_TASK = "/indexer/wikipedia_parallel_reindex_task.json"; private static String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_reindex_queries.json"; private static String INDEX_DATASOURCE = "wikipedia_parallel_index_test"; + private static String INDEX_INGEST_SEGMENT_DATASOURCE = "wikipedia_parallel_ingest_segment_index_test"; + private static String INDEX_INGEST_SEGMENT_TASK = "/indexer/wikipedia_parallel_ingest_segment_index_task.json"; @Test public void testIndexData() throws Exception { - try (final Closeable closeable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())) { + try (final Closeable indexCloseable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); + final Closeable ingestSegmentCloseable = unloader( + INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix()); + ) { doIndexTestTest( INDEX_DATASOURCE, INDEX_TASK, @@ -53,6 +58,13 @@ public void testIndexData() throws Exception REINDEX_QUERIES_RESOURCE, true ); + + doReindexTest( + INDEX_DATASOURCE, + INDEX_INGEST_SEGMENT_DATASOURCE, + INDEX_INGEST_SEGMENT_TASK, + INDEX_QUERIES_RESOURCE + ); } } } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json new file mode 100644 index 000000000000..9be622af8cd7 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json @@ -0,0 +1,73 @@ +{ + "type": "index_parallel", + "spec": { + "dataSchema": { + "dataSource": "%%REINDEX_DATASOURCE%%", + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "second", + "intervals": [ + "2013-08-31/2013-09-01" + ] + }, + "parser": { + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + { + "type": "string", + "name": "language", + "createBitmapIndex": false + }, + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + } + } + } + }, + "ioConfig": { + "type": 
"index_parallel", + "firehose": { + "type": "ingestSegment", + "dataSource": "%%DATASOURCE%%", + "interval": "2013-08-31/2013-09-01", + "maxInputSegmentBytesPerTask": 1 + } + } + } +} diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index 418854059389..6a68fec850be 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -156,4 +156,37 @@ public List getDatabaseSegmentDataSourceSegments(String dataSource, throw new RuntimeException(e); } } + + public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId) + { + try { + FullResponseHolder response = druidLeaderClient.go( + druidLeaderClient.makeRequest( + HttpMethod.GET, + StringUtils.format( + "/druid/coordinator/v1/metadata/datasources/%s/segments/%s", + StringUtils.urlEncode(dataSource), + StringUtils.urlEncode(segmentId) + ) + ) + ); + + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching database segment data source segment status[%s] content[%s]", + response.getStatus(), + response.getContent() + ); + } + return jsonMapper.readValue( + response.getContent(), new TypeReference() + { + } + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } From 77c6b26353193773873dd964d82114e14d67a918 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Thu, 28 Feb 2019 18:28:45 -0800 Subject: [PATCH 2/7] Code review feedback - Get rid of WindowedSegment - Don't document 'segments' parameter or support splitting firehoses that use it - Require 'intervals' in WindowedSegmentId (since it won't be written by hand) --- docs/content/ingestion/firehose.md | 3 +- .../IngestSegmentFirehoseFactory.java | 253 ++++++++++-------- .../indexing/firehose/WindowedSegment.java | 73 ----- .../indexing/firehose/WindowedSegmentId.java | 17 +- .../IngestSegmentFirehoseFactoryTest.java | 26 +- .../client/coordinator/CoordinatorClient.java | 1 - 6 files changed, 167 insertions(+), 206 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegment.java diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index 9698425dadd2..bda4d91c536f 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -103,8 +103,7 @@ A sample ingest firehose spec is shown below - |--------|-----------|---------| |type|This should be "ingestSegment".|yes| |dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes| -|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over. You must specify exactly one of this and `segments`.|no| -|segments|A list of JSON objects describing specific segments to include. Each object must have a `segmentId` field naming a segment to ingest, and may have an `intervals` field which is a list of ISO-8601 Interval strings telling what intervals on the segment to read. You must specify exactly one of this and `interval`. +|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes| |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. 
|no| |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| |filter| See [Filters](../querying/filters.html)|no| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 4c808b48e843..1d016037e2d3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -41,6 +41,7 @@ import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.filter.DimFilter; @@ -55,6 +56,7 @@ import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Duration; import org.joda.time.Interval; @@ -67,6 +69,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -93,6 +97,8 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory segmentIds, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("dimensions") List dimensions, @@ -188,13 +194,17 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) ); try { - final List windowedSegments = getWindowedSegments(); + final List> timeLineSegments = getTimeline(); final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); Map segmentFileMap = Maps.newLinkedHashMap(); - for (WindowedSegment windowedSegment : windowedSegments) { - final DataSegment segment = windowedSegment.getSegment(); - segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); + for (TimelineObjectHolder holder : timeLineSegments) { + for (PartitionChunk chunk : holder.getObject()) { + final DataSegment segment = chunk.getObject(); + if (!segmentFileMap.containsKey(segment)) { + segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); + } + } } final List dims; @@ -204,35 +214,31 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); } else { dims = getUniqueDimensions( - windowedSegments, + timeLineSegments, inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() ); } - final List metricsList = metrics == null ? getUniqueMetrics(windowedSegments) : metrics; + final List metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics; - // FIXME If we have a segment A that goes from 1 to 4, and a segment B from 2 to 3 with a later - // version, then this list of adapters will be: A[1-2], A[3-4], B[2-3]. Before this PR it would - // be A[1-2], B[2-3], A[3-4]. Is this change OK, or should we endeavor to keep things in order? 
- // Can we just do a sort on the list at the end to fix this? final List adapters = Lists.newArrayList( Iterables.concat( Iterables.transform( - windowedSegments, - new Function>() + timeLineSegments, + new Function, Iterable>() { @Override - public Iterable apply(final WindowedSegment windowedSegment) + public Iterable apply(final TimelineObjectHolder holder) { return Iterables.transform( - windowedSegment.getIntervals(), - new Function() + holder.getObject(), + new Function, WindowedStorageAdapter>() { @Override - public WindowedStorageAdapter apply(final Interval interval) + public WindowedStorageAdapter apply(final PartitionChunk input) { - final DataSegment segment = windowedSegment.getSegment(); + final DataSegment segment = input.getObject(); try { return new WindowedStorageAdapter( new QueryableIndexStorageAdapter( @@ -243,7 +249,7 @@ public WindowedStorageAdapter apply(final Interval interval) ) ) ), - interval + holder.getInterval() ); } catch (IOException e) { @@ -273,10 +279,19 @@ private long jitter(long input) return retval < 0 ? 0 : retval; } - // Returns each segment in the interval, along with the sub-intervals from that segment which contribute - // to the timeline. Segments are sorted by the first instant which contributes to the timeline. - private List getWindowedSegmentsForInterval(Interval interval) + private List> getTimeline() { + if (interval == null) { + return getTimelineForSegmentIds(); + } else { + return getTimelineForInterval(); + } + } + + private List> getTimelineForInterval() + { + Preconditions.checkNotNull(interval); + // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration // as TaskActionClient. final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); @@ -305,79 +320,55 @@ private List getWindowedSegmentsForInterval(Interval interval) } } - final List> timeLineSegments = VersionedIntervalTimeline - .forSegments(usedSegments) - .lookup(interval); - - final List windowedSegments = new ArrayList<>(); - final Map windowedSegmentsBySegment = new HashMap<>(); + return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval); + } - for (TimelineObjectHolder timelineHolder : timeLineSegments) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - final DataSegment segment = chunk.getObject(); - if (timelineHolder.getInterval().isEqual(timelineHolder.getTrueInterval())) { - // This is the simple case: the entirety of the segment is used: no overshadowing or clipping - // from the beginning or end of the full interval. We aren't going to see it again. 
- if (windowedSegmentsBySegment.containsKey(segment)) { - throw new ISE("unclipped segment[%s] seen more than once on timeline", segment); + private List> getTimelineForSegmentIds() + { + final SortedMap> timeline = new TreeMap<>( + Comparators.intervalsByStartThenEnd() + ); + for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds)) { + final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment( + dataSource, + windowedSegmentId.getSegmentId() + ); + for (Interval interval : windowedSegmentId.getIntervals()) { + final TimelineObjectHolder existingHolder = timeline.get(interval); + if (existingHolder != null) { + if (!existingHolder.getVersion().equals(segment.getVersion())) { + throw new ISE("Timeline segments with the same interval should have the same version: " + + "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment); } - final WindowedSegment windowedSegment = new WindowedSegment(segment, null); - windowedSegments.add(windowedSegment); - windowedSegmentsBySegment.put(segment, windowedSegment); + existingHolder.getObject().add(segment.getShardSpec().createChunk(segment)); } else { - // Some part of the segment is overshadowed or clipped. This WindowedSegment will need - // to have explicit intervals. - final WindowedSegment existingWindowedSegment = windowedSegmentsBySegment.get(segment); - if (existingWindowedSegment == null) { - final List intervals = new ArrayList<>(); - intervals.add(timelineHolder.getInterval()); - final WindowedSegment newWindowedSegment = new WindowedSegment(segment, intervals); - windowedSegments.add(newWindowedSegment); - windowedSegmentsBySegment.put(segment, newWindowedSegment); - } else { - if (!existingWindowedSegment.hasExplicitIntervals()) { - throw new ISE("unclipped segment[%s] seen later clipped on timeline", segment); - } - existingWindowedSegment.getIntervals().add(timelineHolder.getInterval()); - } + timeline.put(interval, new TimelineObjectHolder<>( + interval, + segment.getInterval(), + segment.getVersion(), + new PartitionHolder(segment.getShardSpec().createChunk(segment)) + )); } } } - return windowedSegments; - } - - private List getWindowedSegmentsForIds(List ids) - { - // FIXME This is doing a series of single HTTP calls for each segment. It could instead do a single - // GET /metadata/datasources/DATASOURCE/segments call to download all DataSegments. I'm erring on the - // side of avoiding single unboundedly-large calls, in favor of more, smaller roundtrips --- after all, - // we're going to later fetch each of these segments in series, so the additional coordinator calls - // in series here don't feel that bad. And adding a new "get multiple segments by ID" HTTP call seems - // like overkill. - // - // Also: If the only use of WindowedSegments and the "segments" field on this FirehoseFactory is for internal - // use by parallel ingestion, then it might make sense to get rid of WindowedSegmentId entirely and just put - // WindowedSegments directly into this class's 'segments' parameter. Then we wouldn't even need to do this fetch - // at all. But if we think people might actually appreciate being able to write ingestSegment firehoses specifying - // specific segments by hand, then WindowedSegmentIds still makes sense. Plus would it be safe to trust the - // DataSegment written in a spec instead of one fetched from the DB? 
I note that CompactionTask has an undocumented - // "segments" parameter which has similar semantics, though I'm not really sure in what context it is used. - return ids.stream() - .map(wsi -> new WindowedSegment( - coordinatorClient.getDatabaseSegmentDataSourceSegment(dataSource, wsi.getSegmentId()), - wsi.getIntervals() - )) - .collect(Collectors.toList()); - } - - private List getWindowedSegments() - { - if (segmentIds == null) { - return getWindowedSegmentsForInterval(Preconditions.checkNotNull(interval)); - } else { - return getWindowedSegmentsForIds(segmentIds); + // Validate that none of the given windows overlaps (except for when multiple segments share exactly the + // same interval). + Interval lastInterval = null; + for (Interval interval : timeline.keySet()) { + if (lastInterval != null) { + if (interval.overlaps(lastInterval)) { + throw new IAE( + "Distinct intervals in input segments may not overlap: [%s] vs [%s]", + lastInterval, + interval + ); + } + } + lastInterval = interval; } + + return new ArrayList<>(timeline.values()); } private void initializeSplitsIfNeeded() @@ -386,7 +377,8 @@ private void initializeSplitsIfNeeded() return; } - List windowedSegments = getWindowedSegments(); + // isSplittable() ensures this is only called when we have an interval. + final List> timelineSegments = getTimelineForInterval(); // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their @@ -394,22 +386,41 @@ private void initializeSplitsIfNeeded() List>> newSplits = new ArrayList<>(); List currentSplit = new ArrayList<>(); + Map windowedSegmentIds = new HashMap<>(); long bytesInCurrentSplit = 0; - for (WindowedSegment windowedSegment : windowedSegments) { - final long segmentBytes = windowedSegment.getSegment().getSize(); - if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) { - // This segment won't fit in the current (non-empty) split, so this split is done. - newSplits.add(new InputSplit<>(currentSplit)); - currentSplit = new ArrayList<>(); - bytesInCurrentSplit = 0; - } - if (segmentBytes > maxInputSegmentBytesPerTask) { - // If this segment is itself bigger than our max, just put it in its own split. - Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0); - newSplits.add(new InputSplit<>(Collections.singletonList(windowedSegment.toWindowedSegmentId()))); - } else { - currentSplit.add(windowedSegment.toWindowedSegmentId()); - bytesInCurrentSplit += segmentBytes; + for (TimelineObjectHolder timelineHolder : timelineSegments) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + final DataSegment segment = chunk.getObject(); + final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment); + if (existingWindowedSegmentId != null) { + // We've already seen this segment in the timeline, so just add this interval to it. It has already + // been placed into a split. + existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval()); + } else { + // It's the first time we've seen this segment, so create a new WindowedSegmentId. + List intervals = new ArrayList<>(); + // Use the interval that contributes to the timeline, not the entire segment's true interval. 
+ intervals.add(timelineHolder.getInterval()); + final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals); + windowedSegmentIds.put(segment, newWindowedSegmentId); + + // Now figure out if it goes in the current split or not. + final long segmentBytes = segment.getSize(); + if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) { + // This segment won't fit in the current non-empty split, so this split is done. + newSplits.add(new InputSplit<>(currentSplit)); + currentSplit = new ArrayList<>(); + bytesInCurrentSplit = 0; + } + if (segmentBytes > maxInputSegmentBytesPerTask) { + // If this segment is itself bigger than our max, just put it in its own split. + Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0); + newSplits.add(new InputSplit<>(Collections.singletonList(newWindowedSegmentId))); + } else { + currentSplit.add(newWindowedSegmentId); + bytesInCurrentSplit += segmentBytes; + } + } } } if (!currentSplit.isEmpty()) { @@ -419,6 +430,14 @@ public int getNumSplits() splits = newSplits; } + @Override + public boolean isSplittable() + { + // Specifying 'segments' to this factory instead of 'interval' is intended primarily for internal use by + // parallel batch ingestion: we don't need to support splitting a list of segments. + return interval != null; + } + @Override public Stream>> getSplits() { @@ -435,7 +454,7 @@ public int getNumSplits() @VisibleForTesting static List getUniqueDimensions( - List windowedSegments, + List> timelineSegments, @Nullable Set excludeDimensions ) { @@ -446,13 +465,15 @@ static List getUniqueDimensions( // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more // frequently, and thus the performance should be optimized for recent ones rather than old ones. - // windowedSegments are sorted in order of their first interval + // timelineSegments are sorted in order of interval int index = 0; - for (WindowedSegment windowedSegment : Lists.reverse(windowedSegments)) { - for (String dimension : windowedSegment.getSegment().getDimensions()) { - if (!uniqueDims.containsKey(dimension) && - (excludeDimensions == null || !excludeDimensions.contains(dimension))) { - uniqueDims.put(dimension, index++); + for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + for (String dimension : chunk.getObject().getDimensions()) { + if (!uniqueDims.containsKey(dimension) && + (excludeDimensions == null || !excludeDimensions.contains(dimension))) { + uniqueDims.put(dimension, index++); + } } } } @@ -464,19 +485,21 @@ static List getUniqueDimensions( } @VisibleForTesting - static List getUniqueMetrics(List windowedSegments) + static List getUniqueMetrics(List> timelineSegments) { final BiMap uniqueMetrics = HashBiMap.create(); // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent // segments to olders. 
- // windowedSegments are sorted in order of their first interval + // timelineSegments are sorted in order of interval int index = 0; - for (WindowedSegment windowedSegment : Lists.reverse(windowedSegments)) { - for (String metric : windowedSegment.getSegment().getMetrics()) { - if (!uniqueMetrics.containsKey(metric)) { - uniqueMetrics.put(metric, index++); + for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + for (String metric : chunk.getObject().getMetrics()) { + if (!uniqueMetrics.containsKey(metric)) { + uniqueMetrics.put(metric, index++); + } } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegment.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegment.java deleted file mode 100644 index 5aa0a48a0ce3..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegment.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.firehose; - -import org.apache.druid.timeline.DataSegment; -import org.joda.time.Interval; - -import javax.annotation.Nullable; -import java.util.Collections; -import java.util.List; - -/** - * A WindowedSegment represents a segment plus the list of intervals inside it which contribute to a timeline. - * If the list of intervals is null, the entire segment contributes to the timeline. - * - * This class is intended for in-memory use; WindowedSegmentId is better for serializing in specs. - */ -public class WindowedSegment -{ - private final DataSegment segment; - @Nullable - private final List intervals; - - public WindowedSegment( - DataSegment segment, - @Nullable List intervals - ) - { - this.segment = segment; - this.intervals = intervals; - // FIXME validate that intervals are sorted, or sort them now? 
- } - - public DataSegment getSegment() - { - return segment; - } - - public List getIntervals() - { - if (intervals != null) { - return intervals; - } - return Collections.singletonList(segment.getInterval()); - } - - public boolean hasExplicitIntervals() - { - return intervals != null; - } - - public WindowedSegmentId toWindowedSegmentId() - { - return new WindowedSegmentId(segment.getId().toString(), intervals); - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java index 53657d1b9d24..c3f04bbcb27e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java @@ -21,34 +21,30 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.List; /** * A WindowedSegment represents a segment plus the list of intervals inside it which contribute to a timeline. - * If the list of intervals is null, the entire segment contributes to the timeline. - * - * This class is intended for serialization in specs. The class WindowedSegment is similar but contains - * a full DataSegment instead of just an ID. + *
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index b62f2a6158d8..70e5544b00eb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -77,7 +77,11 @@
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.partition.NumberedPartitionChunk;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.timeline.partition.PartitionHolder;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.AfterClass;
@@ -455,11 +459,13 @@ public void testGetUniqueDimensionsAndMetrics()
 {
   final int numSegmentsPerPartitionChunk = 5;
   final int numPartitionChunksPerTimelineObject = 10;
+  final int numSegments = numSegmentsPerPartitionChunk * numPartitionChunksPerTimelineObject;
   final Interval interval = Intervals.of("2017-01-01/2017-01-02");
   final String version = "1";
 
-  final List<WindowedSegment> windowedSegments = new ArrayList<>();
+  final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = new ArrayList<>();
   for (int i = 0; i < numPartitionChunksPerTimelineObject; i++) {
+    final List<PartitionChunk<DataSegment>> chunks = new ArrayList<>();
     for (int j = 0; j < numSegmentsPerPartitionChunk; j++) {
       final List<String> dims = IntStream.range(i, i + numSegmentsPerPartitionChunk)
                                          .mapToObj(suffix -> "dim" + suffix)
@@ -478,8 +484,20 @@
           1,
           1
       );
-      windowedSegments.add(new WindowedSegment(segment, null));
+
+      final PartitionChunk<DataSegment> partitionChunk = new NumberedPartitionChunk<>(
+          i,
+          numPartitionChunksPerTimelineObject,
+          segment
+      );
+      chunks.add(partitionChunk);
     }
+    final TimelineObjectHolder<String, DataSegment> timelineHolder = new TimelineObjectHolder<>(
+        interval,
+        version,
+        new PartitionHolder<>(chunks)
+    );
+    timelineSegments.add(timelineHolder);
   }
 
   final String[] expectedDims = new String[]{
@@ -516,11 +534,11 @@
   };
   Assert.assertEquals(
       Arrays.asList(expectedDims),
-      IngestSegmentFirehoseFactory.getUniqueDimensions(windowedSegments, null)
+      IngestSegmentFirehoseFactory.getUniqueDimensions(timelineSegments, null)
   );
   Assert.assertEquals(
       Arrays.asList(expectedMetrics),
-      IngestSegmentFirehoseFactory.getUniqueMetrics(windowedSegments)
+      IngestSegmentFirehoseFactory.getUniqueMetrics(timelineSegments)
   );
 }
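The test above pins down the ordering contract of `getUniqueDimensions`/`getUniqueMetrics`: the timeline is walked newest first, and the first segment to mention a name fixes its position. A standalone illustration of that dedup logic, simplified to plain lists (not the actual method):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UniqueNameOrdering
{
  public static void main(String[] args)
  {
    // Metric lists per segment, newest segment first (the real code achieves this
    // with Lists.reverse() over the oldest-first timeline).
    final List<List<String>> metricsNewestFirst = Arrays.asList(
        Arrays.asList("added", "deleted"),  // newest segment wins on ordering
        Arrays.asList("deleted", "delta")   // older segment only contributes "delta"
    );

    final Map<String, Integer> uniqueMetrics = new HashMap<>();
    int index = 0;
    for (List<String> metrics : metricsNewestFirst) {
      for (String metric : metrics) {
        if (!uniqueMetrics.containsKey(metric)) {
          uniqueMetrics.put(metric, index++);
        }
      }
    }

    // Recover the names in assigned-index order: [added, deleted, delta]
    final String[] ordered = new String[uniqueMetrics.size()];
    uniqueMetrics.forEach((metric, i) -> ordered[i] = metric);
    System.out.println(Arrays.toString(ordered));
  }
}
```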
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 6a68fec850be..247dee191645 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -188,5 +188,4 @@ public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String
       throw new RuntimeException(e);
     }
   }
-
 }

From 602af301ba8f93359e3f07b46253cd69ef5c8f8e Mon Sep 17 00:00:00 2001
From: David Glasser
Date: Thu, 28 Feb 2019 19:21:56 -0800
Subject: [PATCH 3/7] Add missing @JsonProperty

---
 .../firehose/IngestSegmentFirehoseFactory.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 1d016037e2d3..ad2bdfe3e24b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -159,6 +159,12 @@ public Interval getInterval()
     return interval;
   }
 
+  @JsonProperty
+  public List<WindowedSegmentId> getSegments()
+  {
+    return segmentIds;
+  }
+
   @JsonProperty("filter")
   public DimFilter getDimensionsFilter()
   {
@@ -178,9 +184,9 @@ public List<String> getMetrics()
   }
 
   @JsonProperty
-  public List<WindowedSegmentId> getSegments()
+  public long getMaxInputSegmentBytesPerTask()
   {
-    return segmentIds;
+    return maxInputSegmentBytesPerTask;
   }
 
   @Override
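Why the missing `@JsonProperty` matters: each per-split copy of the factory is re-serialized into a subtask spec, and a getter without the annotation silently drops its field on that round trip. A hedged sketch of the check this commit makes pass (the `ObjectMapper` is assumed to carry the `InjectableValues` that the factory's `@JacksonInject` fields need, as task-spec mappers do in practice):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;

class SpecRoundTripCheck
{
  static void assertSettingSurvives(ObjectMapper mapper, IngestSegmentFirehoseFactory factory) throws Exception
  {
    final String json = mapper.writeValueAsString(factory);
    final IngestSegmentFirehoseFactory reread = mapper.readValue(json, IngestSegmentFirehoseFactory.class);
    // Without @JsonProperty on the getter, "maxInputSegmentBytesPerTask" is absent
    // from 'json' and the reread factory silently falls back to the 150MB default.
    if (reread.getMaxInputSegmentBytesPerTask() != factory.getMaxInputSegmentBytesPerTask()) {
      throw new AssertionError("maxInputSegmentBytesPerTask was lost in serialization");
    }
  }
}
```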
"newPage", + "dimensionExclusions": [ "robot", - "anonymous", - "namespace", - "continent", - "country", - "region", - "city" + "continent" ] } } @@ -65,9 +51,13 @@ "firehose": { "type": "ingestSegment", "dataSource": "%%DATASOURCE%%", - "interval": "2013-08-31/2013-09-01", + "interval": "2013-08-31/2013-09-02", "maxInputSegmentBytesPerTask": 1 } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumSubTasks": 10 } } } From abe27fba6a57d062c1f1f895d3b191cb4a6e6583 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Fri, 8 Mar 2019 16:48:11 -0800 Subject: [PATCH 5/7] Add unit test --- ...estSegmentFirehoseFactoryTimelineTest.java | 67 +++++++++++++++++-- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index db9873d3ea61..0add858d54d2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -27,8 +27,10 @@ import com.google.common.io.Files; import org.apache.commons.io.FileUtils; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; @@ -73,6 +75,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; @RunWith(Parameterized.class) public class IngestSegmentFirehoseFactoryTimelineTest @@ -102,6 +105,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest private final File tmpDir; private final int expectedCount; private final long expectedSum; + private final int segmentCount; private static final ObjectMapper MAPPER; private static final IndexIO INDEX_IO; @@ -119,17 +123,28 @@ public IngestSegmentFirehoseFactoryTimelineTest( IngestSegmentFirehoseFactory factory, File tmpDir, int expectedCount, - long expectedSum + long expectedSum, + int segmentCount ) { this.factory = factory; this.tmpDir = tmpDir; this.expectedCount = expectedCount; this.expectedSum = expectedSum; + this.segmentCount = segmentCount; } @Test - public void testSimple() throws Exception + public void test() throws Exception + { + // Junit 4.12 doesn't have a good way to run tearDown after multiple tests in a Parameterized + // class run. (Junit 4.13 adds @AfterParam but isn't released yet.) Fake it by just running + // "tests" in series inside one @Test. + testSimple(); + testSplit(); + } + + private void testSimple() throws Exception { int count = 0; long sum = 0; @@ -146,6 +161,36 @@ public void testSimple() throws Exception Assert.assertEquals("sum", expectedSum, sum); } + private void testSplit() throws Exception + { + Assert.assertTrue(factory.isSplittable()); + final int numSplits = factory.getNumSplits(); + // We set maxInputSegmentBytesPerSplit to 2 so each segment should become a byte. 
From abe27fba6a57d062c1f1f895d3b191cb4a6e6583 Mon Sep 17 00:00:00 2001
From: David Glasser
Date: Fri, 8 Mar 2019 16:48:11 -0800
Subject: [PATCH 5/7] Add unit test

---
 ...estSegmentFirehoseFactoryTimelineTest.java | 67 +++++++++++++++++--
 1 file changed, 62 insertions(+), 5 deletions(-)

diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index db9873d3ea61..0add858d54d2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -27,8 +27,10 @@
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
@@ -73,6 +75,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 @RunWith(Parameterized.class)
 public class IngestSegmentFirehoseFactoryTimelineTest
@@ -102,6 +105,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
   private final File tmpDir;
   private final int expectedCount;
   private final long expectedSum;
+  private final int segmentCount;
 
   private static final ObjectMapper MAPPER;
   private static final IndexIO INDEX_IO;
@@ -119,17 +123,28 @@ public IngestSegmentFirehoseFactoryTimelineTest(
       IngestSegmentFirehoseFactory factory,
       File tmpDir,
       int expectedCount,
-      long expectedSum
+      long expectedSum,
+      int segmentCount
   )
   {
     this.factory = factory;
     this.tmpDir = tmpDir;
     this.expectedCount = expectedCount;
     this.expectedSum = expectedSum;
+    this.segmentCount = segmentCount;
   }
 
   @Test
-  public void testSimple() throws Exception
+  public void test() throws Exception
+  {
+    // Junit 4.12 doesn't have a good way to run tearDown after multiple tests in a Parameterized
+    // class run. (Junit 4.13 adds @AfterParam but isn't released yet.) Fake it by just running
+    // "tests" in series inside one @Test.
+    testSimple();
+    testSplit();
+  }
+
+  private void testSimple() throws Exception
   {
     int count = 0;
     long sum = 0;
@@ -146,6 +161,36 @@ public void testSimple() throws Exception
     Assert.assertEquals("sum", expectedSum, sum);
   }
 
+  private void testSplit() throws Exception
+  {
+    Assert.assertTrue(factory.isSplittable());
+    final int numSplits = factory.getNumSplits();
+    // maxInputSegmentBytesPerTask is 1 and each segment is 2 bytes, so every segment should become its own split.
+    Assert.assertEquals(segmentCount, numSplits);
+    final List<InputSplit<List<WindowedSegmentId>>> splits =
+        factory.getSplits().collect(Collectors.toList());
+    Assert.assertEquals(numSplits, splits.size());
+
+    int count = 0;
+    long sum = 0;
+
+    for (InputSplit<List<WindowedSegmentId>> split : splits) {
+      final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory =
+          factory.withSplit(split);
+      try (final Firehose firehose = splitFactory.connect(ROW_PARSER, null)) {
+        while (firehose.hasMore()) {
+          final InputRow row = firehose.nextRow();
+          count++;
+          sum += row.getMetric(METRICS[0]).longValue();
+        }
+      }
+    }
+
+    Assert.assertEquals("count", expectedCount, count);
+    Assert.assertEquals("sum", expectedSum, sum);
+
+  }
+
   @After
   public void tearDown() throws Exception
   {
@@ -286,6 +331,16 @@ public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource,
           throw new IllegalArgumentException("WTF");
         }
       }
+
+      @Override
+      public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId)
+      {
+        return testCase.segments
+                       .stream()
+                       .filter(s -> s.getId().toString().equals(segmentId))
+                       .findAny()
+                       .get(); // throwing if not found is exactly what the real code does
+      }
     };
     final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
         DATA_SOURCE,
@@ -294,7 +349,8 @@
         new TrueDimFilter(),
         Arrays.asList(DIMENSIONS),
         Arrays.asList(METRICS),
-        null,
+        // Split as much as possible
+        1L,
         INDEX_IO,
         cc,
         slf,
@@ -307,7 +363,8 @@
             factory,
             testCase.tmpDir,
             testCase.expectedCount,
-            testCase.expectedSum
+            testCase.expectedSum,
+            testCase.segments.size()
         }
     );
   }
@@ -387,7 +444,7 @@ public DataSegment make(File tmpDir)
         Arrays.asList(METRICS),
         new LinearShardSpec(partitionNum),
         -1,
-        0L
+        2L
     );
   }
 }
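The new `testSplit` exercises the same `FiniteFirehoseFactory` calls that the parallel index supervisor makes. In outline (a simplified sketch of the orchestration, not the supervisor's actual code):

```java
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import java.util.List;
import java.util.stream.Collectors;

class SplitDispatchSketch
{
  static void dispatch(FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> factory) throws Exception
  {
    if (!factory.isSplittable()) {
      return;  // the task falls back to running sequentially
    }
    final List<InputSplit<List<WindowedSegmentId>>> splits =
        factory.getSplits().collect(Collectors.toList());
    for (InputSplit<List<WindowedSegmentId>> split : splits) {
      // Each subtask receives a factory bound to just this split's segment ids;
      // serializing that copy is why the "segments" property exists on the factory.
      final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> subTaskFactory =
          factory.withSplit(split);
      // ... embed subTaskFactory in the subtask's ioConfig and submit it ...
    }
  }
}
```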
From 5842d5088877db79406014328bb73177021c3e1b Mon Sep 17 00:00:00 2001
From: David Glasser
Date: Fri, 8 Mar 2019 16:51:07 -0800
Subject: [PATCH 6/7] Remove two FIXME comments from CompactionTask

I'd like to leave this PR in a potentially mergeable state, but I still
would appreciate reviewer eyes on the questions I'm removing here.
---
 .../org/apache/druid/indexing/common/task/CompactionTask.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 88303304562a..7608f883dae8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -506,14 +506,12 @@ private static IndexIOConfig createIoConfig(
         new IngestSegmentFirehoseFactory(
             dataSchema.getDataSource(),
             interval,
-            // FIXME It might be possible to pass in segmentIds instead of interval here (because the caller
-            // does have a timeline) but I don't know if it would actually improve anything.
             null,
             null, // no filter
             // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
             dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
             Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
-            null, // FIXME allow maxInputSegmentBytesPerTask to be tweaked?
+            null,
             toolbox.getIndexIO(),
             coordinatorClient,
             segmentLoaderFactory,

From e7de623e845411f6974cffda48f1aca21cb7a37e Mon Sep 17 00:00:00 2001
From: David Glasser
Date: Fri, 22 Mar 2019 15:23:25 -0700
Subject: [PATCH 7/7] Updates from code review

---
 docs/content/ingestion/native_tasks.md        |  2 +-
 .../firehose/IngestSegmentFirehoseFactory.java | 16 +++++++++++++---
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md
index 4ecaccf53ac3..3f2e1efe2ab4 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -45,7 +45,7 @@ task statuses. If one of them fails, it retries the failed task until the retryi
 If all worker tasks succeed, then it collects the reported list of generated segments and publishes those segments at once.
 
 To use this task, the `firehose` in `ioConfig` should be _splittable_. If it's not, this task runs sequentially. The
-current splittable firehoses are [`LocalFirehose`](./firehose.html#localfirehose), [`HttpFirehose`](./firehose.html#httpfirehose)
+current splittable firehoses are [`LocalFirehose`](./firehose.html#localfirehose), [`IngestSegmentFirehose`](./firehose.html#ingestsegmentfirehose), [`HttpFirehose`](./firehose.html#httpfirehose)
 , [`StaticS3Firehose`](../development/extensions-core/s3.html#statics3firehose), [`StaticAzureBlobStoreFirehose`](../development/extensions-contrib/azure.html#staticazureblobstorefirehose)
 , [`StaticGoogleBlobStoreFirehose`](../development/extensions-contrib/google.html#staticgoogleblobstorefirehose), and [`StaticCloudFilesFirehose`](../development/extensions-contrib/cloudfiles.html#staticcloudfilesfirehose).
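With both `interval` and `segments` now `@Nullable`, the constructor has to enforce that callers pass exactly one of them. Presumably the check is along these lines (a sketch only; the real constructor's wording and error type may differ, though `IAE` is Druid's `IllegalArgumentException` subclass and is already imported in this file):

```java
// Sketch of the exactly-one-of check implied by the @Nullable annotations below.
// End users specify "interval"; split subtasks receive an explicit "segments" list.
if ((interval == null) == (segmentIds == null)) {
  throw new IAE("Specify exactly one of 'interval' and 'segments'");
}
```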
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index ad2bdfe3e24b..7ed791835fe9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -81,25 +81,31 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
   private final String dataSource;
   private final Interval interval;
   private final List<WindowedSegmentId> segmentIds;
   private final DimFilter dimFilter;
   private final List<String> dimensions;
   private final List<String> metrics;
   private final long maxInputSegmentBytesPerTask;
-  private List<InputSplit<List<WindowedSegmentId>>> splits;
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
   private final SegmentLoaderFactory segmentLoaderFactory;
   private final RetryPolicyFactory retryPolicyFactory;
 
+  private List<InputSplit<List<WindowedSegmentId>>> splits;
+
   @JsonCreator
   public IngestSegmentFirehoseFactory(
       @JsonProperty("dataSource") final String dataSource,
-      @JsonProperty("interval") Interval interval,
+      @Nullable @JsonProperty("interval") Interval interval,
       // Specifying "segments" is intended only for when this FirehoseFactory has split itself,
       // not for direct end user use.
-      @JsonProperty("segments") List<WindowedSegmentId> segmentIds,
+      @Nullable @JsonProperty("segments") List<WindowedSegmentId> segmentIds,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("dimensions") List<String> dimensions,
       @JsonProperty("metrics") List<String> metrics,
@@ -202,6 +208,10 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory)
     try {
       final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
 
+      // Download all segments locally.
+      // Note: this requires enough local storage space to fit all of the segments, even though
+      // IngestSegmentFirehose iterates over the segments in series. We may want to change this
+      // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
       final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
       Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
       for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {