From 65618b0600bf4fa2e283dd4dd5ab865fc402b06a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 Nov 2019 19:03:14 -0800 Subject: [PATCH 01/13] Add Druid input source and format --- .../IndexingServiceInputFormatModule.java | 49 ++ .../IndexingServiceInputSourceModule.java | 49 ++ .../indexing/common/task/CompactionTask.java | 24 +- .../druid/indexing/common/task/IndexTask.java | 18 +- .../common/task/InputSourceProcessor.java | 2 +- .../parallel/ParallelIndexIngestionSpec.java | 14 +- .../parallel/PartialSegmentGenerateTask.java | 2 +- .../batch/parallel/SinglePhaseSubTask.java | 2 +- .../IngestSegmentFirehoseFactory.java | 180 +------- .../indexing/input/DruidInputSource.java | 432 ++++++++++++++++++ .../input/DruidSegmentInputEntity.java | 99 ++++ .../input/DruidSegmentInputFormat.java | 64 +++ .../indexing/input/DruidSegmentReader.java | 225 +++++++++ .../overlord/sampler/InputSourceSampler.java | 2 +- .../common/task/CompactionTaskTest.java | 18 +- .../testing/guice/DruidTestModuleFactory.java | 6 +- .../java/org/apache/druid/cli/CliIndexer.java | 4 + .../apache/druid/cli/CliMiddleManager.java | 4 + .../org/apache/druid/cli/CliOverlord.java | 4 + .../java/org/apache/druid/cli/CliPeon.java | 4 + .../cli/validate/DruidJsonValidator.java | 4 + 21 files changed, 994 insertions(+), 212 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputFormatModule.java create mode 100644 indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputSourceModule.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java diff --git 
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputFormatModule.java b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputFormatModule.java new file mode 100644 index 000000000000..81b2287fe1d9 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputFormatModule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.indexing.input.DruidSegmentInputFormat; +import org.apache.druid.initialization.DruidModule; + +import java.util.List; + +public class IndexingServiceInputFormatModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("IndexingServiceInputFormatModule") + .registerSubtypes( + new NamedType(DruidSegmentInputFormat.class, "druidSegment") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputSourceModule.java b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputSourceModule.java new file mode 100644 index 000000000000..56159e1c1000 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputSourceModule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.indexing.input.DruidInputSource; +import org.apache.druid.initialization.DruidModule; + +import java.util.List; + +public class IndexingServiceInputSourceModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("IndexingServiceInputSourceModule") + .registerSubtypes( + new NamedType(DruidInputSource.class, "druid") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 16bedc3cac1e..e85e57cd5354 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -37,11 +37,9 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DoubleDimensionSchema; import org.apache.druid.data.input.impl.FloatDimensionSchema; -import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.LongDimensionSchema; -import org.apache.druid.data.input.impl.NoopInputRowParser; import org.apache.druid.data.input.impl.StringDimensionSchema; -import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.Checks; import org.apache.druid.indexer.Property; import org.apache.druid.indexer.TaskStatus; @@ -57,7 +55,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; 
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; @@ -68,7 +66,6 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.guava.Comparators; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.DimensionHandler; @@ -529,20 +526,20 @@ private static ParallelIndexIOConfig createIoConfig( ) { return new ParallelIndexIOConfig( - new IngestSegmentFirehoseFactory( + null, + new DruidInputSource( dataSchema.getDataSource(), interval, null, - null, // no filter - // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose - dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(), - Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()), null, + dataSchema.getDimensionsSpec().getDimensionNames(), + Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()), toolbox.getIndexIO(), coordinatorClient, segmentLoaderFactory, retryPolicyFactory ), + null, false ); } @@ -603,15 +600,14 @@ private static DataSchema createDataSchema( final AggregatorFactory[] finalMetricsSpec = metricsSpec == null ? 
createMetricsSpec(queryableIndexAndSegments) : convertToCombiningFactories(metricsSpec); - final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec)); return new DataSchema( dataSource, - jsonMapper.convertValue(parser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT), + new TimestampSpec(null, null, null), + finalDimensionsSpec, finalMetricsSpec, granularitySpec, - null, - jsonMapper + null ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 094a713cde16..741b463d8581 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -729,7 +729,7 @@ private Map> collectIntervalsAndShardSp ingestionSchema.getDataSchema().getDimensionsSpec(), metricsNames ), - getInputFormat(ingestionSchema), + inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null, tmpDir ) ); @@ -919,7 +919,7 @@ private TaskStatus generateAndPublishSegments( driver, partitionsSpec, inputSource, - getInputFormat(ingestionSchema), + inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null, tmpDir, segmentAllocator ); @@ -1037,15 +1037,17 @@ public IndexIngestionSpec( { super(dataSchema, ioConfig, tuningConfig); - Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("parser", dataSchema.getParserMap()), - new Property<>("inputFormat", ioConfig.getInputFormat()) - ) - ); if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) { throw new IAE("Cannot use parser and inputSource together. 
Try using inputFormat instead of parser."); } + if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) { + Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("parser", dataSchema.getParserMap()), + new Property<>("inputFormat", ioConfig.getInputFormat()) + ) + ); + } this.dataSchema = dataSchema; this.ioConfig = ioConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java index 088503e0536a..f20cf50bed3b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java @@ -94,7 +94,7 @@ public SegmentsAndMetadata process( BatchAppenderatorDriver driver, PartitionsSpec partitionsSpec, InputSource inputSource, - InputFormat inputFormat, + @Nullable InputFormat inputFormat, File tmpDir, IndexTaskSegmentAllocator segmentAllocator ) throws IOException, InterruptedException, ExecutionException, TimeoutException diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java index 9c448992eccf..06baae4b7dd3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java @@ -44,17 +44,19 @@ public ParallelIndexIngestionSpec( { super(dataSchema, ioConfig, tuningConfig); - Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("parser", dataSchema.getParserMap()), - new Property<>("inputFormat", ioConfig.getInputFormat()) - ) - ); if (dataSchema.getParserMap() != 
null && ioConfig.getInputSource() != null) { if (!(ioConfig.getInputSource() instanceof FirehoseFactoryToInputSourceAdaptor)) { throw new IAE("Cannot use parser and inputSource together. Try using inputFormat instead of parser."); } } + if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) { + Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("parser", dataSchema.getParserMap()), + new Property<>("inputFormat", ioConfig.getInputFormat()) + ) + ); + } this.dataSchema = dataSchema; this.ioConfig = ioConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 4dabf7ea02c5..642936109660 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -193,7 +193,7 @@ private List generateSegments( driver, partitionsSpec, inputSource, - ParallelIndexSupervisorTask.getInputFormat(ingestionSchema), + inputSource.needsFormat() ? 
ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null, tmpDir, segmentAllocator ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 8ea6fc916118..8bf40136eafe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -431,7 +431,7 @@ private Set generateAndPushSegments( ingestionSchema.getDataSchema().getDimensionsSpec(), metricsNames ), - ParallelIndexSupervisorTask.getInputFormat(ingestionSchema), + inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null, tmpDir ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index dd4911a33704..6549fe53452a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -37,12 +37,10 @@ import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.indexing.common.RetryPolicy; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.parsers.ParseException; 
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.filter.DimFilter; @@ -55,25 +53,15 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; -import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -290,176 +278,28 @@ public WindowedStorageAdapter apply(final PartitionChunk input) return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter); } - private long jitter(long input) - { - final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0; - long retval = input + (long) jitter; - return retval < 0 ? 0 : retval; - } - private List> getTimeline() { if (interval == null) { - return getTimelineForSegmentIds(); + return DruidInputSource.getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); } else { - return getTimelineForInterval(); + return DruidInputSource.getTimelineForInterval(coordinatorClient, retryPolicyFactory, dataSource, interval); } } - private List> getTimelineForInterval() - { - Preconditions.checkNotNull(interval); - - // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration - // as TaskActionClient. 
- final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); - Collection usedSegments; - while (true) { - try { - usedSegments = - coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); - break; - } - catch (Throwable e) { - log.warn(e, "Exception getting database segments"); - final Duration delay = retryPolicy.getAndIncrementRetryDelay(); - if (delay == null) { - throw e; - } else { - final long sleepTime = jitter(delay.getMillis()); - log.info("Will try again in [%s].", new Duration(sleepTime).toString()); - try { - Thread.sleep(sleepTime); - } - catch (InterruptedException e2) { - throw new RuntimeException(e2); - } - } - } - } - - return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval); - } - - private List> getTimelineForSegmentIds() - { - final SortedMap> timeline = new TreeMap<>( - Comparators.intervalsByStartThenEnd() - ); - for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds)) { - final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment( - dataSource, - windowedSegmentId.getSegmentId() - ); - for (Interval interval : windowedSegmentId.getIntervals()) { - final TimelineObjectHolder existingHolder = timeline.get(interval); - if (existingHolder != null) { - if (!existingHolder.getVersion().equals(segment.getVersion())) { - throw new ISE("Timeline segments with the same interval should have the same version: " + - "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment); - } - existingHolder.getObject().add(segment.getShardSpec().createChunk(segment)); - } else { - timeline.put(interval, new TimelineObjectHolder<>( - interval, - segment.getInterval(), - segment.getVersion(), - new PartitionHolder(segment.getShardSpec().createChunk(segment)) - )); - } - } - } - - // Validate that none of the given windows overlaps (except for when multiple segments share exactly the - // same interval). 
- Interval lastInterval = null; - for (Interval interval : timeline.keySet()) { - if (lastInterval != null) { - if (interval.overlaps(lastInterval)) { - throw new IAE( - "Distinct intervals in input segments may not overlap: [%s] vs [%s]", - lastInterval, - interval - ); - } - } - lastInterval = interval; - } - - return new ArrayList<>(timeline.values()); - } - private void initializeSplitsIfNeeded(@Nullable SplitHintSpec splitHintSpec) { if (splits != null) { return; } - final SegmentsSplitHintSpec nonNullSplitHintSpec; - if (!(splitHintSpec instanceof SegmentsSplitHintSpec)) { - if (splitHintSpec != null) { - log.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ingoring it.", splitHintSpec); - } - nonNullSplitHintSpec = new SegmentsSplitHintSpec(null); - } else { - nonNullSplitHintSpec = (SegmentsSplitHintSpec) splitHintSpec; - } - - final long maxInputSegmentBytesPerTask = this.maxInputSegmentBytesPerTask == null - ? nonNullSplitHintSpec.getMaxInputSegmentBytesPerTask() - : this.maxInputSegmentBytesPerTask; - - // isSplittable() ensures this is only called when we have an interval. - final List> timelineSegments = getTimelineForInterval(); - - // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing - // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their - // data can combine with each other anyway. - - List>> newSplits = new ArrayList<>(); - List currentSplit = new ArrayList<>(); - Map windowedSegmentIds = new HashMap<>(); - long bytesInCurrentSplit = 0; - for (TimelineObjectHolder timelineHolder : timelineSegments) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - final DataSegment segment = chunk.getObject(); - final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment); - if (existingWindowedSegmentId != null) { - // We've already seen this segment in the timeline, so just add this interval to it. 
It has already - // been placed into a split. - existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval()); - } else { - // It's the first time we've seen this segment, so create a new WindowedSegmentId. - List intervals = new ArrayList<>(); - // Use the interval that contributes to the timeline, not the entire segment's true interval. - intervals.add(timelineHolder.getInterval()); - final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals); - windowedSegmentIds.put(segment, newWindowedSegmentId); - - // Now figure out if it goes in the current split or not. - final long segmentBytes = segment.getSize(); - if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) { - // This segment won't fit in the current non-empty split, so this split is done. - newSplits.add(new InputSplit<>(currentSplit)); - currentSplit = new ArrayList<>(); - bytesInCurrentSplit = 0; - } - if (segmentBytes > maxInputSegmentBytesPerTask) { - // If this segment is itself bigger than our max, just put it in its own split. - Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0); - newSplits.add(new InputSplit<>(Collections.singletonList(newWindowedSegmentId))); - } else { - currentSplit.add(newWindowedSegmentId); - bytesInCurrentSplit += segmentBytes; - } - } - } - } - if (!currentSplit.isEmpty()) { - newSplits.add(new InputSplit<>(currentSplit)); - } - - splits = newSplits; + splits = DruidInputSource.createSplits( + coordinatorClient, + retryPolicyFactory, + dataSource, + interval, + splitHintSpec == null ? 
new SegmentsSplitHintSpec(maxInputSegmentBytesPerTask) : splitHintSpec + ); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java new file mode 100644 index 000000000000..ea4d5c97937b --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.input; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SegmentsSplitHintSpec; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.indexing.common.RetryPolicy; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.firehose.WindowedSegmentId; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.timeline.partition.PartitionHolder; +import org.joda.time.Duration; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import 
java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +public class DruidInputSource extends AbstractInputSource implements SplittableInputSource> +{ + private static final Logger LOG = new Logger(DruidInputSource.class); + + private final String dataSource; + // Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly + // by the user creating this input source and 'segmentIds' is used for sub-tasks if it is split for parallel + // batch ingestion. + @Nullable + private final Interval interval; + @Nullable + private final List segmentIds; + private final DimFilter dimFilter; + private final List dimensions; + private final List metrics; + private final IndexIO indexIO; + private final CoordinatorClient coordinatorClient; + private final SegmentLoaderFactory segmentLoaderFactory; + private final RetryPolicyFactory retryPolicyFactory; + private final DruidSegmentInputFormat inputFormat; + + @JsonCreator + public DruidInputSource( + @JsonProperty("dataSource") final String dataSource, + @JsonProperty("interval") @Nullable Interval interval, + // Specifying "segments" is intended only for when this input source has split itself, + // not for direct end user use. 
+ @JsonProperty("segments") @Nullable List segmentIds, + @JsonProperty("filter") DimFilter dimFilter, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("metrics") List metrics, + @JacksonInject IndexIO indexIO, + @JacksonInject CoordinatorClient coordinatorClient, + @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject RetryPolicyFactory retryPolicyFactory + ) + { + Preconditions.checkNotNull(dataSource, "dataSource"); + if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) { + throw new IAE("Specify exactly one of 'interval' and 'segments'"); + } + this.dataSource = dataSource; + this.interval = interval; + this.segmentIds = segmentIds; + this.dimFilter = dimFilter; + this.dimensions = dimensions; + this.metrics = metrics; + this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); + this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); + this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); + this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); + this.inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter); + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @Nullable + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @Nullable + @JsonProperty("segments") + @JsonInclude(Include.NON_NULL) + public List getSegmentIds() + { + return segmentIds; + } + + @JsonProperty("filter") + public DimFilter getDimFilter() + { + return dimFilter; + } + + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public List getMetrics() + { + return metrics; + } + + @Override + protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) + { + final SegmentLoader segmentLoader = 
segmentLoaderFactory.manufacturate(temporaryDirectory); + + final Stream entityStream = createTimeline() + .stream() + .flatMap(holder -> { + final PartitionHolder partitionHolder = holder.getObject(); + return partitionHolder + .stream() + .map(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval())); + }); + + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + entityStream, + temporaryDirectory + ); + } + + private List> createTimeline() + { + if (interval == null) { + return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); + } else { + return getTimelineForInterval(coordinatorClient, retryPolicyFactory, dataSource, interval); + } + } + + @Override + public Stream>> createSplits( + InputFormat inputFormat, + @Nullable SplitHintSpec splitHintSpec + ) + { + // segmentIds is supposed to be specified by the supervisor task during the parallel indexing. + // If it's not null, segments are already split by the supervisor task and further split won't happen. + if (segmentIds == null) { + return createSplits( + coordinatorClient, + retryPolicyFactory, + dataSource, + interval, + splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec + ).stream(); + } else { + return Stream.of(new InputSplit<>(segmentIds)); + } + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + // segmentIds is supposed to be specified by the supervisor task during the parallel indexing. + // If it's not null, segments are already split by the supervisor task and further split won't happen. + if (segmentIds == null) { + return createSplits( + coordinatorClient, + retryPolicyFactory, + dataSource, + interval, + splitHintSpec == null ? 
new SegmentsSplitHintSpec(null) : splitHintSpec + ).size(); + } else { + return 1; + } + } + + @Override + public SplittableInputSource> withSplit(InputSplit> split) + { + return new DruidInputSource( + dataSource, + null, + split.get(), + dimFilter, + dimensions, + metrics, + indexIO, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ); + } + + @Override + public boolean needsFormat() + { + return false; + } + + public static List>> createSplits( + CoordinatorClient coordinatorClient, + RetryPolicyFactory retryPolicyFactory, + String dataSource, + Interval interval, + SplitHintSpec splitHintSpec + ) + { + final long maxInputSegmentBytesPerTask; + if (!(splitHintSpec instanceof SegmentsSplitHintSpec)) { + LOG.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ingoring it.", splitHintSpec); + maxInputSegmentBytesPerTask = new SegmentsSplitHintSpec(null).getMaxInputSegmentBytesPerTask(); + } else { + maxInputSegmentBytesPerTask = ((SegmentsSplitHintSpec) splitHintSpec).getMaxInputSegmentBytesPerTask(); + } + + // isSplittable() ensures this is only called when we have an interval. + final List> timelineSegments = getTimelineForInterval( + coordinatorClient, + retryPolicyFactory, + dataSource, + interval + ); + + // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing + // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their + // data can combine with each other anyway. 
+ + List>> splits = new ArrayList<>(); + List currentSplit = new ArrayList<>(); + Map windowedSegmentIds = new HashMap<>(); + long bytesInCurrentSplit = 0; + for (TimelineObjectHolder timelineHolder : timelineSegments) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + final DataSegment segment = chunk.getObject(); + final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment); + if (existingWindowedSegmentId != null) { + // We've already seen this segment in the timeline, so just add this interval to it. It has already + // been placed into a split. + existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval()); + } else { + // It's the first time we've seen this segment, so create a new WindowedSegmentId. + List intervals = new ArrayList<>(); + // Use the interval that contributes to the timeline, not the entire segment's true interval. + intervals.add(timelineHolder.getInterval()); + final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals); + windowedSegmentIds.put(segment, newWindowedSegmentId); + + // Now figure out if it goes in the current split or not. + final long segmentBytes = segment.getSize(); + if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) { + // This segment won't fit in the current non-empty split, so this split is done. + splits.add(new InputSplit<>(currentSplit)); + currentSplit = new ArrayList<>(); + bytesInCurrentSplit = 0; + } + if (segmentBytes > maxInputSegmentBytesPerTask) { + // If this segment is itself bigger than our max, just put it in its own split. 
+ Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0); + splits.add(new InputSplit<>(Collections.singletonList(newWindowedSegmentId))); + } else { + currentSplit.add(newWindowedSegmentId); + bytesInCurrentSplit += segmentBytes; + } + } + } + } + if (!currentSplit.isEmpty()) { + splits.add(new InputSplit<>(currentSplit)); + } + + return splits; + } + + public static List> getTimelineForInterval( + CoordinatorClient coordinatorClient, + RetryPolicyFactory retryPolicyFactory, + String dataSource, + Interval interval + ) + { + Preconditions.checkNotNull(interval); + + // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration + // as TaskActionClient. + final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); + Collection usedSegments; + while (true) { + try { + usedSegments = + coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); + break; + } + catch (Throwable e) { + LOG.warn(e, "Exception getting database segments"); + final Duration delay = retryPolicy.getAndIncrementRetryDelay(); + if (delay == null) { + throw e; + } else { + final long sleepTime = jitter(delay.getMillis()); + LOG.info("Will try again in [%s].", new Duration(sleepTime).toString()); + try { + Thread.sleep(sleepTime); + } + catch (InterruptedException e2) { + throw new RuntimeException(e2); + } + } + } + } + + return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval); + } + + public static List> getTimelineForSegmentIds( + CoordinatorClient coordinatorClient, + String dataSource, + List segmentIds + ) + { + final SortedMap> timeline = new TreeMap<>( + Comparators.intervalsByStartThenEnd() + ); + for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds, "segmentIds")) { + final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment( + dataSource, + windowedSegmentId.getSegmentId() + ); + for (Interval 
interval : windowedSegmentId.getIntervals()) { + final TimelineObjectHolder existingHolder = timeline.get(interval); + if (existingHolder != null) { + if (!existingHolder.getVersion().equals(segment.getVersion())) { + throw new ISE("Timeline segments with the same interval should have the same version: " + + "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment); + } + existingHolder.getObject().add(segment.getShardSpec().createChunk(segment)); + } else { + timeline.put( + interval, + new TimelineObjectHolder<>( + interval, + segment.getInterval(), + segment.getVersion(), + new PartitionHolder<>(segment.getShardSpec().createChunk(segment)) + ) + ); + } + } + } + + // Validate that none of the given windows overlaps (except for when multiple segments share exactly the + // same interval). + Interval lastInterval = null; + for (Interval interval : timeline.keySet()) { + if (lastInterval != null) { + if (interval.overlaps(lastInterval)) { + throw new IAE( + "Distinct intervals in input segments may not overlap: [%s] vs [%s]", + lastInterval, + interval + ); + } + } + lastInterval = interval; + } + + return new ArrayList<>(timeline.values()); + } + + private static long jitter(long input) + { + final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0; + long retval = input + (long) jitter; + return retval < 0 ? 0 : retval; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java new file mode 100644 index 000000000000..a7f39088f314 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.input; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.InputStream; +import java.net.URI; + +public class DruidSegmentInputEntity implements InputEntity +{ + private final SegmentLoader segmentLoader; + private final DataSegment segment; + private final Interval intervalFilter; + + DruidSegmentInputEntity(SegmentLoader segmentLoader, DataSegment segment, Interval intervalFilter) + { + this.segmentLoader = segmentLoader; + this.segment = segment; + this.intervalFilter = intervalFilter; + } + + Interval getIntervalFilter() + { + return intervalFilter; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Don't call this"); + } + + @Override + public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) + { + final File segmentFile; + try { + segmentFile = 
segmentLoader.getSegmentFiles(segment); + } + catch (SegmentLoadingException e) { + throw new RuntimeException(e); + } + return new CleanableFile() + { + @Override + public File file() + { + return segmentFile; + } + + @Override + public void close() + { + if (!segmentFile.delete()) { + // log + } + } + }; + } + + @Override + public Predicate getRetryCondition() + { + return Predicates.alwaysFalse(); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java new file mode 100644 index 000000000000..49e826ecc289 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.input; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.segment.IndexIO; + +import java.io.File; + +public class DruidSegmentInputFormat implements InputFormat +{ + private final IndexIO indexIO; + private final DimFilter dimFilter; + + DruidSegmentInputFormat(IndexIO indexIO, DimFilter dimFilter) + { + this.indexIO = indexIO; + this.dimFilter = dimFilter; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory + ) + { + return new DruidSegmentReader( + source, + indexIO, + inputRowSchema.getDimensionsSpec().getDimensionNames(), + inputRowSchema.getMetricNames(), + dimFilter, + temporaryDirectory + ); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java new file mode 100644 index 000000000000..3147214d5b7c --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.input; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntity.CleanableFile; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.filter.Filters; +import 
org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; +import org.joda.time.DateTime; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +public class DruidSegmentReader extends IntermediateRowParsingReader> +{ + private final DruidSegmentInputEntity source; + private final IndexIO indexIO; + private final List dimensions; + private final List metrics; + private final DimFilter dimFilter; + private final File temporaryDirectory; + + DruidSegmentReader( + InputEntity source, + IndexIO indexIO, + List dimensions, + List metrics, + DimFilter dimFilter, + File temporaryDirectory + ) + { + Preconditions.checkArgument(source instanceof DruidSegmentInputEntity); + this.source = (DruidSegmentInputEntity) source; + this.indexIO = indexIO; + this.dimensions = dimensions; + this.metrics = metrics; + this.dimFilter = dimFilter; + this.temporaryDirectory = temporaryDirectory; + } + + @Override + protected CloseableIterator> intermediateRowIterator() throws IOException + { + final CleanableFile segmentFile = source.fetch(temporaryDirectory, null); + final WindowedStorageAdapter storageAdapter = new WindowedStorageAdapter( + new QueryableIndexStorageAdapter( + indexIO.loadIndex(segmentFile.file()) + ), + source.getIntervalFilter() + ); + final Sequence> sequence = Sequences.concat( + Sequences.map( + storageAdapter.getAdapter().makeCursors( + Filters.toFilter(dimFilter), + storageAdapter.getInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + cursor -> { + final BaseLongColumnValueSelector timestampColumnSelector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME); + + final Map dimSelectors = new HashMap<>(); + for (String dim : dimensions) { + final DimensionSelector dimSelector = cursor + 
.getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec(dim, dim)); + // dimSelector is null if the dimension is not present + if (dimSelector != null) { + dimSelectors.put(dim, dimSelector); + } + } + + final Map metSelectors = new HashMap<>(); + for (String metric : metrics) { + final BaseObjectColumnValueSelector metricSelector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(metric); + metSelectors.put(metric, metricSelector); + } + + return Sequences.simple( + () -> new Iterator>() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } + + @Override + public Map next() + { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.getLong(); + theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp)); + + for (Entry dimSelector : dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + int valsSize = vals.size(); + if (valsSize == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else if (valsSize > 1) { + List dimVals = new ArrayList<>(valsSize); + for (int i = 0; i < valsSize; ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); + } + } + + for (Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final BaseObjectColumnValueSelector selector = metSelector.getValue(); + Object value = selector.getObject(); + if (value != null) { + theEvent.put(metric, value); + } + } + cursor.advance(); + return theEvent; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("Remove Not Supported"); + } + } + ); + } + ) + ); + return new CloseableIterator>() + { + Yielder> rowYielder = Yielders.each(sequence); + + @Override + public boolean hasNext() + { + return !rowYielder.isDone(); + } + + 
@Override + public Map next() + { + final Map row = rowYielder.get(); + rowYielder = rowYielder.next(null); + return row; + } + + @Override + public void close() throws IOException + { + segmentFile.close(); + } + }; + } + + @Override + protected List parseInputRows(Map intermediateRow) throws ParseException + { + final DateTime timestamp = (DateTime) intermediateRow.get(TimestampSpec.DEFAULT_COLUMN); + return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow)); + } + + @Override + protected Map toMap(Map intermediateRow) + { + return intermediateRow; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java index c9ac50c8d8cf..130f041f7853 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java @@ -187,7 +187,7 @@ private InputSourceReader buildReader( SamplerConfig samplerConfig, DataSchema dataSchema, InputSource inputSource, - InputFormat inputFormat, + @Nullable InputFormat inputFormat, File tempDir ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c76bc9c0fbf4..bb34724ade3b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -32,7 +32,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient; -import org.apache.druid.data.input.FirehoseFactory; +import 
org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DoubleDimensionSchema; @@ -59,7 +59,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -1084,16 +1084,16 @@ private void assertIngestionSchema( // assert ioConfig final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig(); Assert.assertFalse(ioConfig.isAppendToExisting()); - final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory(); - Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory); - final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory; - Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource()); - Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval()); - Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter()); + final InputSource inputSource = ioConfig.getInputSource(); + Assert.assertTrue(inputSource instanceof DruidInputSource); + final DruidInputSource druidInputSource = (DruidInputSource) inputSource; + Assert.assertEquals(DATA_SOURCE, druidInputSource.getDataSource()); + Assert.assertEquals(expectedSegmentIntervals.get(i), druidInputSource.getInterval()); + Assert.assertNull(druidInputSource.getDimFilter()); Assert.assertEquals( new HashSet<>(expectedDimensionsSpec.getDimensionNames()), - new 
HashSet<>(ingestSegmentFirehoseFactory.getDimensions()) + new HashSet<>(druidInputSource.getDimensions()) ); // assert tuningConfig diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java index 0e95e9ac824c..24d64165af3b 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java @@ -24,6 +24,8 @@ import com.google.inject.Module; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.IndexingServiceFirehoseModule; +import org.apache.druid.guice.IndexingServiceInputFormatModule; +import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.initialization.Initialization; import org.testng.IModuleFactory; import org.testng.ITestContext; @@ -48,7 +50,9 @@ private static List getModules() { return ImmutableList.of( new DruidTestModule(), - new IndexingServiceFirehoseModule() + new IndexingServiceFirehoseModule(), + new IndexingServiceInputSourceModule(), + new IndexingServiceInputFormatModule() ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 0b3b283eec42..1a29ab82d418 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -34,6 +34,8 @@ import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.IndexingServiceFirehoseModule; +import org.apache.druid.guice.IndexingServiceInputFormatModule; +import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; import 
org.apache.druid.guice.Jerseys; @@ -201,6 +203,8 @@ public DataNodeService getDataNodeService() } }, new IndexingServiceFirehoseModule(), + new IndexingServiceInputSourceModule(), + new IndexingServiceInputFormatModule(), new IndexingServiceTaskLogsModule(), new QueryablePeonModule(), new CliIndexerServerModule(properties), diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 41e3ff2f716d..0aecdcf90d20 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -34,6 +34,8 @@ import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.guice.IndexingServiceFirehoseModule; +import org.apache.druid.guice.IndexingServiceInputFormatModule; +import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; import org.apache.druid.guice.Jerseys; @@ -174,6 +176,8 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) } }, new IndexingServiceFirehoseModule(), + new IndexingServiceInputSourceModule(), + new IndexingServiceInputFormatModule(), new IndexingServiceTaskLogsModule(), new LookupSerdeModule() ); diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 0cd3730de43a..d18e12d3c7ae 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -40,6 +40,8 @@ import org.apache.druid.client.indexing.IndexingServiceSelectorConfig; import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.IndexingServiceFirehoseModule; +import org.apache.druid.guice.IndexingServiceInputFormatModule; +import 
org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; import org.apache.druid.guice.JacksonConfigProvider; @@ -337,6 +339,8 @@ private void configureOverlordHelpers(Binder binder) } }, new IndexingServiceFirehoseModule(), + new IndexingServiceInputSourceModule(), + new IndexingServiceInputFormatModule(), new IndexingServiceTaskLogsModule(), new SupervisorModule(), new LookupSerdeModule(), diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index dfe5f1049304..71c3418e37dc 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -44,6 +44,8 @@ import org.apache.druid.guice.CacheModule; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.IndexingServiceFirehoseModule; +import org.apache.druid.guice.IndexingServiceInputFormatModule; +import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; @@ -258,6 +260,8 @@ public SegmentListerResource getSegmentListerResource( }, new QueryablePeonModule(), new IndexingServiceFirehoseModule(), + new IndexingServiceInputSourceModule(), + new IndexingServiceInputFormatModule(), new ChatHandlerServerModule(properties), new LookupModule() ); diff --git a/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java b/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java index 51eb8df06ef3..56753ed1b7ee 100644 --- a/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java +++ b/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java @@ -41,6 +41,8 @@ import org.apache.druid.guice.ExtensionsConfig; import 
org.apache.druid.guice.FirehoseModule; import org.apache.druid.guice.IndexingServiceFirehoseModule; +import org.apache.druid.guice.IndexingServiceInputFormatModule; +import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.LocalDataStorageDruidModule; import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; @@ -129,6 +131,8 @@ public void run() new FirehoseModule(), new IndexingHadoopModule(), new IndexingServiceFirehoseModule(), + new IndexingServiceInputSourceModule(), + new IndexingServiceInputFormatModule(), new LocalDataStorageDruidModule() ) ) From 0186fefaff4982b682acd8f8c99dfefbaa484bb5 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 2 Dec 2019 23:38:38 -0800 Subject: [PATCH 02/13] Inherit dims/metrics from segment --- .../indexing/input/DruidInputSource.java | 98 ++++++++++++++++++- .../input/DruidSegmentInputFormat.java | 16 ++- 2 files changed, 107 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index ea4d5c97937b..353f8d2a9a3f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -24,7 +24,11 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputEntity; @@ -63,9 +67,12 @@ 
import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; public class DruidInputSource extends AbstractInputSource implements SplittableInputSource> @@ -87,7 +94,6 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI private final CoordinatorClient coordinatorClient; private final SegmentLoaderFactory segmentLoaderFactory; private final RetryPolicyFactory retryPolicyFactory; - private final DruidSegmentInputFormat inputFormat; @JsonCreator public DruidInputSource( @@ -97,8 +103,8 @@ public DruidInputSource( // not for direct end user use. @JsonProperty("segments") @Nullable List segmentIds, @JsonProperty("filter") DimFilter dimFilter, - @JsonProperty("dimensions") List dimensions, - @JsonProperty("metrics") List metrics, + @Nullable @JsonProperty("dimensions") List dimensions, + @Nullable @JsonProperty("metrics") List metrics, @JacksonInject IndexIO indexIO, @JacksonInject CoordinatorClient coordinatorClient, @JacksonInject SegmentLoaderFactory segmentLoaderFactory, @@ -119,7 +125,6 @@ public DruidInputSource( this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); - this.inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter); } @JsonProperty @@ -166,6 +171,8 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu { final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); + final List> timeline = createTimeline(); + final Stream entityStream = createTimeline() .stream() 
.flatMap(holder -> { @@ -175,6 +182,27 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu .map(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval())); }); + final List effectiveDimensions; + if (dimensions == null) { + effectiveDimensions = getUniqueDimensions(timeline, null); + } else { + effectiveDimensions = dimensions; + } + + List effectiveMetrics; + if (metrics == null) { + effectiveMetrics = getUniqueMetrics(timeline); + } else { + effectiveMetrics = metrics; + } + + final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat( + indexIO, + dimFilter, + effectiveDimensions, + effectiveMetrics + ); + return new InputEntityIteratingReader( inputRowSchema, inputFormat, @@ -429,4 +457,66 @@ private static long jitter(long input) long retval = input + (long) jitter; return retval < 0 ? 0 : retval; } + + @VisibleForTesting + private static List getUniqueDimensions( + List> timelineSegments, + @Nullable Set excludeDimensions + ) + { + final BiMap uniqueDims = HashBiMap.create(); + + // Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be + // optimized for performance. + // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more + // frequently, and thus the performance should be optimized for recent ones rather than old ones. 
+ + // timelineSegments are sorted in order of interval + int index = 0; + for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + for (String dimension : chunk.getObject().getDimensions()) { + if (!uniqueDims.containsKey(dimension) && + (excludeDimensions == null || !excludeDimensions.contains(dimension))) { + uniqueDims.put(dimension, index++); + } + } + } + } + + final BiMap orderedDims = uniqueDims.inverse(); + return IntStream.range(0, orderedDims.size()) + .mapToObj(orderedDims::get) + .collect(Collectors.toList()); + } + + @VisibleForTesting + private static List getUniqueMetrics(List> timelineSegments) + { + final BiMap uniqueMetrics = HashBiMap.create(); + + // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent + // segments to olders. + + // timelineSegments are sorted in order of interval + int[] index = {0}; + for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + for (String metric : chunk.getObject().getMetrics()) { + uniqueMetrics.computeIfAbsent( + metric, + k -> { + return index[0]++; + } + ); + } + } + } + + final BiMap orderedMetrics = uniqueMetrics.inverse(); + return IntStream.range(0, orderedMetrics.size()) + .mapToObj(orderedMetrics::get) + .collect(Collectors.toList()); + + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java index 49e826ecc289..80f87721357c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java @@ -27,16 +27,26 @@ import org.apache.druid.segment.IndexIO; import java.io.File; +import java.util.List; public class 
DruidSegmentInputFormat implements InputFormat { private final IndexIO indexIO; private final DimFilter dimFilter; + private List dimensions; + private List metrics; - DruidSegmentInputFormat(IndexIO indexIO, DimFilter dimFilter) + DruidSegmentInputFormat( + IndexIO indexIO, + DimFilter dimFilter, + List dimensions, + List metrics + ) { this.indexIO = indexIO; this.dimFilter = dimFilter; + this.dimensions = dimensions; + this.metrics = metrics; } @Override @@ -55,8 +65,8 @@ public InputEntityReader createReader( return new DruidSegmentReader( source, indexIO, - inputRowSchema.getDimensionsSpec().getDimensionNames(), - inputRowSchema.getMetricNames(), + dimensions, + metrics, dimFilter, temporaryDirectory ); From 5e20ef5d34b714a353ac382c681d7386c19623a6 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 3 Dec 2019 00:18:08 -0800 Subject: [PATCH 03/13] Add ingest segment firehose reindexing test --- .../common/task/CompactionTaskRunTest.java | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index b1a15d37cdb8..7799ab52f96a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -30,6 +30,7 @@ import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; +import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -42,12 +43,16 @@ import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import 
org.apache.druid.indexing.common.task.CompactionTask.Builder; +import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalDataSegmentPusher; @@ -59,6 +64,7 @@ import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; +import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.CompactionState; @@ -88,6 +94,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -638,6 +645,93 @@ public void testRunIndexAndCompactForSameSegmentAtTheSameTime2() throws Exceptio Assert.assertEquals(TaskState.FAILED, compactionResult.lhs.getStatusCode()); } + /** + * Run a regular index task that's equivalent to the compaction task in {@link #testRun()}, using + * {@link IngestSegmentFirehoseFactory}. 
+ * + * This is not entirely CompactionTask related, but it's similar conceptually and it requires + * similar setup to what this test suite already has. + * + * It could be moved to a separate test class if needed. + */ + @Test + public void testRunRegularIndexTaskWithIngestSegmentFirehose() throws Exception + { + runIndexTask(); + + IndexTask indexTask = new IndexTask( + null, + null, + new IndexTask.IndexIngestionSpec( + new DataSchema( + "test", + getObjectMapper().convertValue( + new StringInputRowParser( + DEFAULT_PARSE_SPEC, + null + ), + Map.class + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("val", "val") + }, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + null + ), + null, + getObjectMapper() + ), + new IndexTask.IndexIOConfig( + new IngestSegmentFirehoseFactory( + DATA_SOURCE, + Intervals.of("2014-01-01/2014-01-02"), + null, + null, + null, + null, + null, + getIndexIO(), + coordinatorClient, + segmentLoaderFactory, + RETRY_POLICY_FACTORY + ), + false + ), + IndexTaskTest.createTuningConfig(5000000, null, null, Long.MAX_VALUE, null, null, false, true) + ), + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + rowIngestionMetersFactory, + appenderatorsManager + ); + + final Pair> resultPair = runTask(indexTask); + + Assert.assertTrue(resultPair.lhs.isSuccess()); + + final List segments = resultPair.rhs; + Assert.assertEquals(3, segments.size()); + + for (int i = 0; i < 3; i++) { + Assert.assertEquals( + Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), + segments.get(i).getInterval() + ); + Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); + if (lockGranularity == LockGranularity.SEGMENT) { + Assert.assertEquals( + new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1), + segments.get(i).getShardSpec() + ); + } else { + Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec()); + } + } + } 
+ private Pair> runIndexTask() throws Exception { return runIndexTask(null, null, false); From 11d263dd13362798b14a28a7622aae538ed50073 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 3 Dec 2019 00:23:28 -0800 Subject: [PATCH 04/13] Remove unnecessary module --- .../IndexingServiceInputFormatModule.java | 49 ------------------- .../testing/guice/DruidTestModuleFactory.java | 4 +- .../java/org/apache/druid/cli/CliIndexer.java | 2 - .../apache/druid/cli/CliMiddleManager.java | 2 - .../org/apache/druid/cli/CliOverlord.java | 2 - .../java/org/apache/druid/cli/CliPeon.java | 2 - .../cli/validate/DruidJsonValidator.java | 2 - 7 files changed, 1 insertion(+), 62 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputFormatModule.java diff --git a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputFormatModule.java b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputFormatModule.java deleted file mode 100644 index 81b2287fe1d9..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceInputFormatModule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.guice; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import org.apache.druid.indexing.input.DruidSegmentInputFormat; -import org.apache.druid.initialization.DruidModule; - -import java.util.List; - -public class IndexingServiceInputFormatModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new SimpleModule("IndexingServiceInputFormatModule") - .registerSubtypes( - new NamedType(DruidSegmentInputFormat.class, "druidSegment") - ) - ); - } - - @Override - public void configure(Binder binder) - { - } -} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java index 24d64165af3b..e082ffc9f66f 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java @@ -24,7 +24,6 @@ import com.google.inject.Module; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.IndexingServiceFirehoseModule; -import org.apache.druid.guice.IndexingServiceInputFormatModule; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.initialization.Initialization; import org.testng.IModuleFactory; @@ -51,8 +50,7 @@ private static List getModules() return ImmutableList.of( new DruidTestModule(), new IndexingServiceFirehoseModule(), - new IndexingServiceInputSourceModule(), - new IndexingServiceInputFormatModule() + new IndexingServiceInputSourceModule() ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java 
b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 1a29ab82d418..01d53f18b2c4 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -34,7 +34,6 @@ import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.IndexingServiceFirehoseModule; -import org.apache.druid.guice.IndexingServiceInputFormatModule; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; @@ -204,7 +203,6 @@ public DataNodeService getDataNodeService() }, new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), - new IndexingServiceInputFormatModule(), new IndexingServiceTaskLogsModule(), new QueryablePeonModule(), new CliIndexerServerModule(properties), diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 0aecdcf90d20..88646ddee6db 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -34,7 +34,6 @@ import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.guice.IndexingServiceFirehoseModule; -import org.apache.druid.guice.IndexingServiceInputFormatModule; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; @@ -177,7 +176,6 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) }, new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), - new IndexingServiceInputFormatModule(), new IndexingServiceTaskLogsModule(), new LookupSerdeModule() ); diff 
--git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index d18e12d3c7ae..23e985e01dac 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -40,7 +40,6 @@ import org.apache.druid.client.indexing.IndexingServiceSelectorConfig; import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.IndexingServiceFirehoseModule; -import org.apache.druid.guice.IndexingServiceInputFormatModule; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; @@ -340,7 +339,6 @@ private void configureOverlordHelpers(Binder binder) }, new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), - new IndexingServiceInputFormatModule(), new IndexingServiceTaskLogsModule(), new SupervisorModule(), new LookupSerdeModule(), diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 71c3418e37dc..a075e31201d1 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -44,7 +44,6 @@ import org.apache.druid.guice.CacheModule; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.IndexingServiceFirehoseModule; -import org.apache.druid.guice.IndexingServiceInputFormatModule; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; @@ -261,7 +260,6 @@ public SegmentListerResource getSegmentListerResource( new QueryablePeonModule(), new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), - new IndexingServiceInputFormatModule(), new ChatHandlerServerModule(properties), new LookupModule() 
); diff --git a/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java b/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java index 56753ed1b7ee..1d0eb1758957 100644 --- a/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java +++ b/services/src/main/java/org/apache/druid/cli/validate/DruidJsonValidator.java @@ -41,7 +41,6 @@ import org.apache.druid.guice.ExtensionsConfig; import org.apache.druid.guice.FirehoseModule; import org.apache.druid.guice.IndexingServiceFirehoseModule; -import org.apache.druid.guice.IndexingServiceInputFormatModule; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.LocalDataStorageDruidModule; import org.apache.druid.guice.QueryRunnerFactoryModule; @@ -132,7 +131,6 @@ public void run() new IndexingHadoopModule(), new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), - new IndexingServiceInputFormatModule(), new LocalDataStorageDruidModule() ) ) From 1d66b54b6719eb718e24880b298a27b0886ca68b Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 3 Dec 2019 16:42:53 -0800 Subject: [PATCH 05/13] Fix unit tests, checkstyle --- .../druid/indexing/firehose/IngestSegmentFirehoseFactory.java | 2 +- .../druid/indexing/common/task/CompactionTaskRunTest.java | 1 - .../druid/indexing/common/task/IndexIngestionSpecTest.java | 2 +- .../indexing/firehose/IngestSegmentFirehoseFactoryTest.java | 2 ++ 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 6549fe53452a..4b00b6d3add7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -298,7 +298,7 @@ 
private void initializeSplitsIfNeeded(@Nullable SplitHintSpec splitHintSpec) retryPolicyFactory, dataSource, interval, - splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec + splitHintSpec == null ? new SegmentsSplitHintSpec(maxInputSegmentBytesPerTask) : splitHintSpec ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 7799ab52f96a..76ba995a0540 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -64,7 +64,6 @@ import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; -import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.CompactionState; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java index c700ad361321..af67f9908f51 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java @@ -45,7 +45,7 @@ public void testParserAndInputFormat() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage( - "At most one of [Property{name='parser', value={fake=parser map}}, Property{name='inputFormat'," + "Cannot use parser and inputSource together. 
Try using inputFormat instead of parser." ); final IndexIngestionSpec spec = new IndexIngestionSpec( new DataSchema( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index a50e2a96988b..6ab7caeb4b1c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -63,6 +63,7 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; @@ -308,6 +309,7 @@ public static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMap public void configure(Binder binder) { binder.bind(LocalDataSegmentPuller.class); + binder.bind(ExprMacroTable.class).toInstance(TestExprMacroTable.INSTANCE); } } ) From 945978238577d397bd70ce5a26ad660ce09edc6d Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 3 Dec 2019 16:53:09 -0800 Subject: [PATCH 06/13] Add doc entry --- docs/ingestion/native-batch.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 03d298f5e7b8..1661927689d7 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -890,3 +890,31 @@ This Firehose can be used to combine and merge data from a list of different Fir |--------|-----------|---------| |type|This should be "combining"|yes| |delegates|List of Firehoses to combine data from|yes| + + +## Input 
Sources + +### DruidInputSource + +This InputSource can be used to read data from existing Druid segments, potentially using a new schema and changing the name, dimensions, metrics, rollup, etc. of the segment. +This InputSource is _splittable_ and can be used by [native parallel index tasks](native-batch.md#parallel-task). +This InputSource has a fixed InputFormat for reading from Druid segments; no InputFormat needs to be specified in the ingestion spec when using this InputSource. + +A sample DruidInputSource spec is shown below: + +```json +{ + "type": "druid", + "dataSource": "wikipedia", + "interval": "2013-01-01/2013-01-02" +} +``` + +|property|description|required?| +|--------|-----------|---------| +|type|This should be "druid".|yes| +|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes| +|interval|A String representing the ISO-8601 interval. This defines the time range to fetch the data over.|yes| +|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no| +|metrics|The list of metrics to select. If left empty, no metrics are returned. 
If left null or not defined, all metrics are selected.|no| +|filter| See [Filters](../querying/filters.md)|no| From b6a6b4840f944d8a0e1cb15a1fba14ba4054b3ac Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 3 Dec 2019 19:28:08 -0800 Subject: [PATCH 07/13] Fix dimensionExclusions handling, add parallel index integration test --- .../indexing/input/DruidInputSource.java | 4 +- .../tests/indexer/ITParallelIndexTest.java | 23 ++++++- ...arallel_druid_input_source_index_task.json | 63 +++++++++++++++++++ 3 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 353f8d2a9a3f..1e741ecc5d89 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -184,7 +184,9 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu final List effectiveDimensions; if (dimensions == null) { - effectiveDimensions = getUniqueDimensions(timeline, null); + effectiveDimensions = getUniqueDimensions(timeline, inputRowSchema.getDimensionsSpec().getDimensionExclusions()); + } else if (inputRowSchema.getDimensionsSpec().hasCustomDimensions()) { + effectiveDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames(); } else { effectiveDimensions = dimensions; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java index be99de14933f..eaab3687ae10 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java +++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java @@ -44,6 +44,8 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest private static final String INDEX_DATASOURCE = "wikipedia_parallel_index_test"; private static final String INDEX_INGEST_SEGMENT_DATASOURCE = "wikipedia_parallel_ingest_segment_index_test"; private static final String INDEX_INGEST_SEGMENT_TASK = "/indexer/wikipedia_parallel_ingest_segment_index_task.json"; + private static final String INDEX_DRUID_INPUT_SOURCE_DATASOURCE = "wikipedia_parallel_druid_input_source_index_test"; + private static final String INDEX_DRUID_INPUT_SOURCE_TASK = "/indexer/wikipedia_parallel_druid_input_source_index_task.json"; @DataProvider public static Object[][] resources() @@ -58,7 +60,8 @@ public static Object[][] resources() public void testIndexData(PartitionsSpec partitionsSpec) throws Exception { try (final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); - final Closeable ignored2 = unloader(INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix()) + final Closeable ignored2 = unloader(INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix()); + final Closeable ignored3 = unloader(INDEX_DRUID_INPUT_SOURCE_DATASOURCE + config.getExtraDatasourceNameSuffix()) ) { boolean forceGuaranteedRollup = partitionsSpec.isForceGuaranteedRollupCompatible(); @@ -107,6 +110,15 @@ public void testIndexData(PartitionsSpec partitionsSpec) throws Exception INDEX_INGEST_SEGMENT_TASK, REINDEX_QUERIES_RESOURCE ); + + // with DruidInputSource instead of IngestSegmentFirehose + doReindexTest( + INDEX_DATASOURCE, + INDEX_DRUID_INPUT_SOURCE_DATASOURCE, + rollupTransform, + INDEX_DRUID_INPUT_SOURCE_TASK, + REINDEX_QUERIES_RESOURCE + ); } else { doReindexTest( INDEX_DATASOURCE, @@ -115,6 +127,15 @@ public void testIndexData(PartitionsSpec partitionsSpec) throws Exception INDEX_INGEST_SEGMENT_TASK, INDEX_QUERIES_RESOURCE ); 
+ + // with DruidInputSource instead of IngestSegmentFirehose + doReindexTest( + INDEX_DATASOURCE, + INDEX_DRUID_INPUT_SOURCE_DATASOURCE, + rollupTransform, + INDEX_DRUID_INPUT_SOURCE_TASK, + INDEX_QUERIES_RESOURCE + ); } } } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json new file mode 100644 index 000000000000..91702a413574 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json @@ -0,0 +1,63 @@ +{ + "type": "index_parallel", + "spec": { + "dataSchema": { + "dataSource": "%%REINDEX_DATASOURCE%%", + "dimensionsSpec": { + "dimensionExclusions": [ + "robot", + "continent" + ] + }, + "timestampSpec": { + "column": "timestamp" + }, + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "second", + "intervals": [ + "2013-08-31/2013-09-02" + ] + } + }, + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "druid", + "dataSource": "%%DATASOURCE%%", + "interval": "2013-08-31/2013-09-02" + } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumConcurrentSubTasks": 10, + "forceGuaranteedRollup": "%%FORCE_GUARANTEED_ROLLUP%%", + "partitionsSpec": %%PARTITIONS_SPEC%%, + "splitHintSpec": { + "type":"segments", + "maxInputSegmentBytesPerTask": 1 + } + } + } +} From 9c5e1d801043d24d4022a59d2a6e13c40d4b62f9 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 3 Dec 2019 20:43:28 -0800 Subject: [PATCH 08/13] Add spelling exclusion --- website/.spelling | 2 ++ 1 file changed, 2 insertions(+) diff --git a/website/.spelling 
b/website/.spelling index 0be8bb4cf144..b52745261a53 100644 --- a/website/.spelling +++ b/website/.spelling @@ -940,6 +940,8 @@ forceGuaranteedRollup httpAuthenticationPassword httpAuthenticationUsername ingestSegment +InputSource +DruidInputSource maxInputSegmentBytesPerTask maxNumConcurrentSubTasks maxNumSegmentsToMerge From 32adef56ad525064e7d91b638335309e27842ba3 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 4 Dec 2019 19:52:30 -0800 Subject: [PATCH 09/13] Address some PR comments --- .../timeline/VersionedIntervalTimeline.java | 63 +++++ docs/ingestion/native-batch.md | 44 +++- .../IngestSegmentFirehoseFactory.java | 66 +---- .../indexing/input/DruidInputSource.java | 83 +----- .../input/DruidSegmentInputEntity.java | 5 +- .../indexing/input/DruidSegmentReader.java | 239 +++++++++++------- .../IngestSegmentFirehoseFactoryTest.java | 5 +- 7 files changed, 267 insertions(+), 238 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index 728bd1ae5d53..647efddc761b 100644 --- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -22,7 +22,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.UOE; @@ -51,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import java.util.stream.IntStream; import 
java.util.stream.StreamSupport; /** @@ -753,6 +757,65 @@ private List> lookup(Interval inte return retVal; } + @VisibleForTesting + public static List getUniqueDimensions( + List> timelineSegments, + @Nullable Set excludeDimensions + ) + { + final BiMap uniqueDims = HashBiMap.create(); + + // Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be + // optimized for performance. + // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more + // frequently, and thus the performance should be optimized for recent ones rather than old ones. + + // timelineSegments are sorted in order of interval + int index = 0; + for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + for (String dimension : chunk.getObject().getDimensions()) { + if (!uniqueDims.containsKey(dimension) && + (excludeDimensions == null || !excludeDimensions.contains(dimension))) { + uniqueDims.put(dimension, index++); + } + } + } + } + + final BiMap orderedDims = uniqueDims.inverse(); + return IntStream.range(0, orderedDims.size()) + .mapToObj(orderedDims::get) + .collect(Collectors.toList()); + } + + @VisibleForTesting + public static List getUniqueMetrics(List> timelineSegments) + { + final BiMap uniqueMetrics = HashBiMap.create(); + + // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent + // segments to olders. 
+ + // timelineSegments are sorted in order of interval + int[] index = {0}; + for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + for (String metric : chunk.getObject().getMetrics()) { + uniqueMetrics.computeIfAbsent(metric, k -> { + return index[0]++; + } + ); + } + } + } + + final BiMap orderedMetrics = uniqueMetrics.inverse(); + return IntStream.range(0, orderedMetrics.size()) + .mapToObj(orderedMetrics::get) + .collect(Collectors.toList()); + } + public class TimelineEntry { private final Interval trueInterval; diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 1661927689d7..70f39f7f3807 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -900,7 +900,16 @@ This InputSource can be used to read data from existing Druid segments, potentia This InputSource is _splittable_ and can be used by [native parallel index tasks](native-batch.md#parallel-task). This InputSource has a fixed InputFormat for reading from Druid segments; no InputFormat needs to be specified in the ingestion spec when using this InputSource. -A sample DruidInputSource spec is shown below: +|property|description|required?| +|--------|-----------|---------| +|type|This should be "druid".|yes| +|dataSource|A String defining the Druid datasource to fetch rows from|yes| +|interval|A String representing the ISO-8601 interval, which defines the time range to fetch the data over.|yes| +|dimensions|A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned. |no| +|metrics|The list of Strings containing the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no| +|filter| See [Filters](../querying/filters.md). 
Only rows that match the filter, if specified, will be returned.|no| + +A minimal example DruidInputSource spec is shown below: ```json { @@ -910,11 +919,28 @@ A sample DruidInputSource spec is shown below: } ``` -|property|description|required?| -|--------|-----------|---------| -|type|This should be "druid".|yes| -|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes| -|interval|A String representing the ISO-8601 interval. This defines the time range to fetch the data over.|yes| -|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no| -|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| -|filter| See [Filters](../querying/filters.md)|no| +The spec above will read all existing dimension and metric columns from the `wikipedia` datasource, including all rows with a timestamp (the `__time` column) within the interval `2013-01-01/2013-01-02`. + +A spec that applies a filter and reads a subset of the original datasource's columns is shown below. + +```json +{ + "type": "druid", + "dataSource": "wikipedia", + "interval": "2013-01-01/2013-01-02", + "dimensions": [ + "page", + "user" + ], + "metrics": [ + "added" + ], + "filter": { + "type": "selector", + "dimension": "page", + "value": "Druid" + } +} +``` + +The spec above will only return the `page` and `user` dimensions and the `added` metric. Only rows where `page` = `Druid` will be returned. 
\ No newline at end of file diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 4b00b6d3add7..fd09998ba319 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -53,6 +53,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.joda.time.Interval; @@ -225,13 +226,15 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) { dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); } else { - dims = getUniqueDimensions( + dims = VersionedIntervalTimeline.getUniqueDimensions( timeLineSegments, inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() ); } - final List metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics; + final List metricsList = metrics == null + ? 
VersionedIntervalTimeline.getUniqueMetrics(timeLineSegments) + : metrics; final List adapters = Lists.newArrayList( Iterables.concat( @@ -323,63 +326,4 @@ public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) initializeSplitsIfNeeded(splitHintSpec); return splits.size(); } - - @VisibleForTesting - static List getUniqueDimensions( - List> timelineSegments, - @Nullable Set excludeDimensions - ) - { - final BiMap uniqueDims = HashBiMap.create(); - - // Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be - // optimized for performance. - // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more - // frequently, and thus the performance should be optimized for recent ones rather than old ones. - - // timelineSegments are sorted in order of interval - int index = 0; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String dimension : chunk.getObject().getDimensions()) { - if (!uniqueDims.containsKey(dimension) && - (excludeDimensions == null || !excludeDimensions.contains(dimension))) { - uniqueDims.put(dimension, index++); - } - } - } - } - - final BiMap orderedDims = uniqueDims.inverse(); - return IntStream.range(0, orderedDims.size()) - .mapToObj(orderedDims::get) - .collect(Collectors.toList()); - } - - @VisibleForTesting - static List getUniqueMetrics(List> timelineSegments) - { - final BiMap uniqueMetrics = HashBiMap.create(); - - // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent - // segments to olders. 
- - // timelineSegments are sorted in order of interval - int[] index = {0}; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String metric : chunk.getObject().getMetrics()) { - uniqueMetrics.computeIfAbsent(metric, k -> { - return index[0]++; - } - ); - } - } - } - - final BiMap orderedMetrics = uniqueMetrics.inverse(); - return IntStream.range(0, orderedMetrics.size()) - .mapToObj(orderedMetrics::get) - .collect(Collectors.toList()); - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 1e741ecc5d89..adbd876d824e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -184,7 +184,10 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu final List effectiveDimensions; if (dimensions == null) { - effectiveDimensions = getUniqueDimensions(timeline, inputRowSchema.getDimensionsSpec().getDimensionExclusions()); + effectiveDimensions = VersionedIntervalTimeline.getUniqueDimensions( + timeline, + inputRowSchema.getDimensionsSpec().getDimensionExclusions() + ); } else if (inputRowSchema.getDimensionsSpec().hasCustomDimensions()) { effectiveDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames(); } else { @@ -193,7 +196,7 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu List effectiveMetrics; if (metrics == null) { - effectiveMetrics = getUniqueMetrics(timeline); + effectiveMetrics = VersionedIntervalTimeline.getUniqueMetrics(timeline); } else { effectiveMetrics = metrics; } @@ -438,14 +441,12 @@ public static List> getTimelineForSegm // same interval). 
Interval lastInterval = null; for (Interval interval : timeline.keySet()) { - if (lastInterval != null) { - if (interval.overlaps(lastInterval)) { - throw new IAE( - "Distinct intervals in input segments may not overlap: [%s] vs [%s]", - lastInterval, - interval - ); - } + if (lastInterval != null && interval.overlaps(lastInterval)) { + throw new IAE( + "Distinct intervals in input segments may not overlap: [%s] vs [%s]", + lastInterval, + interval + ); } lastInterval = interval; } @@ -459,66 +460,4 @@ private static long jitter(long input) long retval = input + (long) jitter; return retval < 0 ? 0 : retval; } - - @VisibleForTesting - private static List getUniqueDimensions( - List> timelineSegments, - @Nullable Set excludeDimensions - ) - { - final BiMap uniqueDims = HashBiMap.create(); - - // Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be - // optimized for performance. - // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more - // frequently, and thus the performance should be optimized for recent ones rather than old ones. - - // timelineSegments are sorted in order of interval - int index = 0; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String dimension : chunk.getObject().getDimensions()) { - if (!uniqueDims.containsKey(dimension) && - (excludeDimensions == null || !excludeDimensions.contains(dimension))) { - uniqueDims.put(dimension, index++); - } - } - } - } - - final BiMap orderedDims = uniqueDims.inverse(); - return IntStream.range(0, orderedDims.size()) - .mapToObj(orderedDims::get) - .collect(Collectors.toList()); - } - - @VisibleForTesting - private static List getUniqueMetrics(List> timelineSegments) - { - final BiMap uniqueMetrics = HashBiMap.create(); - - // Here, we try to retain the order of metrics as they were specified. 
Metrics are extracted from the recent - // segments to olders. - - // timelineSegments are sorted in order of interval - int[] index = {0}; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String metric : chunk.getObject().getMetrics()) { - uniqueMetrics.computeIfAbsent( - metric, - k -> { - return index[0]++; - } - ); - } - } - } - - final BiMap orderedMetrics = uniqueMetrics.inverse(); - return IntStream.range(0, orderedMetrics.size()) - .mapToObj(orderedMetrics::get) - .collect(Collectors.toList()); - - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java index a7f39088f314..755086303d24 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java @@ -22,6 +22,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; import org.apache.druid.data.input.InputEntity; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; @@ -34,6 +35,8 @@ public class DruidSegmentInputEntity implements InputEntity { + private static final EmittingLogger log = new EmittingLogger(DruidSegmentInputEntity.class); + private final SegmentLoader segmentLoader; private final DataSegment segment; private final Interval intervalFilter; @@ -85,7 +88,7 @@ public File file() public void close() { if (!segmentFile.delete()) { - // log + log.warn("Could not clean temporary segment file: " + segmentFile); } } }; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index 3147214d5b7c..0f4bef6b157a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -39,6 +39,7 @@ import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndexStorageAdapter; @@ -58,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.function.Consumer; public class DruidSegmentReader extends IntermediateRowParsingReader> { @@ -96,94 +98,67 @@ protected CloseableIterator> intermediateRowIterator() throw ), source.getIntervalFilter() ); + + final Sequence cursors = storageAdapter.getAdapter().makeCursors( + Filters.toFilter(dimFilter), + storageAdapter.getInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + final Sequence> sequence = Sequences.concat( Sequences.map( - storageAdapter.getAdapter().makeCursors( - Filters.toFilter(dimFilter), - storageAdapter.getInterval(), - VirtualColumns.EMPTY, - Granularities.ALL, - false, - null - ), - cursor -> { - final BaseLongColumnValueSelector timestampColumnSelector = - cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME); - - final Map dimSelectors = new HashMap<>(); - for (String dim : dimensions) { - final DimensionSelector dimSelector = cursor - .getColumnSelectorFactory() - .makeDimensionSelector(new DefaultDimensionSpec(dim, dim)); - // dimSelector is null if the dimension is not present - if (dimSelector != null) { - dimSelectors.put(dim, dimSelector); - } - } - - final Map metSelectors = new 
HashMap<>(); - for (String metric : metrics) { - final BaseObjectColumnValueSelector metricSelector = - cursor.getColumnSelectorFactory().makeColumnValueSelector(metric); - metSelectors.put(metric, metricSelector); - } - - return Sequences.simple( - () -> new Iterator>() - { - @Override - public boolean hasNext() - { - return !cursor.isDone(); - } - - @Override - public Map next() - { - final Map theEvent = Maps.newLinkedHashMap(); - final long timestamp = timestampColumnSelector.getLong(); - theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp)); - - for (Entry dimSelector : dimSelectors.entrySet()) { - final String dim = dimSelector.getKey(); - final DimensionSelector selector = dimSelector.getValue(); - final IndexedInts vals = selector.getRow(); - - int valsSize = vals.size(); - if (valsSize == 1) { - final String dimVal = selector.lookupName(vals.get(0)); - theEvent.put(dim, dimVal); - } else if (valsSize > 1) { - List dimVals = new ArrayList<>(valsSize); - for (int i = 0; i < valsSize; ++i) { - dimVals.add(selector.lookupName(vals.get(i))); - } - theEvent.put(dim, dimVals); - } - } - - for (Entry metSelector : metSelectors.entrySet()) { - final String metric = metSelector.getKey(); - final BaseObjectColumnValueSelector selector = metSelector.getValue(); - Object value = selector.getObject(); - if (value != null) { - theEvent.put(metric, value); - } - } - cursor.advance(); - return theEvent; - } - - @Override - public void remove() - { - throw new UnsupportedOperationException("Remove Not Supported"); - } - } - ); - } + cursors, + this::cursorToSequence ) ); + + return makeCloseableIteratorFromSequenceAndSegmentFile(sequence, segmentFile); + } + + @Override + protected List parseInputRows(Map intermediateRow) throws ParseException + { + final DateTime timestamp = (DateTime) intermediateRow.get(TimestampSpec.DEFAULT_COLUMN); + return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow)); + } + + 
@Override + protected Map toMap(Map intermediateRow) + { + return intermediateRow; + } + + /** + * Given a {@link Cursor} create a {@link Sequence} that returns the rows of the cursor as + * Map intermediate rows, selecting the dimensions and metrics of this segment reader. + * + * @param cursor A cursor + * @return A sequence of intermediate rows + */ + private Sequence> cursorToSequence( + final Cursor cursor + ) + { + return Sequences.simple( + () -> new IntermediateRowFromCursorIterator(cursor, dimensions, metrics) + ); + } + + /** + * @param sequence A sequence of intermediate rows generated from a sequence of + * cursors in {@link #intermediateRowIterator()} + * @param segmentFile The underlying segment file containing the row data + * @return A CloseableIterator from a sequence of intermediate rows, closing the underlying segment file + * when the iterator is closed. + */ + private static CloseableIterator> makeCloseableIteratorFromSequenceAndSegmentFile( + final Sequence> sequence, + final CleanableFile segmentFile + ) + { return new CloseableIterator>() { Yielder> rowYielder = Yielders.each(sequence); @@ -210,16 +185,94 @@ public void close() throws IOException }; } - @Override - protected List parseInputRows(Map intermediateRow) throws ParseException + /** + * Given a {@link Cursor}, a list of dimension names, and a list of metric names, this iterator + * returns the rows of the cursor as Map intermediate rows. 
+ */ + private static class IntermediateRowFromCursorIterator implements Iterator> { - final DateTime timestamp = (DateTime) intermediateRow.get(TimestampSpec.DEFAULT_COLUMN); - return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow)); - } + private final Cursor cursor; + private final BaseLongColumnValueSelector timestampColumnSelector; + private final Map dimSelectors; + private final Map metSelectors; - @Override - protected Map toMap(Map intermediateRow) - { - return intermediateRow; + public IntermediateRowFromCursorIterator( + Cursor cursor, + List dimensionNames, + List metricNames + ) + { + this.cursor = cursor; + + timestampColumnSelector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME); + + dimSelectors = new HashMap<>(); + for (String dim : dimensionNames) { + final DimensionSelector dimSelector = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec(dim, dim)); + // dimSelector is null if the dimension is not present + if (dimSelector != null) { + dimSelectors.put(dim, dimSelector); + } + } + + metSelectors = new HashMap<>(); + for (String metric : metricNames) { + final BaseObjectColumnValueSelector metricSelector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(metric); + metSelectors.put(metric, metricSelector); + } + } + + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } + + @Override + public Map next() + { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.getLong(); + theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp)); + + for (Entry dimSelector : dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + int valsSize = vals.size(); + if (valsSize == 1) { + final String dimVal = 
selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else if (valsSize > 1) { + List dimVals = new ArrayList<>(valsSize); + for (int i = 0; i < valsSize; ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); + } + } + + for (Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final BaseObjectColumnValueSelector selector = metSelector.getValue(); + Object value = selector.getObject(); + if (value != null) { + theEvent.put(metric, value); + } + } + cursor.advance(); + return theEvent; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("Remove Not Supported"); + } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 6ab7caeb4b1c..b589ffde8385 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -81,6 +81,7 @@ import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NumberedPartitionChunk; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionChunk; @@ -600,11 +601,11 @@ public void testGetUniqueDimensionsAndMetrics() }; Assert.assertEquals( Arrays.asList(expectedDims), - IngestSegmentFirehoseFactory.getUniqueDimensions(timelineSegments, null) + VersionedIntervalTimeline.getUniqueDimensions(timelineSegments, null) ); Assert.assertEquals( Arrays.asList(expectedMetrics), - 
IngestSegmentFirehoseFactory.getUniqueMetrics(timelineSegments) + VersionedIntervalTimeline.getUniqueMetrics(timelineSegments) ); } From 10940a021fd11fcc6f125eda352dfcf0eaa7a095 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 4 Dec 2019 19:54:30 -0800 Subject: [PATCH 10/13] Checkstyle --- .../apache/druid/timeline/VersionedIntervalTimeline.java | 8 +++++--- .../indexing/firehose/IngestSegmentFirehoseFactory.java | 6 ------ .../org/apache/druid/indexing/input/DruidInputSource.java | 7 ------- .../apache/druid/indexing/input/DruidSegmentReader.java | 1 - 4 files changed, 5 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index 647efddc761b..c0b3d1f49073 100644 --- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -802,9 +802,11 @@ public static List getUniqueMetrics(List timelineHolder : Lists.reverse(timelineSegments)) { for (PartitionChunk chunk : timelineHolder.getObject()) { for (String metric : chunk.getObject().getMetrics()) { - uniqueMetrics.computeIfAbsent(metric, k -> { - return index[0]++; - } + uniqueMetrics.computeIfAbsent( + metric, + k -> { + return index[0]++; + } ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index fd09998ba319..d817d5e4e801 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -22,11 +22,8 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import 
com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -62,9 +59,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory> diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index adbd876d824e..9c641ff33c71 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -24,11 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.Lists; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputEntity; @@ -67,12 +63,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import 
java.util.stream.Stream; public class DruidInputSource extends AbstractInputSource implements SplittableInputSource> diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index 0f4bef6b157a..718b0841c1d3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -59,7 +59,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.function.Consumer; public class DruidSegmentReader extends IntermediateRowParsingReader> { From 7f31c0f06277c7e0d35229bab6ce2cc4cf1a8898 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 4 Dec 2019 21:37:42 -0800 Subject: [PATCH 11/13] wip --- .../common/task/CompactionTaskRunTest.java | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 76ba995a0540..6ad23bba4473 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -111,7 +111,7 @@ public class CompactionTaskRunTest extends IngestionTestBase public static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( new TimestampSpec("ts", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim"))), - null, + "|", Arrays.asList("ts", "dim", "val"), false, 0 @@ -130,6 +130,20 @@ public class CompactionTaskRunTest extends IngestionTestBase ) ); + private static List TEST_ROWS = new ArrayList<>(); + { + TEST_ROWS.add("2014-01-01T00:00:10Z,a,1\n"); + 
TEST_ROWS.add("2014-01-01T00:00:10Z,b,2\n"); + TEST_ROWS.add("2014-01-01T00:00:10Z,c,3\n"); + TEST_ROWS.add("2014-01-01T01:00:20Z,a,1\n"); + TEST_ROWS.add("2014-01-01T01:00:20Z,b,2\n"); + TEST_ROWS.add("2014-01-01T01:00:20Z,c,3\n"); + TEST_ROWS.add("2014-01-01T02:00:30Z,a,1\n"); + TEST_ROWS.add("2014-01-01T02:00:30Z,b,2\n"); + TEST_ROWS.add("2014-01-01T02:00:30Z,c,3\n"); + //TEST_ROWS.add("2014-01-01T02:00:30Z,c|d|e,3\n"); + } + @Parameterized.Parameters(name = "{0}") public static Iterable constructorFeeder() { @@ -751,15 +765,9 @@ private Pair> runIndexTask( File tmpFile = File.createTempFile("druid", "index", tmpDir); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { - writer.write("2014-01-01T00:00:10Z,a,1\n"); - writer.write("2014-01-01T00:00:10Z,b,2\n"); - writer.write("2014-01-01T00:00:10Z,c,3\n"); - writer.write("2014-01-01T01:00:20Z,a,1\n"); - writer.write("2014-01-01T01:00:20Z,b,2\n"); - writer.write("2014-01-01T01:00:20Z,c,3\n"); - writer.write("2014-01-01T02:00:30Z,a,1\n"); - writer.write("2014-01-01T02:00:30Z,b,2\n"); - writer.write("2014-01-01T02:00:30Z,c,3\n"); + for (String testRow : TEST_ROWS) { + writer.write(testRow); + } } IndexTask indexTask = new IndexTask( From 8b68ff95bd111e2329d49a62390cdb2238fe790a Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 4 Dec 2019 23:03:14 -0800 Subject: [PATCH 12/13] Address rest of PR comments --- .../indexing/input/DruidInputSource.java | 2 +- .../task/CompactionTaskParallelRunTest.java | 39 +++++++ .../common/task/CompactionTaskRunTest.java | 106 +++++++++++++++--- 3 files changed, 132 insertions(+), 15 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 9c641ff33c71..a316f56e542e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -290,7 +290,7 @@ public static List>> createSplits( { final long maxInputSegmentBytesPerTask; if (!(splitHintSpec instanceof SegmentsSplitHintSpec)) { - LOG.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ingoring it.", splitHintSpec); + LOG.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ignoring it.", splitHintSpec); maxInputSegmentBytesPerTask = new SegmentsSplitHintSpec(null).getMaxInputSegmentBytesPerTask(); } else { maxInputSegmentBytesPerTask = ((SegmentsSplitHintSpec) splitHintSpec).getMaxInputSegmentBytesPerTask(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 6217dc1fa187..98bb64bdb511 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -25,6 +25,8 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.LockGranularity; @@ -38,6 +40,8 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexingTest.TestSupervisorTask; +import org.apache.druid.indexing.firehose.WindowedSegmentId; +import 
org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -65,9 +69,12 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; @RunWith(Parameterized.class) public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervisorTaskTest @@ -166,6 +173,38 @@ public void testRunParallel() throws Exception runTask(compactionTask); } + @Test + public void testDruidInputSourceCreateSplitsWithIndividualSplits() throws Exception + { + runIndexTask(); + + Interval interval = Intervals.of("2014-01-01/2014-01-02"); + + List>> splits = DruidInputSource.createSplits( + coordinatorClient, + RETRY_POLICY_FACTORY, + DATA_SOURCE, + interval, + new SegmentsSplitHintSpec(1L) // each segment gets its own split with this config + ); + + List segments = new ArrayList<>( + coordinatorClient.getDatabaseSegmentDataSourceSegments( + DATA_SOURCE, + ImmutableList.of(interval) + ) + ); + + Set segmentIdsFromSplits = new HashSet<>(); + Set segmentIdsFromCoordinator = new HashSet<>(); + Assert.assertEquals(segments.size(), splits.size()); + for (int i = 0; i < segments.size(); i++) { + segmentIdsFromCoordinator.add(segments.get(i).getId().toString()); + segmentIdsFromSplits.add(splits.get(i).get().get(0).getSegmentId()); + } + Assert.assertEquals(segmentIdsFromCoordinator, segmentIdsFromSplits); + } + private void runIndexTask() throws Exception { File tmpDir = temporaryFolder.newFolder(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 6ad23bba4473..3a18cac2b589 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -48,10 +48,17 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.LocalDataSegmentPuller; @@ -65,6 +72,7 @@ import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -130,19 +138,18 @@ public class CompactionTaskRunTest extends IngestionTestBase ) ); - private static List TEST_ROWS = new ArrayList<>(); - { - TEST_ROWS.add("2014-01-01T00:00:10Z,a,1\n"); - TEST_ROWS.add("2014-01-01T00:00:10Z,b,2\n"); - TEST_ROWS.add("2014-01-01T00:00:10Z,c,3\n"); - TEST_ROWS.add("2014-01-01T01:00:20Z,a,1\n"); - 
TEST_ROWS.add("2014-01-01T01:00:20Z,b,2\n"); - TEST_ROWS.add("2014-01-01T01:00:20Z,c,3\n"); - TEST_ROWS.add("2014-01-01T02:00:30Z,a,1\n"); - TEST_ROWS.add("2014-01-01T02:00:30Z,b,2\n"); - TEST_ROWS.add("2014-01-01T02:00:30Z,c,3\n"); - //TEST_ROWS.add("2014-01-01T02:00:30Z,c|d|e,3\n"); - } + private static final List TEST_ROWS = ImmutableList.of( + "2014-01-01T00:00:10Z,a,1\n", + "2014-01-01T00:00:10Z,b,2\n", + "2014-01-01T00:00:10Z,c,3\n", + "2014-01-01T01:00:20Z,a,1\n", + "2014-01-01T01:00:20Z,b,2\n", + "2014-01-01T01:00:20Z,c,3\n", + "2014-01-01T02:00:30Z,a,1\n", + "2014-01-01T02:00:30Z,b,2\n", + "2014-01-01T02:00:30Z,c,3\n", + "2014-01-01T02:00:30Z,c|d|e,3\n" + ); @Parameterized.Parameters(name = "{0}") public static Iterable constructorFeeder() @@ -161,13 +168,14 @@ public static Iterable constructorFeeder() private final SegmentLoaderFactory segmentLoaderFactory; private final LockGranularity lockGranularity; private final AppenderatorsManager appenderatorsManager; + private final TestUtils testUtils; private ExecutorService exec; private File localDeepStorage; public CompactionTaskRunTest(LockGranularity lockGranularity) { - TestUtils testUtils = new TestUtils(); + testUtils = new TestUtils(); rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); indexingServiceClient = new NoopIndexingServiceClient(); coordinatorClient = new CoordinatorClient(null, null) @@ -241,6 +249,9 @@ public void testRun() throws Exception Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec()); } } + + List rowsFromSegment = getCSVFormatRowsFromSegments(segments); + Assert.assertEquals(TEST_ROWS, rowsFromSegment); } @Test @@ -883,4 +894,71 @@ public List getLocations() null ); } + + private List getCSVFormatRowsFromSegments(List segments) throws Exception + { + + final File cacheDir = temporaryFolder.newFolder(); + final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(cacheDir); + + List cursors = new ArrayList<>(); + for 
(DataSegment segment : segments) { + final File segmentFile = segmentLoader.getSegmentFiles(segment); + + final WindowedStorageAdapter adapter = new WindowedStorageAdapter( + new QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)), + segment.getInterval() + ); + final Sequence cursorSequence = adapter.getAdapter().makeCursors( + null, + segment.getInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + cursors.addAll(cursorSequence.toList()); + } + + List rowsFromSegment = new ArrayList<>(); + for (Cursor cursor : cursors) { + cursor.reset(); + while (!cursor.isDone()) { + final DimensionSelector selector1 = cursor.getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("ts", "ts")); + final DimensionSelector selector2 = cursor.getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("dim", "dim")); + final DimensionSelector selector3 = cursor.getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("val", "val")); + + Object dimObject = selector2.getObject(); + String dimVal = null; + if (dimObject instanceof String) { + dimVal = (String) dimObject; + } else if (dimObject instanceof List) { + dimVal = String.join("|", (List) dimObject); + } + + rowsFromSegment.add( + makeCSVFormatRow( + selector1.getObject().toString(), + dimVal, + selector3.defaultGetObject().toString() + ) + ); + + cursor.advance(); + } + } + return rowsFromSegment; + } + + private static String makeCSVFormatRow( + String ts, + String dim, + String val + ) + { + return StringUtils.format("%s,%s,%s\n", ts, dim, val); + } } From 65c0d0d906d94c1962c528114b61a7a6a3324d42 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 5 Dec 2019 14:06:47 -0800 Subject: [PATCH 13/13] Address PR comments --- .../timeline/VersionedIntervalTimeline.java | 65 ----------- docs/ingestion/native-batch.md | 4 +- .../common/ReingestionTimelineUtils.java | 104 ++++++++++++++++++ 
.../IngestSegmentFirehoseFactory.java | 6 +- .../indexing/input/DruidInputSource.java | 5 +- .../indexing/input/DruidSegmentReader.java | 6 - .../IngestSegmentFirehoseFactoryTest.java | 6 +- 7 files changed, 115 insertions(+), 81 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index c0b3d1f49073..728bd1ae5d53 100644 --- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -22,10 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.UOE; @@ -54,7 +51,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.StreamSupport; /** @@ -757,67 +753,6 @@ private List> lookup(Interval inte return retVal; } - @VisibleForTesting - public static List getUniqueDimensions( - List> timelineSegments, - @Nullable Set excludeDimensions - ) - { - final BiMap uniqueDims = HashBiMap.create(); - - // Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be - // optimized for performance. 
- // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more - // frequently, and thus the performance should be optimized for recent ones rather than old ones. - - // timelineSegments are sorted in order of interval - int index = 0; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String dimension : chunk.getObject().getDimensions()) { - if (!uniqueDims.containsKey(dimension) && - (excludeDimensions == null || !excludeDimensions.contains(dimension))) { - uniqueDims.put(dimension, index++); - } - } - } - } - - final BiMap orderedDims = uniqueDims.inverse(); - return IntStream.range(0, orderedDims.size()) - .mapToObj(orderedDims::get) - .collect(Collectors.toList()); - } - - @VisibleForTesting - public static List getUniqueMetrics(List> timelineSegments) - { - final BiMap uniqueMetrics = HashBiMap.create(); - - // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent - // segments to olders. 
- - // timelineSegments are sorted in order of interval - int[] index = {0}; - for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { - for (PartitionChunk chunk : timelineHolder.getObject()) { - for (String metric : chunk.getObject().getMetrics()) { - uniqueMetrics.computeIfAbsent( - metric, - k -> { - return index[0]++; - } - ); - } - } - } - - final BiMap orderedMetrics = uniqueMetrics.inverse(); - return IntStream.range(0, orderedMetrics.size()) - .mapToObj(orderedMetrics::get) - .collect(Collectors.toList()); - } - public class TimelineEntry { private final Interval trueInterval; diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 70f39f7f3807..8deafeeea71d 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -904,9 +904,9 @@ This InputSource has a fixed InputFormat for reading from Druid segments; no Inp |--------|-----------|---------| |type|This should be "druid".|yes| |dataSource|A String defining the Druid datasource to fetch rows from|yes| -|interval|A String representing the ISO-8601 interval, which defines the time range to fetch the data over.|yes| +|interval|A String representing an ISO-8601 interval, which defines the time range to fetch the data over.|yes| |dimensions|A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned. |no| -|metrics|The list of Strings containg the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no| +|metrics|The list of Strings containing the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no| |filter| See [Filters](../querying/filters.md). 
Only rows that match the filter, if specified, will be returned.|no| A minimal example DruidInputSource spec is shown below: diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java new file mode 100644 index 000000000000..1f4820f39427 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.partition.PartitionChunk; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ReingestionTimelineUtils +{ + /** + * @param timelineSegments A list of timeline objects, such as that returned by VersionedIntervalTimeline.lookup(). 
+ * @param excludeDimensions Dimensions to be excluded + * @return A list of all the unique dimension column names present in the segments within timelineSegments + */ + public static List getUniqueDimensions( + List> timelineSegments, + @Nullable Set excludeDimensions + ) + { + final BiMap uniqueDims = HashBiMap.create(); + + // Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be + // optimized for performance. + // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more + // frequently, and thus the performance should be optimized for recent ones rather than old ones. + + // timelineSegments are sorted in order of interval + int index = 0; + for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + for (String dimension : chunk.getObject().getDimensions()) { + if (!uniqueDims.containsKey(dimension) && + (excludeDimensions == null || !excludeDimensions.contains(dimension))) { + uniqueDims.put(dimension, index++); + } + } + } + } + + final BiMap orderedDims = uniqueDims.inverse(); + return IntStream.range(0, orderedDims.size()) + .mapToObj(orderedDims::get) + .collect(Collectors.toList()); + } + + /** + * @param timelineSegments A list of timeline objects, such as that returned by VersionedIntervalTimeline.lookup(). + * @return A list of all the unique metric column names present in the segments within timelineSegments + */ + public static List getUniqueMetrics(List> timelineSegments) + { + final BiMap uniqueMetrics = HashBiMap.create(); + + // Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent + // segments to olders. 
+ + // timelineSegments are sorted in order of interval + int[] index = {0}; + for (TimelineObjectHolder timelineHolder : Lists.reverse(timelineSegments)) { + for (PartitionChunk chunk : timelineHolder.getObject()) { + for (String metric : chunk.getObject().getMetrics()) { + uniqueMetrics.computeIfAbsent( + metric, + k -> { + return index[0]++; + } + ); + } + } + } + + final BiMap orderedMetrics = uniqueMetrics.inverse(); + return IntStream.range(0, orderedMetrics.size()) + .mapToObj(orderedMetrics::get) + .collect(Collectors.toList()); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index d817d5e4e801..5ae5d2b5919e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -34,6 +34,7 @@ import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.indexing.common.ReingestionTimelineUtils; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.input.DruidInputSource; @@ -50,7 +51,6 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.joda.time.Interval; @@ -220,14 +220,14 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) { dims = 
inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); } else { - dims = VersionedIntervalTimeline.getUniqueDimensions( + dims = ReingestionTimelineUtils.getUniqueDimensions( timeLineSegments, inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() ); } final List metricsList = metrics == null - ? VersionedIntervalTimeline.getUniqueMetrics(timeLineSegments) + ? ReingestionTimelineUtils.getUniqueMetrics(timeLineSegments) : metrics; final List adapters = Lists.newArrayList( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index a316f56e542e..4d4d0f30ad16 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -36,6 +36,7 @@ import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.indexing.common.ReingestionTimelineUtils; import org.apache.druid.indexing.common.RetryPolicy; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; @@ -177,7 +178,7 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu final List effectiveDimensions; if (dimensions == null) { - effectiveDimensions = VersionedIntervalTimeline.getUniqueDimensions( + effectiveDimensions = ReingestionTimelineUtils.getUniqueDimensions( timeline, inputRowSchema.getDimensionsSpec().getDimensionExclusions() ); @@ -189,7 +190,7 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu List effectiveMetrics; if (metrics == null) { - effectiveMetrics = VersionedIntervalTimeline.getUniqueMetrics(timeline); + effectiveMetrics = 
ReingestionTimelineUtils.getUniqueMetrics(timeline); } else { effectiveMetrics = metrics; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index 718b0841c1d3..9b5f3a5c563e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -267,11 +267,5 @@ public Map next() cursor.advance(); return theEvent; } - - @Override - public void remove() - { - throw new UnsupportedOperationException("Remove Not Supported"); - } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index b589ffde8385..d4e40781016e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -41,6 +41,7 @@ import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexing.common.ReingestionTimelineUtils; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; @@ -81,7 +82,6 @@ import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NumberedPartitionChunk; import org.apache.druid.timeline.partition.NumberedShardSpec; import 
org.apache.druid.timeline.partition.PartitionChunk; @@ -601,11 +601,11 @@ public void testGetUniqueDimensionsAndMetrics() }; Assert.assertEquals( Arrays.asList(expectedDims), - VersionedIntervalTimeline.getUniqueDimensions(timelineSegments, null) + ReingestionTimelineUtils.getUniqueDimensions(timelineSegments, null) ); Assert.assertEquals( Arrays.asList(expectedMetrics), - VersionedIntervalTimeline.getUniqueMetrics(timelineSegments) + ReingestionTimelineUtils.getUniqueMetrics(timelineSegments) ); }