diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 03d298f5e7b8..8deafeeea71d 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -890,3 +890,57 @@ This Firehose can be used to combine and merge data from a list of different Fir
 |--------|-----------|---------|
 |type|This should be "combining"|yes|
 |delegates|List of Firehoses to combine data from|yes|
+
+
+## Input Sources
+
+### DruidInputSource
+
+This InputSource can be used to read data from existing Druid segments, potentially using a new schema and changing the name, dimensions, metrics, rollup, etc. of the segment.
+This InputSource is _splittable_ and can be used by [native parallel index tasks](native-batch.md#parallel-task).
+This InputSource has a fixed InputFormat for reading from Druid segments; no InputFormat needs to be specified in the ingestion spec when using this InputSource.
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|This should be "druid".|yes|
+|dataSource|A String defining the Druid datasource to fetch rows from.|yes|
+|interval|A String representing an ISO-8601 interval, which defines the time range to fetch the data over.|yes|
+|dimensions|A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned.|no|
+|metrics|A list of Strings containing the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no|
+|filter|See [Filters](../querying/filters.md). Only rows that match the filter, if specified, will be returned.|no|
+
+A minimal example DruidInputSource spec is shown below:
+
+```json
+{
+  "type": "druid",
+  "dataSource": "wikipedia",
+  "interval": "2013-01-01/2013-01-02"
+}
+```
+
+The spec above will read all existing dimension and metric columns from the `wikipedia` datasource, and will include all rows with a timestamp (the `__time` column) within the interval `2013-01-01/2013-01-02`.
+
+A spec that applies a filter and reads a subset of the original datasource's columns is shown below.
+
+```json
+{
+  "type": "druid",
+  "dataSource": "wikipedia",
+  "interval": "2013-01-01/2013-01-02",
+  "dimensions": [
+    "page",
+    "user"
+  ],
+  "metrics": [
+    "added"
+  ],
+  "filter": {
+    "type": "selector",
+    "dimension": "page",
+    "value": "Druid"
+  }
+}
+```
+
+The spec above will return only the `page` and `user` dimensions and the `added` metric. Only rows where `page` = `Druid` will be returned.
\ No newline at end of file
diff --git a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputSourceModule.java b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputSourceModule.java
new file mode 100644
index 000000000000..56159e1c1000
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputSourceModule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.indexing.input.DruidInputSource; +import org.apache.druid.initialization.DruidModule; + +import java.util.List; + +public class IndexingServiceInputSourceModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("IndexingServiceInputSourceModule") + .registerSubtypes( + new NamedType(DruidInputSource.class, "druid") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java new file mode 100644 index 000000000000..1f4820f39427 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.partition.PartitionChunk; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ReingestionTimelineUtils +{ + /** + * @param timelineSegments A list of timeline objects, such as that returned by VersionedIntervalTimeline.lookup(). + * @param excludeDimensions Dimensions to be excluded + * @return A list of all the unique dimension column names present in the segments within timelineSegments + */ + public static List getUniqueDimensions( + List> timelineSegments, + @Nullable Set excludeDimensions + ) + { + final BiMap uniqueDims = HashBiMap.create(); + + // Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be + // optimized for performance. 
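+    // For example, if a newer segment has dimensions [page, user] and an older segment has
+    // [user, language], the merged ordering comes out as [page, user, language].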
+    // Dimensions are extracted from the most recent segments to the oldest because recent segments are likely to be
+    // queried more frequently, and thus the performance should be optimized for recent ones rather than old ones.
+
+    // timelineSegments are sorted in order of interval
+    int index = 0;
+    for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
+      for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
+        for (String dimension : chunk.getObject().getDimensions()) {
+          if (!uniqueDims.containsKey(dimension) &&
+              (excludeDimensions == null || !excludeDimensions.contains(dimension))) {
+            uniqueDims.put(dimension, index++);
+          }
+        }
+      }
+    }
+
+    final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
+    return IntStream.range(0, orderedDims.size())
+                    .mapToObj(orderedDims::get)
+                    .collect(Collectors.toList());
+  }
+
+  /**
+   * @param timelineSegments A list of timeline objects, such as that returned by VersionedIntervalTimeline.lookup().
+   * @return A list of all the unique metric column names present in the segments within timelineSegments
+   */
+  public static List<String> getUniqueMetrics(List<TimelineObjectHolder<String, DataSegment>> timelineSegments)
+  {
+    final BiMap<String, Integer> uniqueMetrics = HashBiMap.create();
+
+    // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the most
+    // recent segments to the oldest.
+
+    // timelineSegments are sorted in order of interval
+    int[] index = {0};
+    for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
+      for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
+        for (String metric : chunk.getObject().getMetrics()) {
+          uniqueMetrics.computeIfAbsent(
+              metric,
+              k -> {
+                return index[0]++;
+              }
+          );
+        }
+      }
+    }
+
+    final BiMap<Integer, String> orderedMetrics = uniqueMetrics.inverse();
+    return IntStream.range(0, orderedMetrics.size())
+                    .mapToObj(orderedMetrics::get)
+                    .collect(Collectors.toList());
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 16bedc3cac1e..e85e57cd5354 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -37,11 +37,9 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.DoubleDimensionSchema;
 import org.apache.druid.data.input.impl.FloatDimensionSchema;
-import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
-import org.apache.druid.data.input.impl.NoopInputRowParser;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
-import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.Checks;
 import org.apache.druid.indexer.Property;
 import org.apache.druid.indexer.TaskStatus;
@@ -57,7 +55,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
-import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
@@
-68,7 +66,6 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.guava.Comparators; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.DimensionHandler; @@ -529,20 +526,20 @@ private static ParallelIndexIOConfig createIoConfig( ) { return new ParallelIndexIOConfig( - new IngestSegmentFirehoseFactory( + null, + new DruidInputSource( dataSchema.getDataSource(), interval, null, - null, // no filter - // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose - dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(), - Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()), null, + dataSchema.getDimensionsSpec().getDimensionNames(), + Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()), toolbox.getIndexIO(), coordinatorClient, segmentLoaderFactory, retryPolicyFactory ), + null, false ); } @@ -603,15 +600,14 @@ private static DataSchema createDataSchema( final AggregatorFactory[] finalMetricsSpec = metricsSpec == null ? createMetricsSpec(queryableIndexAndSegments) : convertToCombiningFactories(metricsSpec); - final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec)); return new DataSchema( dataSource, - jsonMapper.convertValue(parser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT), + new TimestampSpec(null, null, null), + finalDimensionsSpec, finalMetricsSpec, granularitySpec, - null, - jsonMapper + null ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 094a713cde16..741b463d8581 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -729,7 +729,7 @@ private Map> collectIntervalsAndShardSp ingestionSchema.getDataSchema().getDimensionsSpec(), metricsNames ), - getInputFormat(ingestionSchema), + inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null, tmpDir ) ); @@ -919,7 +919,7 @@ private TaskStatus generateAndPublishSegments( driver, partitionsSpec, inputSource, - getInputFormat(ingestionSchema), + inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null, tmpDir, segmentAllocator ); @@ -1037,15 +1037,17 @@ public IndexIngestionSpec( { super(dataSchema, ioConfig, tuningConfig); - Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("parser", dataSchema.getParserMap()), - new Property<>("inputFormat", ioConfig.getInputFormat()) - ) - ); if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) { throw new IAE("Cannot use parser and inputSource together. 
Try using inputFormat instead of parser."); } + if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) { + Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("parser", dataSchema.getParserMap()), + new Property<>("inputFormat", ioConfig.getInputFormat()) + ) + ); + } this.dataSchema = dataSchema; this.ioConfig = ioConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java index 088503e0536a..f20cf50bed3b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java @@ -94,7 +94,7 @@ public SegmentsAndMetadata process( BatchAppenderatorDriver driver, PartitionsSpec partitionsSpec, InputSource inputSource, - InputFormat inputFormat, + @Nullable InputFormat inputFormat, File tmpDir, IndexTaskSegmentAllocator segmentAllocator ) throws IOException, InterruptedException, ExecutionException, TimeoutException diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java index 9c448992eccf..06baae4b7dd3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java @@ -44,17 +44,19 @@ public ParallelIndexIngestionSpec( { super(dataSchema, ioConfig, tuningConfig); - Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("parser", dataSchema.getParserMap()), - new Property<>("inputFormat", ioConfig.getInputFormat()) - ) - ); if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) { if (!(ioConfig.getInputSource() instanceof FirehoseFactoryToInputSourceAdaptor)) { throw new IAE("Cannot use parser and inputSource together. Try using inputFormat instead of parser."); } } + if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) { + Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("parser", dataSchema.getParserMap()), + new Property<>("inputFormat", ioConfig.getInputFormat()) + ) + ); + } this.dataSchema = dataSchema; this.ioConfig = ioConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 4dabf7ea02c5..642936109660 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -193,7 +193,7 @@ private List generateSegments( driver, partitionsSpec, inputSource, - ParallelIndexSupervisorTask.getInputFormat(ingestionSchema), + inputSource.needsFormat() ? 
ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null, tmpDir, segmentAllocator ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 8ea6fc916118..8bf40136eafe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -431,7 +431,7 @@ private Set generateAndPushSegments( ingestionSchema.getDataSchema().getDimensionsSpec(), metricsNames ), - ParallelIndexSupervisorTask.getInputFormat(ingestionSchema), + inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null, tmpDir ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index dd4911a33704..5ae5d2b5919e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -22,11 +22,8 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -37,12 +34,11 @@ import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.indexing.common.RetryPolicy; +import org.apache.druid.indexing.common.ReingestionTimelineUtils; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.filter.DimFilter; @@ -55,27 +51,14 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; -import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; -import 
java.util.stream.IntStream; import java.util.stream.Stream; public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory> @@ -237,13 +220,15 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) { dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); } else { - dims = getUniqueDimensions( + dims = ReingestionTimelineUtils.getUniqueDimensions( timeLineSegments, inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() ); } - final List metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics; + final List metricsList = metrics == null + ? ReingestionTimelineUtils.getUniqueMetrics(timeLineSegments) + : metrics; final List adapters = Lists.newArrayList( Iterables.concat( @@ -290,103 +275,13 @@ public WindowedStorageAdapter apply(final PartitionChunk input) return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter); } - private long jitter(long input) - { - final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0; - long retval = input + (long) jitter; - return retval < 0 ? 0 : retval; - } - private List> getTimeline() { if (interval == null) { - return getTimelineForSegmentIds(); + return DruidInputSource.getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); } else { - return getTimelineForInterval(); - } - } - - private List> getTimelineForInterval() - { - Preconditions.checkNotNull(interval); - - // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration - // as TaskActionClient. - final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); - Collection usedSegments; - while (true) { - try { - usedSegments = - coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); - break; - } - catch (Throwable e) { - log.warn(e, "Exception getting database segments"); - final Duration delay = retryPolicy.getAndIncrementRetryDelay(); - if (delay == null) { - throw e; - } else { - final long sleepTime = jitter(delay.getMillis()); - log.info("Will try again in [%s].", new Duration(sleepTime).toString()); - try { - Thread.sleep(sleepTime); - } - catch (InterruptedException e2) { - throw new RuntimeException(e2); - } - } - } - } - - return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval); - } - - private List> getTimelineForSegmentIds() - { - final SortedMap> timeline = new TreeMap<>( - Comparators.intervalsByStartThenEnd() - ); - for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds)) { - final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment( - dataSource, - windowedSegmentId.getSegmentId() - ); - for (Interval interval : windowedSegmentId.getIntervals()) { - final TimelineObjectHolder existingHolder = timeline.get(interval); - if (existingHolder != null) { - if (!existingHolder.getVersion().equals(segment.getVersion())) { - throw new ISE("Timeline segments with the same interval should have the same version: " + - "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment); - } - existingHolder.getObject().add(segment.getShardSpec().createChunk(segment)); - } else { - timeline.put(interval, new TimelineObjectHolder<>( - interval, - segment.getInterval(), - segment.getVersion(), - new PartitionHolder(segment.getShardSpec().createChunk(segment)) - )); - } - } - } 
- - // Validate that none of the given windows overlaps (except for when multiple segments share exactly the - // same interval). - Interval lastInterval = null; - for (Interval interval : timeline.keySet()) { - if (lastInterval != null) { - if (interval.overlaps(lastInterval)) { - throw new IAE( - "Distinct intervals in input segments may not overlap: [%s] vs [%s]", - lastInterval, - interval - ); - } - } - lastInterval = interval; + return DruidInputSource.getTimelineForInterval(coordinatorClient, retryPolicyFactory, dataSource, interval); } - - return new ArrayList<>(timeline.values()); } private void initializeSplitsIfNeeded(@Nullable SplitHintSpec splitHintSpec) @@ -395,71 +290,13 @@ private void initializeSplitsIfNeeded(@Nullable SplitHintSpec splitHintSpec) return; } - final SegmentsSplitHintSpec nonNullSplitHintSpec; - if (!(splitHintSpec instanceof SegmentsSplitHintSpec)) { - if (splitHintSpec != null) { - log.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ingoring it.", splitHintSpec); - } - nonNullSplitHintSpec = new SegmentsSplitHintSpec(null); - } else { - nonNullSplitHintSpec = (SegmentsSplitHintSpec) splitHintSpec; - } - - final long maxInputSegmentBytesPerTask = this.maxInputSegmentBytesPerTask == null - ? nonNullSplitHintSpec.getMaxInputSegmentBytesPerTask() - : this.maxInputSegmentBytesPerTask; - - // isSplittable() ensures this is only called when we have an interval. - final List> timelineSegments = getTimelineForInterval(); - - // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing - // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their - // data can combine with each other anyway. - - List>> newSplits = new ArrayList<>(); - List currentSplit = new ArrayList<>(); - Map windowedSegmentIds = new HashMap<>(); - long bytesInCurrentSplit = 0; - for (TimelineObjectHolder timelineHolder : timelineSegments) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - final DataSegment segment = chunk.getObject(); - final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment); - if (existingWindowedSegmentId != null) { - // We've already seen this segment in the timeline, so just add this interval to it. It has already - // been placed into a split. - existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval()); - } else { - // It's the first time we've seen this segment, so create a new WindowedSegmentId. - List intervals = new ArrayList<>(); - // Use the interval that contributes to the timeline, not the entire segment's true interval. - intervals.add(timelineHolder.getInterval()); - final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals); - windowedSegmentIds.put(segment, newWindowedSegmentId); - - // Now figure out if it goes in the current split or not. - final long segmentBytes = segment.getSize(); - if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) { - // This segment won't fit in the current non-empty split, so this split is done. - newSplits.add(new InputSplit<>(currentSplit)); - currentSplit = new ArrayList<>(); - bytesInCurrentSplit = 0; - } - if (segmentBytes > maxInputSegmentBytesPerTask) { - // If this segment is itself bigger than our max, just put it in its own split. 
- Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0); - newSplits.add(new InputSplit<>(Collections.singletonList(newWindowedSegmentId))); - } else { - currentSplit.add(newWindowedSegmentId); - bytesInCurrentSplit += segmentBytes; - } - } - } - } - if (!currentSplit.isEmpty()) { - newSplits.add(new InputSplit<>(currentSplit)); - } - - splits = newSplits; + splits = DruidInputSource.createSplits( + coordinatorClient, + retryPolicyFactory, + dataSource, + interval, + splitHintSpec == null ? new SegmentsSplitHintSpec(maxInputSegmentBytesPerTask) : splitHintSpec + ); } @Override @@ -483,63 +320,4 @@ public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) initializeSplitsIfNeeded(splitHintSpec); return splits.size(); } - - @VisibleForTesting - static List getUniqueDimensions( - List> timelineSegments, - @Nullable Set excludeDimensions - ) - { - final BiMap uniqueDims = HashBiMap.create(); - - // Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be - // optimized for performance. - // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more - // frequently, and thus the performance should be optimized for recent ones rather than old ones. - - // timelineSegments are sorted in order of interval - int index = 0; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String dimension : chunk.getObject().getDimensions()) { - if (!uniqueDims.containsKey(dimension) && - (excludeDimensions == null || !excludeDimensions.contains(dimension))) { - uniqueDims.put(dimension, index++); - } - } - } - } - - final BiMap orderedDims = uniqueDims.inverse(); - return IntStream.range(0, orderedDims.size()) - .mapToObj(orderedDims::get) - .collect(Collectors.toList()); - } - - @VisibleForTesting - static List getUniqueMetrics(List> timelineSegments) - { - final BiMap uniqueMetrics = HashBiMap.create(); - - // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent - // segments to olders. - - // timelineSegments are sorted in order of interval - int[] index = {0}; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String metric : chunk.getObject().getMetrics()) { - uniqueMetrics.computeIfAbsent(metric, k -> { - return index[0]++; - } - ); - } - } - } - - final BiMap orderedMetrics = uniqueMetrics.inverse(); - return IntStream.range(0, orderedMetrics.size()) - .mapToObj(orderedMetrics::get) - .collect(Collectors.toList()); - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java new file mode 100644 index 000000000000..4d4d0f30ad16 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SegmentsSplitHintSpec;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.InputEntityIteratingReader;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.indexing.common.ReingestionTimelineUtils;
+import org.apache.druid.indexing.common.RetryPolicy;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.timeline.partition.PartitionHolder;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+
+public class DruidInputSource extends AbstractInputSource implements SplittableInputSource<List<WindowedSegmentId>>
+{
+  private static final Logger LOG = new Logger(DruidInputSource.class);
+
+  private final String dataSource;
+  // Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly
+  // by the user creating this input source and 'segmentIds' is used for sub-tasks if it is split for parallel
+  // batch ingestion.
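+  // For illustration (values are hypothetical): in interval mode the user-facing spec looks like
+  //   {"type": "druid", "dataSource": "wikipedia", "interval": "2013-01-01/2013-01-02"},
+  // while segments mode carries explicit segment ids plus the interval(s) to read from each one.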
+  @Nullable
+  private final Interval interval;
+  @Nullable
+  private final List<WindowedSegmentId> segmentIds;
+  private final DimFilter dimFilter;
+  private final List<String> dimensions;
+  private final List<String> metrics;
+  private final IndexIO indexIO;
+  private final CoordinatorClient coordinatorClient;
+  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final RetryPolicyFactory retryPolicyFactory;
+
+  @JsonCreator
+  public DruidInputSource(
+      @JsonProperty("dataSource") final String dataSource,
+      @JsonProperty("interval") @Nullable Interval interval,
+      // Specifying "segments" is intended only for when this input source has split itself,
+      // not for direct end user use.
+      @JsonProperty("segments") @Nullable List<WindowedSegmentId> segmentIds,
+      @JsonProperty("filter") DimFilter dimFilter,
+      @Nullable @JsonProperty("dimensions") List<String> dimensions,
+      @Nullable @JsonProperty("metrics") List<String> metrics,
+      @JacksonInject IndexIO indexIO,
+      @JacksonInject CoordinatorClient coordinatorClient,
+      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject RetryPolicyFactory retryPolicyFactory
+  )
+  {
+    Preconditions.checkNotNull(dataSource, "dataSource");
+    if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) {
+      throw new IAE("Specify exactly one of 'interval' and 'segments'");
+    }
+    this.dataSource = dataSource;
+    this.interval = interval;
+    this.segmentIds = segmentIds;
+    this.dimFilter = dimFilter;
+    this.dimensions = dimensions;
+    this.metrics = metrics;
+    this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
+    this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
+    this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
+    this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @Nullable
+  @JsonProperty
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  @Nullable
+  @JsonProperty("segments")
+  @JsonInclude(Include.NON_NULL)
+  public List<WindowedSegmentId> getSegmentIds()
+  {
+    return segmentIds;
+  }
+
+  @JsonProperty("filter")
+  public DimFilter getDimFilter()
+  {
+    return dimFilter;
+  }
+
+  @JsonProperty
+  public List<String> getDimensions()
+  {
+    return dimensions;
+  }
+
+  @JsonProperty
+  public List<String> getMetrics()
+  {
+    return metrics;
+  }
+
+  @Override
+  protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
+  {
+    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+
+    final List<TimelineObjectHolder<String, DataSegment>> timeline = createTimeline();
+
+    // Reuse the timeline computed above rather than calling createTimeline() a second time,
+    // which would repeat the coordinator lookups.
+    final Stream<InputEntity> entityStream = timeline
+        .stream()
+        .flatMap(holder -> {
+          final PartitionHolder<DataSegment> partitionHolder = holder.getObject();
+          return partitionHolder
+              .stream()
+              .map(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
+        });
+
+    final List<String> effectiveDimensions;
+    if (dimensions == null) {
+      effectiveDimensions = ReingestionTimelineUtils.getUniqueDimensions(
+          timeline,
+          inputRowSchema.getDimensionsSpec().getDimensionExclusions()
+      );
+    } else if (inputRowSchema.getDimensionsSpec().hasCustomDimensions()) {
+      effectiveDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
+    } else {
+      effectiveDimensions = dimensions;
+    }
+
+    List<String> effectiveMetrics;
+    if (metrics == null) {
+      effectiveMetrics = ReingestionTimelineUtils.getUniqueMetrics(timeline);
+    } else {
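+      // Metrics were specified explicitly on the input source; use that list as-is.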
effectiveMetrics = metrics; + } + + final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat( + indexIO, + dimFilter, + effectiveDimensions, + effectiveMetrics + ); + + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + entityStream, + temporaryDirectory + ); + } + + private List> createTimeline() + { + if (interval == null) { + return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); + } else { + return getTimelineForInterval(coordinatorClient, retryPolicyFactory, dataSource, interval); + } + } + + @Override + public Stream>> createSplits( + InputFormat inputFormat, + @Nullable SplitHintSpec splitHintSpec + ) + { + // segmentIds is supposed to be specified by the supervisor task during the parallel indexing. + // If it's not null, segments are already split by the supervisor task and further split won't happen. + if (segmentIds == null) { + return createSplits( + coordinatorClient, + retryPolicyFactory, + dataSource, + interval, + splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec + ).stream(); + } else { + return Stream.of(new InputSplit<>(segmentIds)); + } + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + // segmentIds is supposed to be specified by the supervisor task during the parallel indexing. + // If it's not null, segments are already split by the supervisor task and further split won't happen. + if (segmentIds == null) { + return createSplits( + coordinatorClient, + retryPolicyFactory, + dataSource, + interval, + splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec + ).size(); + } else { + return 1; + } + } + + @Override + public SplittableInputSource> withSplit(InputSplit> split) + { + return new DruidInputSource( + dataSource, + null, + split.get(), + dimFilter, + dimensions, + metrics, + indexIO, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ); + } + + @Override + public boolean needsFormat() + { + return false; + } + + public static List>> createSplits( + CoordinatorClient coordinatorClient, + RetryPolicyFactory retryPolicyFactory, + String dataSource, + Interval interval, + SplitHintSpec splitHintSpec + ) + { + final long maxInputSegmentBytesPerTask; + if (!(splitHintSpec instanceof SegmentsSplitHintSpec)) { + LOG.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ignoring it.", splitHintSpec); + maxInputSegmentBytesPerTask = new SegmentsSplitHintSpec(null).getMaxInputSegmentBytesPerTask(); + } else { + maxInputSegmentBytesPerTask = ((SegmentsSplitHintSpec) splitHintSpec).getMaxInputSegmentBytesPerTask(); + } + + // isSplittable() ensures this is only called when we have an interval. + final List> timelineSegments = getTimelineForInterval( + coordinatorClient, + retryPolicyFactory, + dataSource, + interval + ); + + // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing + // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their + // data can combine with each other anyway. 
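+    // For example (sizes are hypothetical): with maxInputSegmentBytesPerTask = 500MB and segments
+    // of 300MB, 300MB and 700MB, the greedy pass yields three splits: [segment1], then [segment2]
+    // (300MB + 300MB would exceed the max), and [segment3] alone since it is larger than the max
+    // by itself.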
+ + List>> splits = new ArrayList<>(); + List currentSplit = new ArrayList<>(); + Map windowedSegmentIds = new HashMap<>(); + long bytesInCurrentSplit = 0; + for (TimelineObjectHolder timelineHolder : timelineSegments) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + final DataSegment segment = chunk.getObject(); + final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment); + if (existingWindowedSegmentId != null) { + // We've already seen this segment in the timeline, so just add this interval to it. It has already + // been placed into a split. + existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval()); + } else { + // It's the first time we've seen this segment, so create a new WindowedSegmentId. + List intervals = new ArrayList<>(); + // Use the interval that contributes to the timeline, not the entire segment's true interval. + intervals.add(timelineHolder.getInterval()); + final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals); + windowedSegmentIds.put(segment, newWindowedSegmentId); + + // Now figure out if it goes in the current split or not. + final long segmentBytes = segment.getSize(); + if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) { + // This segment won't fit in the current non-empty split, so this split is done. + splits.add(new InputSplit<>(currentSplit)); + currentSplit = new ArrayList<>(); + bytesInCurrentSplit = 0; + } + if (segmentBytes > maxInputSegmentBytesPerTask) { + // If this segment is itself bigger than our max, just put it in its own split. + Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0); + splits.add(new InputSplit<>(Collections.singletonList(newWindowedSegmentId))); + } else { + currentSplit.add(newWindowedSegmentId); + bytesInCurrentSplit += segmentBytes; + } + } + } + } + if (!currentSplit.isEmpty()) { + splits.add(new InputSplit<>(currentSplit)); + } + + return splits; + } + + public static List> getTimelineForInterval( + CoordinatorClient coordinatorClient, + RetryPolicyFactory retryPolicyFactory, + String dataSource, + Interval interval + ) + { + Preconditions.checkNotNull(interval); + + // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration + // as TaskActionClient. 
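+    // Each retry sleeps for the policy's delay plus Gaussian jitter with a standard deviation of
+    // roughly 25% of that delay (see jitter() below), so concurrent tasks do not retry in lockstep.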
+ final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); + Collection usedSegments; + while (true) { + try { + usedSegments = + coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); + break; + } + catch (Throwable e) { + LOG.warn(e, "Exception getting database segments"); + final Duration delay = retryPolicy.getAndIncrementRetryDelay(); + if (delay == null) { + throw e; + } else { + final long sleepTime = jitter(delay.getMillis()); + LOG.info("Will try again in [%s].", new Duration(sleepTime).toString()); + try { + Thread.sleep(sleepTime); + } + catch (InterruptedException e2) { + throw new RuntimeException(e2); + } + } + } + } + + return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval); + } + + public static List> getTimelineForSegmentIds( + CoordinatorClient coordinatorClient, + String dataSource, + List segmentIds + ) + { + final SortedMap> timeline = new TreeMap<>( + Comparators.intervalsByStartThenEnd() + ); + for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds, "segmentIds")) { + final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment( + dataSource, + windowedSegmentId.getSegmentId() + ); + for (Interval interval : windowedSegmentId.getIntervals()) { + final TimelineObjectHolder existingHolder = timeline.get(interval); + if (existingHolder != null) { + if (!existingHolder.getVersion().equals(segment.getVersion())) { + throw new ISE("Timeline segments with the same interval should have the same version: " + + "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment); + } + existingHolder.getObject().add(segment.getShardSpec().createChunk(segment)); + } else { + timeline.put( + interval, + new TimelineObjectHolder<>( + interval, + segment.getInterval(), + segment.getVersion(), + new PartitionHolder<>(segment.getShardSpec().createChunk(segment)) + ) + ); + } + } + } + + // Validate that none of the given windows overlaps (except for when multiple segments share exactly the + // same interval). + Interval lastInterval = null; + for (Interval interval : timeline.keySet()) { + if (lastInterval != null && interval.overlaps(lastInterval)) { + throw new IAE( + "Distinct intervals in input segments may not overlap: [%s] vs [%s]", + lastInterval, + interval + ); + } + lastInterval = interval; + } + + return new ArrayList<>(timeline.values()); + } + + private static long jitter(long input) + { + final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0; + long retval = input + (long) jitter; + return retval < 0 ? 0 : retval; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java new file mode 100644 index 000000000000..755086303d24 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.input; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.InputStream; +import java.net.URI; + +public class DruidSegmentInputEntity implements InputEntity +{ + private static final EmittingLogger log = new EmittingLogger(DruidSegmentInputEntity.class); + + private final SegmentLoader segmentLoader; + private final DataSegment segment; + private final Interval intervalFilter; + + DruidSegmentInputEntity(SegmentLoader segmentLoader, DataSegment segment, Interval intervalFilter) + { + this.segmentLoader = segmentLoader; + this.segment = segment; + this.intervalFilter = intervalFilter; + } + + Interval getIntervalFilter() + { + return intervalFilter; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Don't call this"); + } + + @Override + public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) + { + final File segmentFile; + try { + segmentFile = segmentLoader.getSegmentFiles(segment); + } + catch (SegmentLoadingException e) { + throw new RuntimeException(e); + } + return new CleanableFile() + { + @Override + public File file() + { + return segmentFile; + } + + @Override + public void close() + { + if (!segmentFile.delete()) { + log.warn("Could not clean temporary segment file: " + segmentFile); + } + } + }; + } + + @Override + public Predicate getRetryCondition() + { + return Predicates.alwaysFalse(); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java new file mode 100644 index 000000000000..80f87721357c --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.input; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.segment.IndexIO; + +import java.io.File; +import java.util.List; + +public class DruidSegmentInputFormat implements InputFormat +{ + private final IndexIO indexIO; + private final DimFilter dimFilter; + private List dimensions; + private List metrics; + + DruidSegmentInputFormat( + IndexIO indexIO, + DimFilter dimFilter, + List dimensions, + List metrics + ) + { + this.indexIO = indexIO; + this.dimFilter = dimFilter; + this.dimensions = dimensions; + this.metrics = metrics; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory + ) + { + return new DruidSegmentReader( + source, + indexIO, + dimensions, + metrics, + dimFilter, + temporaryDirectory + ); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java new file mode 100644 index 000000000000..9b5f3a5c563e --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.input; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntity.CleanableFile; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.filter.Filters; +import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; +import org.joda.time.DateTime; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +public class DruidSegmentReader extends IntermediateRowParsingReader> +{ + private final DruidSegmentInputEntity source; + private final IndexIO indexIO; + private final List dimensions; + private final List metrics; + private final DimFilter dimFilter; + private final File temporaryDirectory; + + DruidSegmentReader( + InputEntity source, + IndexIO indexIO, + List dimensions, + List metrics, + DimFilter dimFilter, + File temporaryDirectory + ) + { + Preconditions.checkArgument(source instanceof DruidSegmentInputEntity); + this.source = (DruidSegmentInputEntity) source; + this.indexIO = indexIO; + this.dimensions = dimensions; + this.metrics = metrics; + this.dimFilter = dimFilter; + this.temporaryDirectory = temporaryDirectory; + } + + @Override + protected CloseableIterator> intermediateRowIterator() throws IOException + { + final CleanableFile segmentFile = source.fetch(temporaryDirectory, null); + final WindowedStorageAdapter storageAdapter = new WindowedStorageAdapter( + new QueryableIndexStorageAdapter( + indexIO.loadIndex(segmentFile.file()) + ), + source.getIntervalFilter() + ); + + final Sequence cursors = storageAdapter.getAdapter().makeCursors( + Filters.toFilter(dimFilter), + storageAdapter.getInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + final Sequence> sequence = Sequences.concat( + Sequences.map( + cursors, + this::cursorToSequence + ) + ); + + return makeCloseableIteratorFromSequenceAndSegmentFile(sequence, segmentFile); + } + + @Override + protected List parseInputRows(Map intermediateRow) throws ParseException + { + final 
DateTime timestamp = (DateTime) intermediateRow.get(TimestampSpec.DEFAULT_COLUMN); + return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow)); + } + + @Override + protected Map toMap(Map intermediateRow) + { + return intermediateRow; + } + + /** + * Given a {@link Cursor} create a {@link Sequence} that returns the rows of the cursor as + * Map intermediate rows, selecting the dimensions and metrics of this segment reader. + * + * @param cursor A cursor + * @return A sequence of intermediate rows + */ + private Sequence> cursorToSequence( + final Cursor cursor + ) + { + return Sequences.simple( + () -> new IntermediateRowFromCursorIterator(cursor, dimensions, metrics) + ); + } + + /** + * @param sequence A sequence of intermediate rows generated from a sequence of + * cursors in {@link #intermediateRowIterator()} + * @param segmentFile The underlying segment file containing the row data + * @return A CloseableIterator from a sequence of intermediate rows, closing the underlying segment file + * when the iterator is closed. + */ + private static CloseableIterator> makeCloseableIteratorFromSequenceAndSegmentFile( + final Sequence> sequence, + final CleanableFile segmentFile + ) + { + return new CloseableIterator>() + { + Yielder> rowYielder = Yielders.each(sequence); + + @Override + public boolean hasNext() + { + return !rowYielder.isDone(); + } + + @Override + public Map next() + { + final Map row = rowYielder.get(); + rowYielder = rowYielder.next(null); + return row; + } + + @Override + public void close() throws IOException + { + segmentFile.close(); + } + }; + } + + /** + * Given a {@link Cursor}, a list of dimension names, and a list of metric names, this iterator + * returns the rows of the cursor as Map intermediate rows. 
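+   * For example, with dimensions ["page"] and metrics ["added"], a produced row might look like
+   * {timestamp=2013-01-01T00:00:00.000Z, page=Druid, added=17}; multi-value dimensions are
+   * reported as Lists of Strings.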
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 6217dc1fa187..98bb64bdb511 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -25,6 +25,8 @@
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SegmentsSplitHintSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexing.common.LockGranularity;
@@ -38,6 +40,8 @@
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexingTest.TestSupervisorTask;
+import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -65,9 +69,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 @RunWith(Parameterized.class)
 public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervisorTaskTest
@@ -166,6 +173,38 @@ public void testRunParallel() throws Exception
     runTask(compactionTask);
   }
 
+  @Test
+  public void testDruidInputSourceCreateSplitsWithIndividualSplits() throws Exception
+  {
+    runIndexTask();
+
+    Interval interval = Intervals.of("2014-01-01/2014-01-02");
+
+    List<InputSplit<List<WindowedSegmentId>>> splits = DruidInputSource.createSplits(
+        coordinatorClient,
+        RETRY_POLICY_FACTORY,
+        DATA_SOURCE,
+        interval,
+        new SegmentsSplitHintSpec(1L) // each segment gets its own split with this config
+    );
+
+    List<DataSegment> segments = new ArrayList<>(
+        coordinatorClient.getDatabaseSegmentDataSourceSegments(
+            DATA_SOURCE,
+            ImmutableList.of(interval)
+        )
+    );
+
+    Set<String> segmentIdsFromSplits = new HashSet<>();
+    Set<String> segmentIdsFromCoordinator = new HashSet<>();
+    Assert.assertEquals(segments.size(), splits.size());
+    for (int i = 0; i < segments.size(); i++) {
+      segmentIdsFromCoordinator.add(segments.get(i).getId().toString());
+      segmentIdsFromSplits.add(splits.get(i).get().get(0).getSegmentId());
+    }
+    Assert.assertEquals(segmentIdsFromCoordinator, segmentIdsFromSplits);
+  }
+
   private void runIndexTask() throws Exception
   {
     File tmpDir = temporaryFolder.newFolder();
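The hint passed above is the interesting part: `SegmentsSplitHintSpec(1L)` sets `maxInputSegmentBytesPerTask` to one byte, so every non-empty segment overflows the per-split byte budget and lands in its own split. A rough sketch of the grouping idea only, not the actual `DruidInputSource.createSplits` code; `segments` and `maxInputSegmentBytesPerTask` are assumed inputs:

```java
import org.apache.druid.timeline.DataSegment;

import java.util.ArrayList;
import java.util.List;

// Illustrative grouping: pack segments into a split until the byte budget is exceeded.
// With maxInputSegmentBytesPerTask = 1, each split ends up holding exactly one segment,
// which is what the test above asserts.
List<List<DataSegment>> splits = new ArrayList<>();
List<DataSegment> current = new ArrayList<>();
long currentBytes = 0;
for (DataSegment segment : segments) {
  if (!current.isEmpty() && currentBytes + segment.getSize() > maxInputSegmentBytesPerTask) {
    splits.add(current);
    current = new ArrayList<>();
    currentBytes = 0;
  }
  current.add(segment);
  currentBytes += segment.getSize();
}
if (!current.isEmpty()) {
  splits.add(current);
}
```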
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index b1a15d37cdb8..3a18cac2b589 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -30,6 +30,7 @@
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -42,12 +43,23 @@
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.CompactionTask.Builder;
+import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
@@ -60,6 +72,7 @@
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -88,6 +101,7 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -105,7 +119,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
   public static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
       new TimestampSpec("ts", "auto", null),
       new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim"))),
-      null,
+      "|",
       Arrays.asList("ts", "dim", "val"),
       false,
       0
@@ -124,6 +138,19 @@
       )
   );
 
+  private static final List<String> TEST_ROWS = ImmutableList.of(
+      "2014-01-01T00:00:10Z,a,1\n",
+      "2014-01-01T00:00:10Z,b,2\n",
+      "2014-01-01T00:00:10Z,c,3\n",
+      "2014-01-01T01:00:20Z,a,1\n",
+      "2014-01-01T01:00:20Z,b,2\n",
+      "2014-01-01T01:00:20Z,c,3\n",
+      "2014-01-01T02:00:30Z,a,1\n",
+      "2014-01-01T02:00:30Z,b,2\n",
+      "2014-01-01T02:00:30Z,c,3\n",
+      "2014-01-01T02:00:30Z,c|d|e,3\n"
+  );
+
   @Parameterized.Parameters(name = "{0}")
   public static Iterable<Object[]> constructorFeeder()
   {
@@ -141,13 +168,14 @@
   private final SegmentLoaderFactory segmentLoaderFactory;
   private final LockGranularity lockGranularity;
   private final AppenderatorsManager appenderatorsManager;
+  private final TestUtils testUtils;
 
   private ExecutorService exec;
   private File localDeepStorage;
 
   public CompactionTaskRunTest(LockGranularity lockGranularity)
   {
-    TestUtils testUtils = new TestUtils();
+    testUtils = new TestUtils();
     rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
     indexingServiceClient = new NoopIndexingServiceClient();
     coordinatorClient = new CoordinatorClient(null, null)
@@ -221,6 +249,9 @@ public void testRun() throws Exception
         Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
       }
     }
+
+    List<String> rowsFromSegment = getCSVFormatRowsFromSegments(segments);
+    Assert.assertEquals(TEST_ROWS, rowsFromSegment);
   }
@@ -638,6 +669,93 @@ public void testRunIndexAndCompactForSameSegmentAtTheSameTime2() throws Exception
     Assert.assertEquals(TaskState.FAILED, compactionResult.lhs.getStatusCode());
   }
 
+  /**
+   * Run a regular index task that's equivalent to the compaction task in {@link #testRun()}, using
+   * {@link IngestSegmentFirehoseFactory}.
+   *
+   * This is not entirely CompactionTask related, but it's conceptually similar and requires
+   * similar setup to what this test suite already has.
+   *
+   * It could be moved to a separate test class if needed.
+   */
+  @Test
+  public void testRunRegularIndexTaskWithIngestSegmentFirehose() throws Exception
+  {
+    runIndexTask();
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        new IndexTask.IndexIngestionSpec(
+            new DataSchema(
+                "test",
+                getObjectMapper().convertValue(
+                    new StringInputRowParser(
+                        DEFAULT_PARSE_SPEC,
+                        null
+                    ),
+                    Map.class
+                ),
+                new AggregatorFactory[]{
+                    new LongSumAggregatorFactory("val", "val")
+                },
+                new UniformGranularitySpec(
+                    Granularities.HOUR,
+                    Granularities.MINUTE,
+                    null
+                ),
+                null,
+                getObjectMapper()
+            ),
+            new IndexTask.IndexIOConfig(
+                new IngestSegmentFirehoseFactory(
+                    DATA_SOURCE,
+                    Intervals.of("2014-01-01/2014-01-02"),
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    getIndexIO(),
+                    coordinatorClient,
+                    segmentLoaderFactory,
+                    RETRY_POLICY_FACTORY
+                ),
+                false
+            ),
+            IndexTaskTest.createTuningConfig(5000000, null, null, Long.MAX_VALUE, null, null, false, true)
+        ),
+        null,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        new NoopChatHandlerProvider(),
+        rowIngestionMetersFactory,
+        appenderatorsManager
+    );
+
+    final Pair<TaskStatus, List<DataSegment>> resultPair = runTask(indexTask);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    final List<DataSegment> segments = resultPair.rhs;
+    Assert.assertEquals(3, segments.size());
+
+    for (int i = 0; i < 3; i++) {
+      Assert.assertEquals(
+          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
+          segments.get(i).getInterval()
+      );
+      Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
+      if (lockGranularity == LockGranularity.SEGMENT) {
+        Assert.assertEquals(
+            new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
+            segments.get(i).getShardSpec()
+        );
+      } else {
+        Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
+      }
+    }
+  }
+
   private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception
   {
     return runIndexTask(null, null, false);
@@ -658,15 +776,9 @@ private Pair<TaskStatus, List<DataSegment>> runIndexTask(
     File tmpFile = File.createTempFile("druid", "index", tmpDir);
 
     try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
-      writer.write("2014-01-01T00:00:10Z,a,1\n");
-      writer.write("2014-01-01T00:00:10Z,b,2\n");
-      writer.write("2014-01-01T00:00:10Z,c,3\n");
-      writer.write("2014-01-01T01:00:20Z,a,1\n");
-      writer.write("2014-01-01T01:00:20Z,b,2\n");
-      writer.write("2014-01-01T01:00:20Z,c,3\n");
-      writer.write("2014-01-01T02:00:30Z,a,1\n");
-      writer.write("2014-01-01T02:00:30Z,b,2\n");
-      writer.write("2014-01-01T02:00:30Z,c,3\n");
+      for (String testRow : TEST_ROWS) {
+        writer.write(testRow);
+      }
     }
 
     IndexTask indexTask = new IndexTask(
@@ -782,4 +894,71 @@ public List<StorageLocationConfig> getLocations()
         null
     );
   }
+
+  private List<String> getCSVFormatRowsFromSegments(List<DataSegment> segments) throws Exception
+  {
+    final File cacheDir = temporaryFolder.newFolder();
+    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(cacheDir);
+
+    List<Cursor> cursors = new ArrayList<>();
+    for (DataSegment segment : segments) {
+      final File segmentFile = segmentLoader.getSegmentFiles(segment);
+
+      final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
+          new QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)),
+          segment.getInterval()
+      );
+      final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
+          null,
+          segment.getInterval(),
+          VirtualColumns.EMPTY,
+          Granularities.ALL,
+          false,
+          null
+      );
+      cursors.addAll(cursorSequence.toList());
+    }
+
+    List<String> rowsFromSegment = new ArrayList<>();
+    for (Cursor cursor : cursors) {
+      cursor.reset();
+      while (!cursor.isDone()) {
+        final DimensionSelector selector1 = cursor.getColumnSelectorFactory()
+                                                  .makeDimensionSelector(new DefaultDimensionSpec("ts", "ts"));
+        final DimensionSelector selector2 = cursor.getColumnSelectorFactory()
+                                                  .makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
+        final DimensionSelector selector3 = cursor.getColumnSelectorFactory()
+                                                  .makeDimensionSelector(new DefaultDimensionSpec("val", "val"));
+
+        Object dimObject = selector2.getObject();
+        String dimVal = null;
+        if (dimObject instanceof String) {
+          dimVal = (String) dimObject;
+        } else if (dimObject instanceof List) {
+          dimVal = String.join("|", (List<String>) dimObject);
+        }
+
+        rowsFromSegment.add(
+            makeCSVFormatRow(
+                selector1.getObject().toString(),
+                dimVal,
+                selector3.defaultGetObject().toString()
+            )
+        );
+
+        cursor.advance();
+      }
+    }
+    return rowsFromSegment;
+  }
+
+  private static String makeCSVFormatRow(
+      String ts,
+      String dim,
+      String val
+  )
+  {
+    return StringUtils.format("%s,%s,%s\n", ts, dim, val);
+  }
 }
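One detail worth calling out: the parse spec's list delimiter changed from `null` to `"|"` and `TEST_ROWS` gained the `c|d|e` row, so the suite now round-trips a multi-value dimension. On the way back out, a multi-valued dimension surfaces from the selector as a `List`, which the helper above re-joins with `|` to reproduce the original CSV cell. A minimal standalone sketch of that round trip (values taken from `TEST_ROWS`):

```java
import java.util.Arrays;
import java.util.List;

// Ingestion side: "c|d|e" with listDelimiter "|" parses into a multi-value dimension.
Object dimObject = Arrays.asList("c", "d", "e"); // what selector2.getObject() yields above
// Read-back side: re-join the list so the row matches the ingested CSV byte for byte.
String cell = dimObject instanceof List
    ? String.join("|", (List<String>) dimObject)
    : (String) dimObject;
// cell is "c|d|e", so the reconstructed row compares equal to its TEST_ROWS source.
```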
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c76bc9c0fbf4..bb34724ade3b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -32,7 +32,7 @@
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
-import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.DoubleDimensionSchema;
@@ -59,7 +59,7 @@
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
-import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -1084,16 +1084,16 @@ private void assertIngestionSchema(
     // assert ioConfig
     final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
     Assert.assertFalse(ioConfig.isAppendToExisting());
-    final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
-    Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
-    final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
-    Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
-    Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval());
-    Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
+    final InputSource inputSource = ioConfig.getInputSource();
+    Assert.assertTrue(inputSource instanceof DruidInputSource);
+    final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
+    Assert.assertEquals(DATA_SOURCE, druidInputSource.getDataSource());
+    Assert.assertEquals(expectedSegmentIntervals.get(i), druidInputSource.getInterval());
+    Assert.assertNull(druidInputSource.getDimFilter());
     Assert.assertEquals(
         new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
-        new HashSet<>(ingestSegmentFirehoseFactory.getDimensions())
+        new HashSet<>(druidInputSource.getDimensions())
     );
 
     // assert tuningConfig
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java
index c700ad361321..af67f9908f51 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java
@@ -45,7 +45,7 @@ public void testParserAndInputFormat()
   {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(
-        "At most one of [Property{name='parser', value={fake=parser map}}, Property{name='inputFormat',"
+        "Cannot use parser and inputSource together. Try using inputFormat instead of parser."
    );
     final IndexIngestionSpec spec = new IndexIngestionSpec(
         new DataSchema(
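The new expected message documents the spec-level rule: a legacy `parser` and a new-style `inputSource` cannot coexist in one ingestion spec. An illustrative sketch of the validation behind that message (the real check lives in the ingestion spec constructors; the variable names here are invented):

```java
// Hypothetical sketch of the mutual-exclusion check the test exercises.
if (parserMap != null && inputSource != null) {
  throw new IllegalArgumentException(
      "Cannot use parser and inputSource together. Try using inputFormat instead of parser."
  );
}
```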
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index a50e2a96988b..d4e40781016e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -41,6 +41,7 @@
 import org.apache.druid.guice.GuiceAnnotationIntrospector;
 import org.apache.druid.guice.GuiceInjectableValues;
 import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.indexing.common.ReingestionTimelineUtils;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
@@ -63,6 +64,7 @@
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
@@ -308,6 +310,7 @@ public static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
           public void configure(Binder binder)
           {
             binder.bind(LocalDataSegmentPuller.class);
+            binder.bind(ExprMacroTable.class).toInstance(TestExprMacroTable.INSTANCE);
           }
         }
     )
@@ -598,11 +601,11 @@ public void testGetUniqueDimensionsAndMetrics()
     };
     Assert.assertEquals(
         Arrays.asList(expectedDims),
-        IngestSegmentFirehoseFactory.getUniqueDimensions(timelineSegments, null)
+        ReingestionTimelineUtils.getUniqueDimensions(timelineSegments, null)
     );
     Assert.assertEquals(
         Arrays.asList(expectedMetrics),
-        IngestSegmentFirehoseFactory.getUniqueMetrics(timelineSegments)
+        ReingestionTimelineUtils.getUniqueMetrics(timelineSegments)
     );
   }
 
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
index 0e95e9ac824c..e082ffc9f66f 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
@@ -24,6 +24,7 @@
 import com.google.inject.Module;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.IndexingServiceFirehoseModule;
+import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.initialization.Initialization;
 import org.testng.IModuleFactory;
 import org.testng.ITestContext;
@@ -48,7 +49,8 @@ private static List<Module> getModules()
   {
     return ImmutableList.of(
         new DruidTestModule(),
-        new IndexingServiceFirehoseModule()
+        new IndexingServiceFirehoseModule(),
+        new IndexingServiceInputSourceModule()
     );
   }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
index be99de14933f..eaab3687ae10 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
@@ -44,6 +44,8 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
   private static final String INDEX_DATASOURCE = "wikipedia_parallel_index_test";
   private static final String INDEX_INGEST_SEGMENT_DATASOURCE = "wikipedia_parallel_ingest_segment_index_test";
   private static final String INDEX_INGEST_SEGMENT_TASK = "/indexer/wikipedia_parallel_ingest_segment_index_task.json";
+  private static final String INDEX_DRUID_INPUT_SOURCE_DATASOURCE = "wikipedia_parallel_druid_input_source_index_test";
+  private static final String INDEX_DRUID_INPUT_SOURCE_TASK = "/indexer/wikipedia_parallel_druid_input_source_index_task.json";
 
   @DataProvider
   public static Object[][] resources()
@@ -58,7 +60,8 @@ public void testIndexData(PartitionsSpec partitionsSpec) throws Exception
   {
     try (final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
-         final Closeable ignored2 = unloader(INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix())
+         final Closeable ignored2 = unloader(INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix());
+         final Closeable ignored3 = unloader(INDEX_DRUID_INPUT_SOURCE_DATASOURCE + config.getExtraDatasourceNameSuffix())
     ) {
       boolean forceGuaranteedRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
@@ -107,6 +110,15 @@ public void testIndexData(PartitionsSpec partitionsSpec) throws Exception
           INDEX_INGEST_SEGMENT_TASK,
           REINDEX_QUERIES_RESOURCE
       );
+
+      // with DruidInputSource instead of IngestSegmentFirehose
+      doReindexTest(
+          INDEX_DATASOURCE,
+          INDEX_DRUID_INPUT_SOURCE_DATASOURCE,
+          rollupTransform,
+          INDEX_DRUID_INPUT_SOURCE_TASK,
+          REINDEX_QUERIES_RESOURCE
+      );
     } else {
       doReindexTest(
           INDEX_DATASOURCE,
@@ -115,6 +127,15 @@ public void testIndexData(PartitionsSpec partitionsSpec) throws Exception
           INDEX_INGEST_SEGMENT_TASK,
           INDEX_QUERIES_RESOURCE
       );
+
+      // with DruidInputSource instead of IngestSegmentFirehose
+      doReindexTest(
+          INDEX_DATASOURCE,
+          INDEX_DRUID_INPUT_SOURCE_DATASOURCE,
+          rollupTransform,
+          INDEX_DRUID_INPUT_SOURCE_TASK,
+          INDEX_QUERIES_RESOURCE
+      );
     }
   }
 }
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json
new file mode 100644
index 000000000000..91702a413574
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json
@@ -0,0 +1,63 @@
+{
+  "type": "index_parallel",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "%%REINDEX_DATASOURCE%%",
+      "dimensionsSpec": {
+        "dimensionExclusions": [
+          "robot",
+          "continent"
+        ]
+      },
+      "timestampSpec": {
+        "column": "timestamp"
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "added",
+          "fieldName": "added"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        }
+      ],
+      "granularitySpec": {
+        "segmentGranularity": "DAY",
+        "queryGranularity": "second",
+        "intervals": [
+          "2013-08-31/2013-09-02"
+        ]
+      }
+    },
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "druid",
+        "dataSource": "%%DATASOURCE%%",
+        "interval": "2013-08-31/2013-09-02"
+      }
+    },
+    "tuningConfig": {
+      "type": "index_parallel",
+      "maxNumConcurrentSubTasks": 10,
+      "forceGuaranteedRollup": "%%FORCE_GUARANTEED_ROLLUP%%",
+      "partitionsSpec": %%PARTITIONS_SPEC%%,
+      "splitHintSpec": {
+        "type":"segments",
+        "maxInputSegmentBytesPerTask": 1
+      }
+    }
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 0b3b283eec42..01d53f18b2c4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -34,6 +34,7 @@
 import org.apache.druid.discovery.WorkerNodeService;
 import org.apache.druid.guice.DruidProcessingModule;
 import org.apache.druid.guice.IndexingServiceFirehoseModule;
+import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.guice.IndexingServiceTaskLogsModule;
 import org.apache.druid.guice.Jerseys;
@@ -201,6 +202,7 @@ public DataNodeService getDataNodeService()
           }
         },
         new IndexingServiceFirehoseModule(),
+        new IndexingServiceInputSourceModule(),
         new IndexingServiceTaskLogsModule(),
         new QueryablePeonModule(),
         new CliIndexerServerModule(properties),
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 41e3ff2f716d..88646ddee6db 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -34,6 +34,7 @@
 import org.apache.druid.discovery.NodeType;
 import org.apache.druid.discovery.WorkerNodeService;
 import org.apache.druid.guice.IndexingServiceFirehoseModule;
+import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.guice.IndexingServiceTaskLogsModule;
 import org.apache.druid.guice.Jerseys;
@@ -174,6 +175,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig)
           }
         },
         new IndexingServiceFirehoseModule(),
+        new IndexingServiceInputSourceModule(),
         new IndexingServiceTaskLogsModule(),
         new LookupSerdeModule()
     );
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 0cd3730de43a..23e985e01dac 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -40,6 +40,7 @@
 import org.apache.druid.client.indexing.IndexingServiceSelectorConfig;
 import org.apache.druid.discovery.NodeType;
 import org.apache.druid.guice.IndexingServiceFirehoseModule;
+import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.guice.IndexingServiceTaskLogsModule;
 import org.apache.druid.guice.JacksonConfigProvider;
@@ -337,6 +338,7 @@ private void configureOverlordHelpers(Binder binder)
           }
         },
         new IndexingServiceFirehoseModule(),
+        new IndexingServiceInputSourceModule(),
         new IndexingServiceTaskLogsModule(),
         new SupervisorModule(),
         new LookupSerdeModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index dfe5f1049304..a075e31201d1 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -44,6 +44,7 @@
 import org.apache.druid.guice.CacheModule;
 import org.apache.druid.guice.DruidProcessingModule;
 import org.apache.druid.guice.IndexingServiceFirehoseModule;
+import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.guice.Jerseys;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
@@ -258,6 +259,7 @@ public SegmentListerResource getSegmentListerResource(
         },
         new QueryablePeonModule(),
         new IndexingServiceFirehoseModule(),
+        new IndexingServiceInputSourceModule(),
         new ChatHandlerServerModule(properties),
         new LookupModule()
     );
diff --git a/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java b/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java
index 51eb8df06ef3..1d0eb1758957 100644
--- a/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java
+++ b/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java
@@ -41,6 +41,7 @@
 import org.apache.druid.guice.ExtensionsConfig;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.guice.IndexingServiceFirehoseModule;
+import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.guice.LocalDataStorageDruidModule;
 import org.apache.druid.guice.QueryRunnerFactoryModule;
 import org.apache.druid.guice.QueryableModule;
@@ -129,6 +130,7 @@ public void run()
             new FirehoseModule(),
             new IndexingHadoopModule(),
             new IndexingServiceFirehoseModule(),
+            new IndexingServiceInputSourceModule(),
             new LocalDataStorageDruidModule()
         )
     )
diff --git a/website/.spelling b/website/.spelling
index 0be8bb4cf144..b52745261a53 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -940,6 +940,8 @@ forceGuaranteedRollup
 httpAuthenticationPassword
 httpAuthenticationUsername
 ingestSegment
+InputSource
+DruidInputSource
 maxInputSegmentBytesPerTask
 maxNumConcurrentSubTasks
 maxNumSegmentsToMerge