From 3c050c109f561e010adda6b52228a9fa6e82502c Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Sun, 26 Jan 2020 11:46:58 -0600 Subject: [PATCH 1/2] Add processing for data files from AvroStorage --- docs/ingestion/data-formats.md | 1 + .../data/input/AvroHadoopInputRowParser.java | 15 +++- .../data/input/AvroStreamInputRowParser.java | 2 +- .../data/input/avro/AvroFlattenerMaker.java | 11 ++- .../druid/data/input/avro/AvroParsers.java | 3 +- .../src/test/avro/some-datum.avsc | 9 +- .../input/AvroHadoopInputRowParserTest.java | 14 +-- .../input/AvroStreamInputRowParserTest.java | 40 ++++++--- .../input/avro/AvroFlattenerMakerTest.java | 88 ++++++++++--------- .../avro/ParquetAvroHadoopInputRowParser.java | 2 +- 10 files changed, 117 insertions(+), 68 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 0d58e95c1a2d..04a402cf6174 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -308,6 +308,7 @@ See [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Reso |-------|------|-------------|----------| | type | String | This should say `avro_hadoop`. | yes | | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be an "avro" parseSpec. | yes | +| fromPigAvroStorage | Boolean | Specifies whether the data file is stored using AvroStorage. | no(default == false) | An Avro parseSpec can contain a [`flattenSpec`](#flattenspec) using either the "root" or "path" field types, which can be used to read nested Avro records. The "jq" field type is not currently supported for Avro. diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java index 7d7c8eb8bc3c..39ce48ec2b5e 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java @@ -33,16 +33,19 @@ public class AvroHadoopInputRowParser implements InputRowParser { private final ParseSpec parseSpec; + private final boolean fromPigAvroStorage; private final ObjectFlattener avroFlattener; private final MapInputRowParser mapParser; @JsonCreator public AvroHadoopInputRowParser( - @JsonProperty("parseSpec") ParseSpec parseSpec + @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage ) { this.parseSpec = parseSpec; - this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false); + this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage; + this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false); this.mapParser = new MapInputRowParser(parseSpec); } @@ -59,9 +62,15 @@ public ParseSpec getParseSpec() return parseSpec; } + @JsonProperty + public boolean isFromPigAvroStorage() + { + return fromPigAvroStorage; + } + @Override public InputRowParser withParseSpec(ParseSpec parseSpec) { - return new AvroHadoopInputRowParser(parseSpec); + return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage); } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java index 7dddf2387489..f375c63b8b23 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java @@ -48,7 +48,7 @@ public AvroStreamInputRowParser( { this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); - this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false); + this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false); this.mapParser = new MapInputRowParser(parseSpec); } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java index 1957bf9ae5ad..e5c0a04e55b1 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java @@ -19,11 +19,13 @@ package org.apache.druid.data.input.avro; +import com.google.common.collect.Lists; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; import com.jayway.jsonpath.spi.json.JsonProvider; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; import org.apache.druid.java.util.common.StringUtils; @@ -86,14 +88,16 @@ private static boolean isFieldPrimitive(Schema.Field field) isOptionalPrimitive(field.schema()); } - + private final boolean fromPigAvroStorage; private final boolean binaryAsString; /** + * @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage * @param binaryAsString boolean to encode the byte[] as a string. */ - public AvroFlattenerMaker(final boolean binaryAsString) + public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString) { + this.fromPigAvroStorage = fromPigAvroStorage; this.binaryAsString = binaryAsString; } @@ -135,6 +139,9 @@ public JsonProvider getJsonProvider() private Object transformValue(final Object field) { + if (fromPigAvroStorage && field instanceof GenericData.Array) { + return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0))); + } if (field instanceof ByteBuffer) { if (binaryAsString) { return StringUtils.fromUtf8(((ByteBuffer) field).array()); diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java index a8baa428d753..9d5665e800b8 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java @@ -38,6 +38,7 @@ private AvroParsers() public static ObjectFlattener makeFlattener( final ParseSpec parseSpec, + final boolean fromPigAvroStorage, final boolean binaryAsString ) { @@ -48,7 +49,7 @@ public static ObjectFlattener makeFlattener( flattenSpec = JSONPathSpec.DEFAULT; } - return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(binaryAsString)); + return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString)); } public static List parseGenericRecord( diff --git a/extensions-core/avro-extensions/src/test/avro/some-datum.avsc b/extensions-core/avro-extensions/src/test/avro/some-datum.avsc index dca6ca45ff0c..bf8ea6883ab1 100644 --- a/extensions-core/avro-extensions/src/test/avro/some-datum.avsc +++ b/extensions-core/avro-extensions/src/test/avro/some-datum.avsc @@ -26,6 +26,13 @@ {"name":"someLong","type":"long"}, {"name":"someInt","type":"int"}, - {"name":"someFloat","type":"float"} + {"name":"someFloat","type":"float"}, + {"name":"someRecordArray","type":{ + "type":"array","items":{ + "type":"record","name":"MyNestedRecord","fields":[ + {"name":"nestedString","type":"string"} + ] + } + }} ] }] diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java index e67d5a14650b..656707e7d8cd 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java @@ -49,26 +49,26 @@ public void setUp() } @Test - public void testParseNotFromSpark() throws IOException + public void testParseNotFromPigAvroStorage() throws IOException { - testParse(AvroStreamInputRowParserTest.buildSomeAvroDatum()); + testParse(AvroStreamInputRowParserTest.buildSomeAvroDatum(), false); } @Test - public void testParseFromSpark() throws IOException + public void testParseFromPigAvroStorage() throws IOException { - testParse(buildAvroFromFile()); + testParse(buildAvroFromFile(), true); } - private void testParse(GenericRecord record) throws IOException + private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException { - AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC); + AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, fromPigAvroStorage); AvroHadoopInputRowParser parser2 = jsonMapper.readValue( jsonMapper.writeValueAsBytes(parser), AvroHadoopInputRowParser.class ); InputRow inputRow = parser2.parseBatch(record).get(0); - AvroStreamInputRowParserTest.assertInputRowCorrect(inputRow, AvroStreamInputRowParserTest.DIMENSIONS); + AvroStreamInputRowParserTest.assertInputRowCorrect(inputRow, AvroStreamInputRowParserTest.DIMENSIONS, fromPigAvroStorage); } private static GenericRecord buildAvroFromFile() throws IOException diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java index 291210eb45c0..708e89ecab93 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java @@ -151,7 +151,11 @@ public CharSequence apply(@Nullable CharSequence input) ); private static final String SOME_UNION_VALUE = "string as union"; private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8); - + private static final String SOME_RECORD_STRING_VALUE = "string in record"; + private static final List SOME_RECORD_ARRAY_VALUE = Collections.singletonList(MyNestedRecord.newBuilder() + .setNestedString( + SOME_RECORD_STRING_VALUE) + .build()); private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]"); private final ObjectMapper jsonMapper = new ObjectMapper(); @@ -221,7 +225,7 @@ public void testParse() throws SchemaValidationException, IOException InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); - assertInputRowCorrect(inputRow, DIMENSIONS); + assertInputRowCorrect(inputRow, DIMENSIONS, false); } @Test @@ -262,11 +266,11 @@ public void testParseSchemaless() throws SchemaValidationException, IOException InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); - assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS); + assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false); } } - static void assertInputRowCorrect(InputRow inputRow, List expectedDimensions) + static void assertInputRowCorrect(InputRow inputRow, List expectedDimensions, boolean isFromPigAvro) { Assert.assertEquals(expectedDimensions, inputRow.getDimensions()); Assert.assertEquals(1543698L, inputRow.getTimestampFromEpoch()); @@ -279,14 +283,25 @@ static void assertInputRowCorrect(InputRow inputRow, List expectedDimens inputRow.getDimension(SOME_OTHER_ID) ); Assert.assertEquals(Collections.singletonList(String.valueOf(true)), inputRow.getDimension(IS_VALID)); - Assert.assertEquals( - Lists.transform(SOME_INT_ARRAY_VALUE, String::valueOf), - inputRow.getDimension("someIntArray") - ); - Assert.assertEquals( - Lists.transform(SOME_STRING_ARRAY_VALUE, String::valueOf), - inputRow.getDimension("someStringArray") - ); + + // someRecordArray represents a record generated from Pig using AvroStorage + // as it implicitly converts array elements to a record + if (isFromPigAvro) { + Assert.assertEquals( + Collections.singletonList(SOME_RECORD_ARRAY_VALUE.get(0).getNestedString()), + inputRow.getDimension("someRecordArray") + ); + } else { + Assert.assertEquals( + Lists.transform(SOME_INT_ARRAY_VALUE, String::valueOf), + inputRow.getDimension("someIntArray") + ); + Assert.assertEquals( + Lists.transform(SOME_STRING_ARRAY_VALUE, String::valueOf), + inputRow.getDimension("someStringArray") + ); + + } // towards Map avro field as druid dimension, need to convert its toString() back to HashMap to check equality Assert.assertEquals(1, inputRow.getDimension("someIntValueMap").size()); Assert.assertEquals( @@ -358,6 +373,7 @@ public static SomeAvroDatum buildSomeAvroDatum() .setSomeNull(null) .setSomeEnum(MyEnum.ENUM1) .setSomeRecord(SOME_RECORD_VALUE) + .setSomeRecordArray(SOME_RECORD_ARRAY_VALUE) .build(); } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java index 675a9ceb02da..d3faaf412bf1 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java @@ -31,172 +31,180 @@ public class AvroFlattenerMakerTest public void getRootField() { final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum(); - final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false); + final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false); Assert.assertEquals( - record.timestamp, + record.getTimestamp(), flattener.getRootField(record, "timestamp") ); Assert.assertEquals( - record.eventType, + record.getEventType(), flattener.getRootField(record, "eventType") ); Assert.assertEquals( - record.id, + record.getId(), flattener.getRootField(record, "id") ); Assert.assertEquals( - record.someOtherId, + record.getSomeOtherId(), flattener.getRootField(record, "someOtherId") ); Assert.assertEquals( - record.isValid, + record.getIsValid(), flattener.getRootField(record, "isValid") ); Assert.assertEquals( - record.someIntArray, + record.getSomeIntArray(), flattener.getRootField(record, "someIntArray") ); Assert.assertEquals( - record.someStringArray, + record.getSomeStringArray(), flattener.getRootField(record, "someStringArray") ); Assert.assertEquals( - record.someIntValueMap, + record.getSomeIntValueMap(), flattener.getRootField(record, "someIntValueMap") ); Assert.assertEquals( - record.someStringValueMap, + record.getSomeStringValueMap(), flattener.getRootField(record, "someStringValueMap") ); Assert.assertEquals( - record.someUnion, + record.getSomeUnion(), flattener.getRootField(record, "someUnion") ); Assert.assertEquals( - record.someNull, + record.getSomeNull(), flattener.getRootField(record, "someNull") ); Assert.assertEquals( - record.someFixed, + record.getSomeFixed(), flattener.getRootField(record, "someFixed") ); Assert.assertEquals( // Casted to an array by transformValue - record.someBytes.array(), + record.getSomeBytes().array(), flattener.getRootField(record, "someBytes") ); Assert.assertEquals( - record.someEnum, + record.getSomeEnum(), flattener.getRootField(record, "someEnum") ); Assert.assertEquals( - record.someRecord, + record.getSomeRecord(), flattener.getRootField(record, "someRecord") ); Assert.assertEquals( - record.someLong, + record.getSomeLong(), flattener.getRootField(record, "someLong") ); Assert.assertEquals( - record.someInt, + record.getSomeInt(), flattener.getRootField(record, "someInt") ); Assert.assertEquals( - record.someFloat, + record.getSomeFloat(), flattener.getRootField(record, "someFloat") ); + Assert.assertEquals( + record.getSomeRecordArray(), + flattener.getRootField(record, "someRecordArray") + ); } @Test public void makeJsonPathExtractor() { final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum(); - final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false); + final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false); Assert.assertEquals( - record.timestamp, + record.getTimestamp(), flattener.makeJsonPathExtractor("$.timestamp").apply(record) ); Assert.assertEquals( - record.eventType, + record.getEventType(), flattener.makeJsonPathExtractor("$.eventType").apply(record) ); Assert.assertEquals( - record.id, + record.getId(), flattener.makeJsonPathExtractor("$.id").apply(record) ); Assert.assertEquals( - record.someOtherId, + record.getSomeOtherId(), flattener.makeJsonPathExtractor("$.someOtherId").apply(record) ); Assert.assertEquals( - record.isValid, + record.getIsValid(), flattener.makeJsonPathExtractor("$.isValid").apply(record) ); Assert.assertEquals( - record.someIntArray, + record.getSomeIntArray(), flattener.makeJsonPathExtractor("$.someIntArray").apply(record) ); Assert.assertEquals( - record.someStringArray, + record.getSomeStringArray(), flattener.makeJsonPathExtractor("$.someStringArray").apply(record) ); Assert.assertEquals( - record.someIntValueMap, + record.getSomeIntValueMap(), flattener.makeJsonPathExtractor("$.someIntValueMap").apply(record) ); Assert.assertEquals( - record.someStringValueMap, + record.getSomeStringValueMap(), flattener.makeJsonPathExtractor("$.someStringValueMap").apply(record) ); Assert.assertEquals( - record.someUnion, + record.getSomeUnion(), flattener.makeJsonPathExtractor("$.someUnion").apply(record) ); Assert.assertEquals( - record.someNull, + record.getSomeNull(), flattener.makeJsonPathExtractor("$.someNull").apply(record) ); Assert.assertEquals( - record.someFixed, + record.getSomeFixed(), flattener.makeJsonPathExtractor("$.someFixed").apply(record) ); Assert.assertEquals( // Casted to an array by transformValue - record.someBytes.array(), + record.getSomeBytes().array(), flattener.makeJsonPathExtractor("$.someBytes").apply(record) ); Assert.assertEquals( - record.someEnum, + record.getSomeEnum(), flattener.makeJsonPathExtractor("$.someEnum").apply(record) ); Assert.assertEquals( - record.someRecord, + record.getSomeRecord(), flattener.makeJsonPathExtractor("$.someRecord").apply(record) ); Assert.assertEquals( - record.someLong, + record.getSomeLong(), flattener.makeJsonPathExtractor("$.someLong").apply(record) ); Assert.assertEquals( - record.someInt, + record.getSomeInt(), flattener.makeJsonPathExtractor("$.someInt").apply(record) ); Assert.assertEquals( - record.someFloat, + record.getSomeFloat(), flattener.makeJsonPathExtractor("$.someFloat").apply(record) ); + Assert.assertEquals( + record.getSomeRecordArray(), + flattener.makeJsonPathExtractor("$.someRecordArray").apply(record) + ); } @Test(expected = UnsupportedOperationException.class) public void makeJsonQueryExtractor() { final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum(); - final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false); + final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false); Assert.assertEquals( - record.timestamp, + record.getTimestamp(), flattener.makeJsonQueryExtractor("$.timestamp").apply(record) ); } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java index 3c63eb26ae54..c8e9a4510e11 100755 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java @@ -75,7 +75,7 @@ public ParquetAvroHadoopInputRowParser( this.recordFlattener = ObjectFlatteners.create( flattenSpec, - new AvroFlattenerMaker(this.binaryAsString) + new AvroFlattenerMaker(false, this.binaryAsString) ); } From 4cb6ddaa4f52d832b75578e9940b8372a9a5943d Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Fri, 7 Feb 2020 09:25:08 -0600 Subject: [PATCH 2/2] Add words to spellings file --- website/.spelling | 2 ++ 1 file changed, 2 insertions(+) diff --git a/website/.spelling b/website/.spelling index a721bd63dd62..cb09e12d5460 100644 --- a/website/.spelling +++ b/website/.spelling @@ -22,6 +22,7 @@ 64-bit ACL APIs +AvroStorage AWS AWS_CONTAINER_CREDENTIALS_RELATIVE_URI AWS_CONTAINER_CREDENTIALS_FULL_URI @@ -234,6 +235,7 @@ filesystem firefox firehose firehoses +fromPigAvroStorage frontends granularities gzip