diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md
index da8140c7ed4c..a480e1f075c3 100644
--- a/docs/content/ingestion/batch-ingestion.md
+++ b/docs/content/ingestion/batch-ingestion.md
@@ -137,6 +137,7 @@ Is a type of inputSpec where a static path to where the data files are located i
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|paths|Array of String|A String of input paths indicating where the raw data is located.|yes|
+|inputFormat|String|The input format of the data files. Default is `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, or `org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat` if `combineText` in tuningConfig is `true`.|no|
For example, using the static input paths:
@@ -154,6 +155,7 @@ Is a type of inputSpec that expects data to be laid out in a specific path forma
|inputPath|String|Base path to append the expected time path to.|yes|
|filePattern|String|Pattern that files should match to be included.|yes|
|pathFormat|String|Joda date-time format for each directory. Default value is `"'y'=yyyy/'m'=MM/'d'=dd/'H'=HH"`, or see [Joda documentation](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)|no|
+|inputFormat|String|The input format of the data files. Default is `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, or `org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat` if `combineText` in tuningConfig is `true`.|no|
For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths
diff --git a/docs/content/ingestion/index.md b/docs/content/ingestion/index.md
index c7a4dd67f810..9252bbe8db28 100644
--- a/docs/content/ingestion/index.md
+++ b/docs/content/ingestion/index.md
@@ -91,6 +91,112 @@ If `type` is not included, the parser defaults to `string`.
| type | String | This should say `protobuf`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |
+### Avro Stream Parser
+
+This is for realtime ingestion.
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `avro_stream`. | no |
+| avroBytesDecoder | JSON Object | Specifies how to decode bytes to Avro record. | yes |
+| parseSpec | JSON Object | Specifies the format of the data. | yes |
+
+For example, using Avro stream parser with schema repo Avro bytes decoder:
+```json
+"parser" : {
+ "type" : "avro_stream",
+ "avroBytesDecoder" : {
+ "type" : "schema_repo",
+ "subjectAndIdConverter" : {
+ "type" : "avro_1124",
+ "topic" : "${YOUR_TOPIC}"
+ },
+ "schemaRepository" : {
+ "type" : "avro_1124_rest_client",
+      "url" : "${YOUR_SCHEMA_REPO_END_POINT}"
+ }
+ },
+  "parseSpec" : {
+ "format" : "timeAndDims",
+ "timestampSpec" : {},
+ "dimensionsSpec" : {}
+ }
+}
+```
+
+#### Avro Bytes Decoder
+
+If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.
+
+##### SchemaRepo Based Avro Bytes Decoder
+
+This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, then uses them to look up the Avro schema with which to decode the Avro record from bytes. Details can be found in [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an HTTP service like schema repo to hold the Avro schema. For schema registration on the message producer side, you can refer to `io.druid.data.input.AvroStreamInputRowParserTest#testParse()`.
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `schema_repo`. | no |
+| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes |
+| schemaRepository | JSON Object | Specifies how to look up the Avro schema from the subject and id. | yes |
+
+##### Avro-1124 Subject And Id Converter
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `avro_1124`. | no |
+| topic | String | Specifies the topic of your kafka stream. | yes |
+
+
+##### Avro-1124 Schema Repository
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `avro_1124_rest_client`. | no |
+| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |
+
+### Avro Hadoop Parser
+
+This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.avro.AvroValueInputFormat"`. You may want to set the Avro reader's schema in `jobProperties` in `tuningConfig`, e.g. `"avro.schema.input.value.path": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`. If the reader's schema is not set, the schema in the Avro object container file will be used; see [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution).
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `avro_hadoop`. | no |
+| parseSpec | JSON Object | Specifies the format of the data. | yes |
+| fromPigAvroStorage | Boolean | Specifies whether the data file is stored using AvroStorage. | no(default == false) |
+
+For example, using Avro Hadoop parser with custom reader's schema file:
+```json
+{
+ "type" : "index_hadoop",
+ "hadoopDependencyCoordinates" : ["io.druid.extensions:druid-avro-extensions"],
+ "spec" : {
+ "dataSchema" : {
+ "dataSource" : "",
+ "parser" : {
+ "type" : "avro_hadoop",
+        "parseSpec" : {
+ "format" : "timeAndDims",
+ "timestampSpec" : {},
+ "dimensionsSpec" : {}
+ }
+ }
+ },
+ "ioConfig" : {
+ "type" : "hadoop",
+ "inputSpec" : {
+ "type" : "static",
+ "inputFormat": "io.druid.data.input.avro.AvroValueInputFormat",
+ "paths" : ""
+ }
+ },
+ "tuningConfig" : {
+ "jobProperties" : {
+        "avro.schema.input.value.path" : "/path/to/my/schema.avsc"
+ }
+ }
+ }
+}
+```
+
### ParseSpec
If `format` is not included, the parseSpec defaults to `tsv`.
diff --git a/extensions/avro-extensions/pom.xml b/extensions/avro-extensions/pom.xml
new file mode 100644
index 000000000000..029d3005934e
--- /dev/null
+++ b/extensions/avro-extensions/pom.xml
@@ -0,0 +1,107 @@
+
+
+
+
+ 4.0.0
+ io.druid.extensions
+ druid-avro-extensions
+ druid-avro-extensions
+ druid-avro-extensions
+
+
+ io.druid
+ druid
+ 0.9.0-SNAPSHOT
+ ../../pom.xml
+
+
+
+ 0.1.3
+ 1.7.7
+
+
+
+
+ org.apache.avro
+ avro-mapred
+ hadoop2
+ ${avro.version}
+
+
+ io.druid
+ druid-api
+
+
+ org.schemarepo
+ schema-repo-api
+ ${schemarepo.version}
+
+
+ org.schemarepo
+ schema-repo-client
+ ${schemarepo.version}
+
+
+ org.schemarepo
+ schema-repo-avro
+ ${schemarepo.version}
+
+
+ org.apache.hadoop
+ hadoop-client
+
+
+
+ junit
+ junit
+ test
+
+
+ org.apache.pig
+ pig
+ 0.15.0
+ h2
+ test
+
+
+ org.apache.pig
+ piggybank
+ 0.15.0
+ test
+
+
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ ${avro.version}
+
+
+ generate-sources
+
+ schema
+
+
+
+
+
+
+
diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroHadoopInputRowParser.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroHadoopInputRowParser.java
new file mode 100644
index 000000000000..58aaf11adf0e
--- /dev/null
+++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroHadoopInputRowParser.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.ParseSpec;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.List;
+
+public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
+{
+ private final ParseSpec parseSpec;
+  private final List<String> dimensions;
+ private final boolean fromPigAvroStorage;
+
+ @JsonCreator
+ public AvroHadoopInputRowParser(
+ @JsonProperty("parseSpec") ParseSpec parseSpec,
+ @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
+ )
+ {
+ this.parseSpec = parseSpec;
+ this.dimensions = parseSpec.getDimensionsSpec().getDimensions();
+ this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
+ }
+
+ @Override
+ public InputRow parse(GenericRecord record)
+ {
+ return AvroStreamInputRowParser.parseGenericRecord(record, parseSpec, dimensions, fromPigAvroStorage);
+ }
+
+ @JsonProperty
+ @Override
+ public ParseSpec getParseSpec()
+ {
+ return parseSpec;
+ }
+
+ @JsonProperty
+ public boolean isFromPigAvroStorage()
+ {
+ return fromPigAvroStorage;
+ }
+
+ @Override
+ public InputRowParser withParseSpec(ParseSpec parseSpec)
+ {
+ return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
+ }
+}
diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java
new file mode 100644
index 000000000000..2e38024f2a19
--- /dev/null
+++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.data.input.avro.AvroBytesDecoder;
+import io.druid.data.input.avro.GenericRecordAsMap;
+import io.druid.data.input.impl.ParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import org.apache.avro.generic.GenericRecord;
+import org.joda.time.DateTime;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class AvroStreamInputRowParser implements ByteBufferInputRowParser
+{
+ private final ParseSpec parseSpec;
+  private final List<String> dimensions;
+ private final AvroBytesDecoder avroBytesDecoder;
+
+ @JsonCreator
+ public AvroStreamInputRowParser(
+ @JsonProperty("parseSpec") ParseSpec parseSpec,
+ @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder
+ )
+ {
+ this.parseSpec = parseSpec;
+ this.dimensions = parseSpec.getDimensionsSpec().getDimensions();
+ this.avroBytesDecoder = avroBytesDecoder;
+ }
+
+ @Override
+ public InputRow parse(ByteBuffer input)
+ {
+ return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false);
+ }
+
+ protected static InputRow parseGenericRecord(
+      GenericRecord record, ParseSpec parseSpec, List<String> dimensions, boolean fromPigAvroStorage
+ )
+ {
+ GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage);
+ TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
+ DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
+ return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);
+ }
+
+ @JsonProperty
+ @Override
+ public ParseSpec getParseSpec()
+ {
+ return parseSpec;
+ }
+
+ @JsonProperty
+ public AvroBytesDecoder getAvroBytesDecoder()
+ {
+ return avroBytesDecoder;
+ }
+
+ @Override
+ public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
+ {
+ return new AvroStreamInputRowParser(
+ parseSpec,
+ avroBytesDecoder
+ );
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
+
+ if (!parseSpec.equals(that.parseSpec)) {
+ return false;
+ }
+ if (!dimensions.equals(that.dimensions)) {
+ return false;
+ }
+ return avroBytesDecoder.equals(that.avroBytesDecoder);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = parseSpec.hashCode();
+ result = 31 * result + dimensions.hashCode();
+ result = 31 * result + avroBytesDecoder.hashCode();
+ return result;
+ }
+}
diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java
new file mode 100644
index 000000000000..149b34aa806d
--- /dev/null
+++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input.avro;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.ByteBuffer;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRepoBasedAvroBytesDecoder.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesDecoder.class)
+})
+public interface AvroBytesDecoder
+{
+ GenericRecord parse(ByteBuffer bytes);
+}
diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java
new file mode 100644
index 000000000000..12b8a61dae18
--- /dev/null
+++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input.avro;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import io.druid.data.input.AvroHadoopInputRowParser;
+import io.druid.data.input.AvroStreamInputRowParser;
+import io.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
+import io.druid.initialization.DruidModule;
+import org.schemarepo.InMemoryRepository;
+import org.schemarepo.Repository;
+import org.schemarepo.ValidatorFactory;
+import org.schemarepo.json.GsonJsonUtil;
+import org.schemarepo.json.JsonUtil;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class AvroExtensionsModule implements DruidModule
+{
+ public AvroExtensionsModule() {}
+
+ @Override
+  public List<? extends Module> getJacksonModules()
+ {
+ return Arrays.asList(
+ new SimpleModule("AvroInputRowParserModule")
+ .registerSubtypes(
+ new NamedType(AvroStreamInputRowParser.class, "avro_stream"),
+ new NamedType(AvroHadoopInputRowParser.class, "avro_hadoop")
+ )
+ .setMixInAnnotation(Repository.class, RepositoryMixIn.class)
+ .setMixInAnnotation(JsonUtil.class, JsonUtilMixIn.class)
+ .setMixInAnnotation(InMemoryRepository.class, InMemoryRepositoryMixIn.class)
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ { }
+}
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = GsonJsonUtil.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "gson", value = GsonJsonUtil.class)
+})
+abstract class JsonUtilMixIn
+{
+}
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Avro1124RESTRepositoryClientWrapper.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "avro_1124_rest_client", value = Avro1124RESTRepositoryClientWrapper.class),
+ @JsonSubTypes.Type(name = "in_memory_for_unit_test", value = InMemoryRepository.class)
+})
+abstract class RepositoryMixIn
+{
+}
+
+abstract class InMemoryRepositoryMixIn
+{
+ @JsonCreator
+ public InMemoryRepositoryMixIn(@JsonProperty("validators") ValidatorFactory validators)
+ {
+ }
+}
+
diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java
new file mode 100644
index 000000000000..db9f0090fde8
--- /dev/null
+++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input.avro;
+
+import com.metamx.common.logger.Logger;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+
+public class AvroValueInputFormat extends FileInputFormat<NullWritable, GenericRecord>
+{
+ private static final Logger log = new Logger(AvroValueInputFormat.class);
+
+ private static final String CONF_INPUT_VALUE_SCHEMA_PATH = "avro.schema.input.value.path";
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+  public RecordReader<NullWritable, GenericRecord> createRecordReader(
+ InputSplit split, TaskAttemptContext context
+ ) throws IOException, InterruptedException
+ {
+ Schema readerSchema = AvroJob.getInputValueSchema(context.getConfiguration());
+
+ if (readerSchema == null) {
+ String schemaFilePath = context.getConfiguration().get(CONF_INPUT_VALUE_SCHEMA_PATH);
+ if (StringUtils.isNotBlank(schemaFilePath)) {
+ log.info("Using file: %s as reader schema.", schemaFilePath);
+ FSDataInputStream inputStream = FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath));
+ try {
+ readerSchema = new Schema.Parser().parse(inputStream);
+ }
+ finally {
+ inputStream.close();
+ }
+ }
+ }
+
+ if (null == readerSchema) {
+      log.warn("Reader schema was not set. Use AvroJob.setInputValueSchema() if desired.");
+ log.info("Using a reader schema equal to the writer schema.");
+ }
+ return new AvroValueRecordReader(readerSchema);
+ }
+}
diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueRecordReader.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueRecordReader.java
new file mode 100644
index 000000000000..d0861eb471d8
--- /dev/null
+++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueRecordReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapreduce.AvroRecordReaderBase;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+public class AvroValueRecordReader extends AvroRecordReaderBase<NullWritable, GenericRecord, GenericRecord>
+{
+ public AvroValueRecordReader(Schema readerSchema)
+ {
+ super(readerSchema);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException
+ {
+ return NullWritable.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public GenericRecord getCurrentValue() throws IOException, InterruptedException
+ {
+ return getCurrentRecord();
+ }
+}
diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/GenericRecordAsMap.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/GenericRecordAsMap.java
new file mode 100644
index 000000000000..e129a13acb93
--- /dev/null
+++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/GenericRecordAsMap.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input.avro;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class GenericRecordAsMap implements Map<String, Object>
+{
+ private final GenericRecord record;
+ private final boolean fromPigAvroStorage;
+
+ private static final Function