54 changes: 54 additions & 0 deletions docs/ingestion/native-batch.md
@@ -890,3 +890,57 @@ This Firehose can be used to combine and merge data from a list of different Firehoses.
|--------|-----------|---------|
|type|This should be "combining"|yes|
|delegates|List of Firehoses to combine data from|yes|
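
For illustration, a minimal sketch of a combining Firehose that merges two local Firehoses; the delegate configurations and paths below are hypothetical:

```json
{
  "type": "combining",
  "delegates": [
    {
      "type": "local",
      "baseDir": "/data/batch-1",
      "filter": "*.json"
    },
    {
      "type": "local",
      "baseDir": "/data/batch-2",
      "filter": "*.json"
    }
  ]
}
```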


## Input Sources

### DruidInputSource

This InputSource reads data from existing Druid segments; the rows can be reingested under a new schema, changing the datasource name, dimensions, metrics, rollup, and so on.
This InputSource is _splittable_ and can be used by [native parallel index tasks](native-batch.md#parallel-task).
This InputSource has a fixed InputFormat for reading from Druid segments; no InputFormat needs to be specified in the ingestion spec when using this InputSource.

|property|description|required?|
|--------|-----------|---------|
|type|This should be "druid".|yes|
|dataSource|A String defining the Druid datasource to fetch rows from.|yes|
|interval|A String representing an ISO-8601 interval, which defines the time range to fetch data over.|yes|
|dimensions|A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned.|no|
|metrics|A list of Strings containing the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no|
|filter|See [Filters](../querying/filters.md). Only rows that match the filter, if specified, will be returned.|no|

A minimal example DruidInputSource spec is shown below:

```json
{
"type": "druid",
"dataSource": "wikipedia",
"interval": "2013-01-01/2013-01-02"
}
```

**Member:** nit: It would be nice if either this example or a 2nd example included dimensions, metrics, and filter

**Contributor Author:** Added a second example with more explanation of what the example specs do

The spec above reads all existing dimension and metric columns from the `wikipedia` datasource, returning every row whose timestamp (the `__time` column) falls within the interval `2013-01-01/2013-01-02`.

A spec that applies a filter and reads a subset of the original datasource's columns is shown below.

```json
{
"type": "druid",
"dataSource": "wikipedia",
"interval": "2013-01-01/2013-01-02",
"dimensions": [
"page",
"user"
],
"metrics": [
"added"
],
"filter": {
"type": "selector",
"dimension": "page",
"value": "Druid"
}
}
```

The spec above will return only the `page` and `user` dimensions and the `added` metric, and only rows where `page` = `Druid`.
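
For context, a sketch of how this input source might sit inside the `ioConfig` of a native batch task; the surrounding fields are illustrative, and `index_parallel` is just one possible task type. Note that no `inputFormat` is given, since the DruidInputSource does not need one:

```json
"ioConfig": {
  "type": "index_parallel",
  "inputSource": {
    "type": "druid",
    "dataSource": "wikipedia",
    "interval": "2013-01-01/2013-01-02"
  },
  "appendToExisting": false
}
```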
New file: `IndexingServiceInputSourceModule.java`
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.initialization.DruidModule;

import java.util.List;

public class IndexingServiceInputSourceModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.<Module>of(
new SimpleModule("IndexingServiceInputSourceModule")
.registerSubtypes(
new NamedType(DruidInputSource.class, "druid")
)
);
}

@Override
public void configure(Binder binder)
{
}
}
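
For orientation, a sketch (not part of this patch) of what the registration above enables: once the module's Jackson subtypes are registered, a `"type": "druid"` spec deserializes polymorphically into `DruidInputSource`. The snippet assumes the mapper also carries the `InjectableValues` for the input source's injected dependencies, which a running Druid service wires up via Guice.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.jackson.DefaultObjectMapper;

public class DruidInputSourceDeserializationSketch
{
  public static void main(String[] args) throws Exception
  {
    // DefaultObjectMapper registers Druid's Joda-Time handling, needed for the
    // "interval" field. Assumption: InjectableValues for DruidInputSource's
    // @JacksonInject dependencies (IndexIO, coordinator client, etc.) must also
    // be present on the mapper; in a real service, Guice provides them.
    ObjectMapper mapper = new DefaultObjectMapper();
    new IndexingServiceInputSourceModule().getJacksonModules().forEach(mapper::registerModule);

    // The "druid" NamedType registered above resolves the polymorphic "type" field.
    InputSource source = mapper.readValue(
        "{\"type\": \"druid\", \"dataSource\": \"wikipedia\", \"interval\": \"2013-01-01/2013-01-02\"}",
        InputSource.class
    );
    System.out.println(source.getClass().getSimpleName()); // DruidInputSource
  }
}
```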
New file: `ReingestionTimelineUtils.java`
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.PartitionChunk;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ReingestionTimelineUtils
{
/**
* @param timelineSegments A list of timeline objects, such as that returned by VersionedIntervalTimeline.lookup().
* @param excludeDimensions Dimensions to be excluded
* @return A list of all the unique dimension column names present in the segments within timelineSegments
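* <p>For example (illustrative): if the most recent segment has dimensions {@code [page, user]} and an older segment has {@code [page, language]}, the result is {@code [page, user, language]}.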
*/
public static List<String> getUniqueDimensions(
List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
@Nullable Set<String> excludeDimensions
)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();

// Here, we try to retain the order of dimensions as they were specified, since the order of dimensions may be
// optimized for performance.
// Dimensions are extracted from the most recent segments to the oldest, because recent segments are more likely
// to be queried frequently, and thus performance should be optimized for recent segments rather than old ones.

// timelineSegments are sorted in order of interval
int index = 0;
for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
for (String dimension : chunk.getObject().getDimensions()) {
if (!uniqueDims.containsKey(dimension) &&
(excludeDimensions == null || !excludeDimensions.contains(dimension))) {
uniqueDims.put(dimension, index++);
}
}
}
}

final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
return IntStream.range(0, orderedDims.size())
.mapToObj(orderedDims::get)
.collect(Collectors.toList());
}

/**
* @param timelineSegments A list of timeline objects, such as that returned by VersionedIntervalTimeline.lookup().
* @return A list of all the unique metric column names present in the segments within timelineSegments
*/
public static List<String> getUniqueMetrics(List<TimelineObjectHolder<String, DataSegment>> timelineSegments)
{
final BiMap<String, Integer> uniqueMetrics = HashBiMap.create();

// Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the most recent
// segments to the oldest.

// timelineSegments are sorted in order of interval
int[] index = {0};
for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
for (String metric : chunk.getObject().getMetrics()) {
uniqueMetrics.computeIfAbsent(metric, k -> index[0]++);
}
}
}

final BiMap<Integer, String> orderedMetrics = uniqueMetrics.inverse();
return IntStream.range(0, orderedMetrics.size())
.mapToObj(orderedMetrics::get)
.collect(Collectors.toList());
}
}
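
As a side note, the ordering trick used by both methods (record the index at which each name is first seen in a `BiMap`, then invert the map to recover the names in index order) can be seen in isolation in this hypothetical, self-contained sketch; the class name and data are illustrative:

```java
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FirstSeenOrderDemo
{
  public static void main(String[] args)
  {
    // Dimension lists as they might be seen while walking segments from newest to oldest.
    List<List<String>> perSegmentDims = ImmutableList.of(
        ImmutableList.of("page", "user"),     // newest segment
        ImmutableList.of("page", "language")  // older segment
    );

    // Assign each name the index at which it is first seen...
    BiMap<String, Integer> firstSeen = HashBiMap.create();
    int[] index = {0};
    for (List<String> dims : perSegmentDims) {
      for (String dim : dims) {
        firstSeen.computeIfAbsent(dim, k -> index[0]++);
      }
    }

    // ...then invert the BiMap to read the names back in first-seen order.
    BiMap<Integer, String> byIndex = firstSeen.inverse();
    List<String> ordered = IntStream.range(0, byIndex.size())
                                    .mapToObj(byIndex::get)
                                    .collect(Collectors.toList());

    System.out.println(ordered); // prints [page, user, language]
  }
}
```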
@@ -37,11 +37,9 @@
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.NoopInputRowParser;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
@@ -57,7 +55,7 @@
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
@@ -68,7 +66,6 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.DimensionHandler;
@@ -529,20 +526,20 @@ private static ParallelIndexIOConfig createIoConfig(
)
{
return new ParallelIndexIOConfig(
new IngestSegmentFirehoseFactory(
null,
new DruidInputSource(
dataSchema.getDataSource(),
interval,
null,
null, // no filter
// set dimension and metric names to make sure that the generated dataSchema is used for the input source
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
null,
dataSchema.getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
toolbox.getIndexIO(),
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
),
null,
false
);
}
@@ -603,15 +600,14 @@ private static DataSchema createDataSchema(
final AggregatorFactory[] finalMetricsSpec = metricsSpec == null
? createMetricsSpec(queryableIndexAndSegments)
: convertToCombiningFactories(metricsSpec);
final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec));

return new DataSchema(
dataSource,
jsonMapper.convertValue(parser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT),
new TimestampSpec(null, null, null),
finalDimensionsSpec,
finalMetricsSpec,
granularitySpec,
null,
jsonMapper
null
);
}

@@ -729,7 +729,7 @@ private Map<Interval, Optional<HyperLogLogCollector>> collectIntervalsAndShardSpecs(
ingestionSchema.getDataSchema().getDimensionsSpec(),
metricsNames
),
getInputFormat(ingestionSchema),
inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
tmpDir
)
);
@@ -919,7 +919,7 @@ private TaskStatus generateAndPublishSegments(
driver,
partitionsSpec,
inputSource,
getInputFormat(ingestionSchema),
inputSource.needsFormat() ? getInputFormat(ingestionSchema) : null,
tmpDir,
segmentAllocator
);
@@ -1037,15 +1037,17 @@ public IndexIngestionSpec(
{
super(dataSchema, ioConfig, tuningConfig);

Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("parser", dataSchema.getParserMap()),
new Property<>("inputFormat", ioConfig.getInputFormat())
)
);
if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) {
throw new IAE("Cannot use parser and inputSource together. Try using inputFormat instead of parser.");
}
if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) {
Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("parser", dataSchema.getParserMap()),
new Property<>("inputFormat", ioConfig.getInputFormat())
)
);
}

this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
@@ -94,7 +94,7 @@ public SegmentsAndMetadata process(
BatchAppenderatorDriver driver,
PartitionsSpec partitionsSpec,
InputSource inputSource,
InputFormat inputFormat,
@Nullable InputFormat inputFormat,
File tmpDir,
IndexTaskSegmentAllocator segmentAllocator
) throws IOException, InterruptedException, ExecutionException, TimeoutException
@@ -44,17 +44,19 @@ public ParallelIndexIngestionSpec(
{
super(dataSchema, ioConfig, tuningConfig);

Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("parser", dataSchema.getParserMap()),
new Property<>("inputFormat", ioConfig.getInputFormat())
)
);
if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) {
if (!(ioConfig.getInputSource() instanceof FirehoseFactoryToInputSourceAdaptor)) {
throw new IAE("Cannot use parser and inputSource together. Try using inputFormat instead of parser.");
}
}
if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) {
Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("parser", dataSchema.getParserMap()),
new Property<>("inputFormat", ioConfig.getInputFormat())
)
);
}

this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
@@ -193,7 +193,7 @@ private List<DataSegment> generateSegments(
driver,
partitionsSpec,
inputSource,
ParallelIndexSupervisorTask.getInputFormat(ingestionSchema),
inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
tmpDir,
segmentAllocator
);
@@ -431,7 +431,7 @@ private Set<DataSegment> generateAndPushSegments(
ingestionSchema.getDataSchema().getDimensionsSpec(),
metricsNames
),
ParallelIndexSupervisorTask.getInputFormat(ingestionSchema),
inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
tmpDir
)
);