2 changes: 2 additions & 0 deletions docs/content/ingestion/batch-ingestion.md
@@ -137,6 +137,7 @@ Is a type of inputSpec where a static path to where the data files are located i
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|paths|Array of String|A String of input paths indicating where the raw data is located.|yes|
|inputFormat|String|The input format of the data files. Default is `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, or `org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat` if `combineText` in tuningConfig is `true`.|no|

For example, using the static input paths:

@@ -154,6 +155,7 @@ Is a type of inputSpec that expects data to be laid out in a specific path forma
|inputPath|String|Base path to append the expected time path to.|yes|
|filePattern|String|Pattern that files should match to be included.|yes|
|pathFormat|String|Joda date-time format for each directory. Default value is `"'y'=yyyy/'m'=MM/'d'=dd/'H'=HH"`, or see [Joda documentation](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)|no|
|inputFormat|String|The input format of the data files. Default is `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, or `org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat` if `combineText` in tuningConfig is `true`.|no|
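The default described in the `inputFormat` rows can be read as a small decision rule. The following is a minimal Java sketch, not Druid's actual code: the class `InputFormatDefaults` and the helper `resolveInputFormat` are hypothetical names, and the sketch assumes that an explicitly configured value always takes precedence over the default.

```java
final class InputFormatDefaults {
    static final String TEXT =
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat";
    static final String COMBINE_TEXT =
        "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat";

    /**
     * Hypothetical helper: returns the configured inputFormat if present,
     * otherwise the documented default, which depends on combineText.
     */
    static String resolveInputFormat(String configured, boolean combineText) {
        if (configured != null && !configured.isEmpty()) {
            return configured; // assumption: an explicit setting always wins
        }
        return combineText ? COMBINE_TEXT : TEXT;
    }

    public static void main(String[] args) {
        // No inputFormat given: the default follows combineText.
        System.out.println(resolveInputFormat(null, false));
        System.out.println(resolveInputFormat(null, true));
        // Explicit inputFormat, as required for Avro batch ingestion.
        System.out.println(resolveInputFormat(
            "io.druid.data.input.avro.AvroValueInputFormat", true));
    }
}
```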

For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths

106 changes: 106 additions & 0 deletions docs/content/ingestion/index.md
@@ -91,6 +91,112 @@ If `type` is not included, the parser defaults to `string`.
| type | String | This should say `protobuf`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |

### Avro Stream Parser
Review comment (Contributor): @zhaown can you include a full spec that works for avro ingestion?


This is for realtime ingestion.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_stream`. | no |
| avroBytesDecoder | JSON Object | Specifies how to decode bytes into an Avro record. | yes |
| parseSpec | JSON Object | Specifies the format of the data. | yes |

For example, using Avro stream parser with schema repo Avro bytes decoder:
```json
"parser" : {
  "type" : "avro_stream",
  "avroBytesDecoder" : {
    "type" : "schema_repo",
    "subjectAndIdConverter" : {
      "type" : "avro_1124",
      "topic" : "${YOUR_TOPIC}"
    },
    "schemaRepository" : {
      "type" : "avro_1124_rest_client",
      "url" : "${YOUR_SCHEMA_REPO_END_POINT}"
    }
  },
  "parseSpec" : {
    "format" : "timeAndDims",
    "timestampSpec" : {},
    "dimensionsSpec" : {}
  }
}
```

#### Avro Bytes Decoder

If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.

##### SchemaRepo Based Avro Bytes Decoder

This Avro bytes decoder first extracts the `subject` and `id` from the input message bytes, then uses them to look up the Avro schema with which to decode the Avro record from the bytes. Details can be found in [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an HTTP service like schema repo to hold the Avro schema. For schema registration on the message producer side, you can refer to `io.druid.data.input.AvroStreamInputRowParserTest#testParse()`.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_repo`. | no |
| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes |
| schemaRepository | JSON Object | Specifies how to look up the Avro schema from the subject and id. | yes |

##### Avro-1124 Subject And Id Converter

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124`. | no |
| topic | String | Specifies the topic of your Kafka stream. | yes |


##### Avro-1124 Schema Repository

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124_rest_client`. | no |
| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |
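
The decoding flow described for the schema repo based decoder (extract subject and id from the message, then look up the schema) can be sketched in plain Java. This is an illustration under stated assumptions, not the extension's implementation: it assumes the subject is the configured topic and that each message starts with the schema id as a 4-byte big-endian integer, and it replaces the HTTP schema repository with a hypothetical in-memory map. The names `SchemaLookupSketch`, `register`, `frame`, and `lookupSchema` are invented for this example.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class SchemaLookupSketch {
    /** Hypothetical stand-in for a schema repository: subject -> (id -> schema JSON). */
    static final Map<String, Map<Integer, String>> REPO = new HashMap<>();

    /** Registers a schema under a subject and id, as a producer would do against a real repository. */
    static void register(String subject, int id, String schemaJson) {
        REPO.computeIfAbsent(subject, s -> new HashMap<>()).put(id, schemaJson);
    }

    /** Producer side: prefixes the encoded record with the schema id (assumed 4-byte int framing). */
    static byte[] frame(int id, byte[] avroBody) {
        return ByteBuffer.allocate(4 + avroBody.length).putInt(id).put(avroBody).array();
    }

    /**
     * Consumer side: reads the id from the head of the message and looks up the
     * schema for (topic, id). The buffer is left positioned at the start of the
     * record body, ready for an Avro decoder.
     */
    static String lookupSchema(String topic, ByteBuffer message) {
        int id = message.getInt();
        Map<Integer, String> bySubject = REPO.get(topic);
        String schema = bySubject == null ? null : bySubject.get(id);
        if (schema == null) {
            throw new IllegalArgumentException(
                "No schema for subject " + topic + " and id " + id);
        }
        return schema;
    }

    public static void main(String[] args) {
        register("events", 7, "{\"type\":\"string\"}");
        ByteBuffer message = ByteBuffer.wrap(frame(7, new byte[] {1, 2, 3}));
        System.out.println(lookupSchema("events", message)); // the registered schema JSON
        System.out.println(message.remaining());             // bytes left for the record body
    }
}
```

In a real deployment the map lookup would be an HTTP call to the repository at the configured `url`, and the remaining bytes would be passed to an Avro datum reader built from the returned schema.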

### Avro Hadoop Parser

This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.avro.AvroValueInputFormat"`. You may want to set the Avro reader's schema in `jobProperties` in `tuningConfig`, e.g. `"avro.schema.path.input.value": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`. If the reader's schema is not set, the schema in the Avro object container file will be used; see the [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution).

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_hadoop`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |
| fromPigAvroStorage | Boolean | Specifies whether the data file is stored using AvroStorage. | no (default is `false`) |

For example, using Avro Hadoop parser with custom reader's schema file:
```json
{
  "type" : "index_hadoop",
  "hadoopDependencyCoordinates" : ["io.druid.extensions:druid-avro-extensions"],
  "spec" : {
    "dataSchema" : {
      "dataSource" : "",
      "parser" : {
        "type" : "avro_hadoop",
        "parseSpec" : {
          "format" : "timeAndDims",
          "timestampSpec" : {},
          "dimensionsSpec" : {}
        }
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "inputFormat": "io.druid.data.input.avro.AvroValueInputFormat",
        "paths" : ""
      }
    },
    "tuningConfig" : {
      "jobProperties" : {
        "avro.schema.path.input.value" : "/path/to/my/schema.avsc"
      }
    }
  }
}
```
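
The reader's schema fallback described for the Avro Hadoop parser can be sketched as a lookup over `jobProperties`. This is a hedged illustration, not the extension's code: `ReaderSchemaSourceSketch` and `readerSchemaSource` are hypothetical names, and checking the inline schema before the schema path is an assumption, since the documentation above does not state which property wins when both are set.

```java
import java.util.HashMap;
import java.util.Map;

final class ReaderSchemaSourceSketch {
    static final String INLINE_KEY = "avro.schema.input.value";
    static final String PATH_KEY = "avro.schema.path.input.value";

    /**
     * Hypothetical helper: reports where the reader's schema would come from.
     * Assumption: an inline schema is checked before a schema file path.
     */
    static String readerSchemaSource(Map<String, String> jobProperties) {
        String inline = jobProperties.get(INLINE_KEY);
        if (inline != null && !inline.trim().isEmpty()) {
            return "inline";
        }
        String path = jobProperties.get(PATH_KEY);
        if (path != null && !path.trim().isEmpty()) {
            return "file:" + path;
        }
        // Neither property set: fall back to the schema stored in the
        // Avro object container file (the writer's schema).
        return "writer";
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(readerSchemaSource(props));
        props.put(PATH_KEY, "/path/to/my/schema.avsc");
        System.out.println(readerSchemaSource(props));
    }
}
```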

### ParseSpec

If `format` is not included, the parseSpec defaults to `tsv`.
107 changes: 107 additions & 0 deletions extensions/avro-extensions/pom.xml
@@ -0,0 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright 2012 - 2015 Metamarkets Group Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>io.druid.extensions</groupId>
    <artifactId>druid-avro-extensions</artifactId>
    <name>druid-avro-extensions</name>
    <description>druid-avro-extensions</description>

    <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
        <version>0.9.0-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>

    <properties>
        <schemarepo.version>0.1.3</schemarepo.version>
        <avro.version>1.7.7</avro.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-mapred</artifactId>
            <classifier>hadoop2</classifier>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>io.druid</groupId>
            <artifactId>druid-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.schemarepo</groupId>
            <artifactId>schema-repo-api</artifactId>
            <version>${schemarepo.version}</version>
        </dependency>
        <dependency>
            <groupId>org.schemarepo</groupId>
            <artifactId>schema-repo-client</artifactId>
            <version>${schemarepo.version}</version>
        </dependency>
        <dependency>
            <groupId>org.schemarepo</groupId>
            <artifactId>schema-repo-avro</artifactId>
            <version>${schemarepo.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </dependency>
        <!-- Tests -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.pig</groupId>
            <artifactId>pig</artifactId>
            <version>0.15.0</version>
            <classifier>h2</classifier>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.pig</groupId>
            <artifactId>piggybank</artifactId>
            <version>0.15.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Review comment (Contributor): nit: in general druid files always end with a newline, can you have same in all the files?

Review comment (Contributor): there are multiple files which need that. I just had a comment in the main thread of the PR

@@ -0,0 +1,70 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import org.apache.avro.generic.GenericRecord;

import java.util.List;

public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{
  private final ParseSpec parseSpec;
  private final List<String> dimensions;
  private final boolean fromPigAvroStorage;

  @JsonCreator
  public AvroHadoopInputRowParser(
      @JsonProperty("parseSpec") ParseSpec parseSpec,
      @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
  )
  {
    this.parseSpec = parseSpec;
    this.dimensions = parseSpec.getDimensionsSpec().getDimensions();
    this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
  }

  @Override
  public InputRow parse(GenericRecord record)
  {
    return AvroStreamInputRowParser.parseGenericRecord(record, parseSpec, dimensions, fromPigAvroStorage);
  }

  @JsonProperty
  @Override
  public ParseSpec getParseSpec()
  {
    return parseSpec;
  }

  @JsonProperty
  public boolean isFromPigAvroStorage()
  {
    return fromPigAvroStorage;
  }

  @Override
  public InputRowParser withParseSpec(ParseSpec parseSpec)
  {
    return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
  }
}