59 changes: 57 additions & 2 deletions docs/content/ingestion/batch-ingestion.md
@@ -136,7 +136,7 @@ There are multiple types of inputSpecs:

##### `static`

Is a type of data loader where a static path to where the data files are located is passed.
Is a type of inputSpec where a static path to the data files is passed.

|Field|Type|Description|Required|
|-----|----|-----------|--------|
@@ -150,7 +150,7 @@ For example, using the static input paths:

##### `granularity`

Is a type of data loader that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase).
Is a type of inputSpec that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase).

|Field|Type|Description|Required|
|-----|----|-----------|--------|
@@ -166,6 +166,61 @@ s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=01
...
s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=23
```
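
For illustration, a complete `granularity` inputSpec for the layout above might look like the following. This is only a sketch: the field names `dataGranularity`, `inputPath`, and `filePattern` are assumptions based on the granularity inputSpec's documented fields (not shown in full in this diff), and the bucket path is the same hypothetical one used in the path listing.

```
"inputSpec" : {
  "type" : "granularity",
  "dataGranularity" : "hour",
  "inputPath" : "s3n://billy-bucket/the/data/is/here",
  "filePattern" : ".*"
}
```
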
##### `dataSource`

It is a type of inputSpec that reads data already stored inside Druid. It is useful for doing "re-indexing". A use case would be that you ingested some data in an interval and at a later time you want to change the granularity of the rows or remove some columns from the data stored in Druid.

|Field|Type|Description|Required|
|-----|----|-----------|--------|
|ingestionSpec|JSON Object|Specification of Druid segments to be loaded. See below.|yes|
|maxSplitSize|Number|Enables combining multiple segments into a single Hadoop InputSplit according to segment size. Default is none.|no|

Here is what goes inside `ingestionSpec`:

|Field|Type|Description|Required|
|-----|----|-----------|--------|
|dataSource|String|Druid dataSource name from which you are loading the data.|yes|
|interval|String|A string representing an ISO-8601 interval.|yes|
|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
|filter|JSON|See [Filters](../querying/filters.html).|no|
|dimensions|Array of String|Names of dimension columns to load. By default, the list will be constructed from the parseSpec. If the parseSpec does not have an explicit list of dimensions, then all the dimension columns present in the stored data will be read.|no|
|metrics|Array of String|Names of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no|


For example:

```
"ingestionSpec" :
{
"dataSource": "wikipedia",
"interval": "2014-10-20T00:00:00Z/P2W"
}
```
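
The optional fields can be supplied as well. A sketch, where the dimension and metric names are purely illustrative (not actual columns of a real wikipedia dataSource):

```
"ingestionSpec" :
{
  "dataSource": "wikipedia",
  "interval": "2014-10-20T00:00:00Z/P2W",
  "granularity": "day",
  "dimensions": ["page", "language"],
  "metrics": ["count", "added"]
}
```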

##### `multi`

It is a composing inputSpec used to combine other inputSpecs. It is useful for doing "delta ingestion". A use case would be that you ingested some data in an interval and at a later time you want to "append" more data to that interval. You can use this inputSpec to combine `dataSource` and `static` (or other) inputSpecs to add more data to an already indexed interval.

|Field|Type|Description|Required|
|-----|----|-----------|--------|
|children|Array of JSON Objects|List of JSON objects containing other inputSpecs.|yes|

For example:

```
"children": [
{
"type" : "dataSource",
"ingestionSpec" : {
"dataSource": "wikipedia",
"interval": "2014-10-20T00:00:00Z/P2W"
}
},
{
"type" : "static",
"paths": "/path/to/more/wikipedia/data/"
}
]
```
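
For context, here is a sketch of how the `multi` spec above might sit inside the ioConfig. This assumes the standard Hadoop ioConfig layout with an `inputSpec` field; other required ioConfig fields (for example `segmentOutputPath`) are omitted:

```
"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "multi",
    "children": [
      {
        "type" : "dataSource",
        "ingestionSpec" : {
          "dataSource": "wikipedia",
          "interval": "2014-10-20T00:00:00Z/P2W"
        }
      },
      {
        "type" : "static",
        "paths": "/path/to/more/wikipedia/data/"
      }
    ]
  }
}
```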


#### Metadata Update Job Spec

@@ -88,7 +88,6 @@ public boolean run()
);

JobHelper.injectSystemProperties(groupByJob);
JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);
groupByJob.setMapOutputValueClass(BytesWritable.class);
@@ -126,7 +126,6 @@ public boolean run()
);

JobHelper.injectSystemProperties(groupByJob);
JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class);
groupByJob.setMapOutputValueClass(NullWritable.class);
@@ -173,7 +172,6 @@ public boolean run()
} else {
// Directly read the source data, since we assume it's already grouped.
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class);
JobHelper.setInputFormat(dimSelectionJob, config);
config.addInputPaths(dimSelectionJob);
}

@@ -18,6 +18,7 @@
package io.druid.indexer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,7 +56,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -248,6 +248,12 @@ public HadoopIngestionSpec getSchema()
return schema;
}

@JsonIgnore
public PathSpec getPathSpec()
{
return pathSpec;
}

public String getDataSource()
{
return schema.getDataSchema().getDataSource();
@@ -354,11 +360,6 @@ public Job addInputPaths(Job job) throws IOException
return pathSpec.addInputPaths(this, job);
}

public Class<? extends InputFormat> getInputFormatClass()
{
return pathSpec.getInputFormat();
}

/********************************************
Granularity/Bucket Helper Methods
********************************************/
@@ -103,7 +103,9 @@ public final static InputRow parseInputRow(Object value, InputRowParser parser)
{
if(parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
return ((StringInputRowParser)parser).parse(value.toString());
return ((StringInputRowParser) parser).parse(value.toString());
} else if(value instanceof InputRow) {
return (InputRow)value;
Contributor: Can metrics be delta-ingested when the combining aggregator is not the same as the normal aggregator? Wondering because input rows from segments seem to be treated the same as input rows from raw data here.

Contributor Author: hmmm, that sounds true. it gets a lil weird when "name" and "fieldName" in the aggregator are not same. I will need to think how to treat rows read from segment differently.
also it seems DatasourcePathSpec should really get list of metrics from "name" and not "fieldName".

Contributor Author: main complication is in serializing the input rows.
InputRow from raw data will have "fieldName" columns in it and same from segment will have "name" columns in it, so they have to be treated differently.

Contributor Author: One option is that DatasourceInputFormat returns a "SegmentInputRow" instead of "InputRow" which would only be a wrapper (and extension of InputRow) and we put ugly things like

if(row instanceof SegmentInputRow) {
  //serialize using combining aggregators
} else {
  //serialize using normal aggregators
}

Contributor Author: @cheddar what do you think?

Contributor: I haven't fully thought this through, but is there any harm in having the SegmentInputFormat know about the Aggregators such that it can return the "name" value when it is asked for "fieldName" as well?

} else {
return parser.parse(value);
}
@@ -19,8 +19,16 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.timeline.DataSegment;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
*/
@@ -91,4 +99,45 @@ public HadoopIngestionSpec withTuningConfig(HadoopTuningConfig config)
config
);
}

public static HadoopIngestionSpec updateSegmentListIfDatasourcePathSpecIsUsed(
HadoopIngestionSpec spec,
ObjectMapper jsonMapper,
UsedSegmentLister segmentLister
)
throws IOException
{
String dataSource = "dataSource";
String type = "type";
String multi = "multi";
String children = "children";
String segments = "segments";
String ingestionSpec = "ingestionSpec";

Map<String, Object> pathSpec = spec.getIOConfig().getPathSpec();
Map<String, Object> datasourcePathSpec = null;
if(pathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = pathSpec;
} else if(pathSpec.get(type).equals(multi)) {
List<Map<String, Object>> childPathSpecs = (List<Map<String, Object>>) pathSpec.get(children);
for(Map<String, Object> childPathSpec : childPathSpecs) {
if (childPathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = childPathSpec;
break;
}
}
}
if (datasourcePathSpec != null) {
Map<String, Object> ingestionSpecMap = (Map<String, Object>) datasourcePathSpec.get(ingestionSpec);
DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(ingestionSpecMap, DatasourceIngestionSpec.class);
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForInterval(
ingestionSpecObj.getDataSource(),
ingestionSpecObj.getInterval()
);
datasourcePathSpec.put(segments, segmentsList);
Contributor: Given the proliferation of immutable data I'm kind of surprised this works.

Contributor Author: hmmm, I understand the concern. However, it does work and there are UTs in HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest to catch it if it ever breaks.

Contributor: Yes, I trust the UTs in this matter.

}

return spec;
}

}
@@ -35,6 +35,7 @@
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.indexer.hadoop.SegmentInputRow;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
@@ -139,8 +140,6 @@ public boolean run()

JobHelper.injectSystemProperties(job);

JobHelper.setInputFormat(job, config);

job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);

@@ -235,13 +234,18 @@ public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesW
private static final HashFunction hashFunction = Hashing.murmur3_128();

private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;

@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
super.setup(context);
aggregators = config.getSchema().getDataSchema().getAggregators();
combiningAggs = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; ++i) {
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
}

@Override
@@ -268,6 +272,14 @@ protected void innerMap(
)
).asBytes();

// type SegmentInputRow serves as a marker that these InputRow instances have already been combined
// and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
// data
byte[] serializedInputRow = inputRow instanceof SegmentInputRow ?
InputRowSerde.toBytes(inputRow, combiningAggs)
:
InputRowSerde.toBytes(inputRow, aggregators);

context.write(
new SortableBytes(
bucket.get().toGroupKey(),
@@ -277,7 +289,7 @@ protected void innerMap(
.put(hashedDimensions)
.array()
).toBytesWritable(),
new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators))
new BytesWritable(serializedInputRow)
);
}
}
49 changes: 36 additions & 13 deletions indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -17,6 +17,7 @@

package io.druid.indexer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
@@ -28,6 +29,7 @@
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.logger.Logger;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
@@ -42,8 +44,6 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -56,8 +56,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -203,17 +205,6 @@ public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config)
return true;
}

public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig)
{
if (indexerConfig.getInputFormatClass() != null) {
job.setInputFormatClass(indexerConfig.getInputFormatClass());
} else if (indexerConfig.isCombineText()) {
job.setInputFormatClass(CombineTextInputFormat.class);
} else {
job.setInputFormatClass(TextInputFormat.class);
}
}

public static DataSegment serializeOutIndex(
final DataSegment segmentTemplate,
final Configuration configuration,
@@ -579,6 +570,38 @@ public long push() throws IOException
return zipPusher.push();
}

public static URI getURIFromSegment(DataSegment dataSegment)
{
// There is no good way around this...
// TODO: add getURI() to URIDataPuller
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
final String type = loadSpec.get("type").toString();
final URI segmentLocURI;
if ("s3_zip".equals(type)) {
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString());
} else if ("local".equals(type)) {
try {
segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
}
catch (URISyntaxException e) {
throw new ISE(e, "Unable to form simple file uri");
}
} else {
try {
throw new IAE(
"Cannot figure out loadSpec %s",
HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec)
);
}
catch (JsonProcessingException e) {
throw new ISE("Cannot write Map with json mapper");
}
}
return segmentLocURI;
}

public static ProgressIndicator progressIndicatorForContext(
final TaskAttemptContext context
)