Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ingestion/data-formats.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ See [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Reso
|-------|------|-------------|----------|
| type | String | This should say `avro_hadoop`. | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be an "avro" parseSpec. | yes |
| fromPigAvroStorage | Boolean | Specifies whether the data file is stored using AvroStorage. | no(default == false) |

An Avro parseSpec can contain a [`flattenSpec`](#flattenspec) using either the "root" or "path"
field types, which can be used to read nested Avro records. The "jq" field type is not currently supported for Avro.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@
public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{
private final ParseSpec parseSpec;
private final boolean fromPigAvroStorage;
private final ObjectFlattener<GenericRecord> avroFlattener;
private final MapInputRowParser mapParser;

@JsonCreator
public AvroHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
)
{
this.parseSpec = parseSpec;
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
this.mapParser = new MapInputRowParser(parseSpec);
}

Expand All @@ -59,9 +62,15 @@ public ParseSpec getParseSpec()
return parseSpec;
}

@JsonProperty
public boolean isFromPigAvroStorage()
{
return fromPigAvroStorage;
}

@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new AvroHadoopInputRowParser(parseSpec);
return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public AvroStreamInputRowParser(
{
this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
this.mapParser = new MapInputRowParser(parseSpec);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package org.apache.druid.data.input.avro;

import com.google.common.collect.Lists;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -86,14 +88,16 @@ private static boolean isFieldPrimitive(Schema.Field field)
isOptionalPrimitive(field.schema());
}


private final boolean fromPigAvroStorage;
private final boolean binaryAsString;

/**
* @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage
* @param binaryAsString boolean to encode the byte[] as a string.
*/
public AvroFlattenerMaker(final boolean binaryAsString)
public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString)
{
this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString;
}

Expand Down Expand Up @@ -135,6 +139,9 @@ public JsonProvider getJsonProvider()

private Object transformValue(final Object field)
{
if (fromPigAvroStorage && field instanceof GenericData.Array) {
return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0)));
}
if (field instanceof ByteBuffer) {
if (binaryAsString) {
return StringUtils.fromUtf8(((ByteBuffer) field).array());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private AvroParsers()

public static ObjectFlattener<GenericRecord> makeFlattener(
final ParseSpec parseSpec,
final boolean fromPigAvroStorage,
final boolean binaryAsString
)
{
Expand All @@ -48,7 +49,7 @@ public static ObjectFlattener<GenericRecord> makeFlattener(
flattenSpec = JSONPathSpec.DEFAULT;
}

return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(binaryAsString));
return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString));
}

public static List<InputRow> parseGenericRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@

{"name":"someLong","type":"long"},
{"name":"someInt","type":"int"},
{"name":"someFloat","type":"float"}
{"name":"someFloat","type":"float"},
{"name":"someRecordArray","type":{
"type":"array","items":{
"type":"record","name":"MyNestedRecord","fields":[
{"name":"nestedString","type":"string"}
]
}
}}
]
}]
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,26 @@ public void setUp()
}

@Test
public void testParseNotFromSpark() throws IOException
public void testParseNotFromPigAvroStorage() throws IOException
{
testParse(AvroStreamInputRowParserTest.buildSomeAvroDatum());
testParse(AvroStreamInputRowParserTest.buildSomeAvroDatum(), false);
}

@Test
public void testParseFromSpark() throws IOException
public void testParseFromPigAvroStorage() throws IOException
{
testParse(buildAvroFromFile());
testParse(buildAvroFromFile(), true);
}

private void testParse(GenericRecord record) throws IOException
private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException
{
AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC);
AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, fromPigAvroStorage);
AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
);
InputRow inputRow = parser2.parseBatch(record).get(0);
AvroStreamInputRowParserTest.assertInputRowCorrect(inputRow, AvroStreamInputRowParserTest.DIMENSIONS);
AvroStreamInputRowParserTest.assertInputRowCorrect(inputRow, AvroStreamInputRowParserTest.DIMENSIONS, fromPigAvroStorage);
}

private static GenericRecord buildAvroFromFile() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,11 @@ public CharSequence apply(@Nullable CharSequence input)
);
private static final String SOME_UNION_VALUE = "string as union";
private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);

private static final String SOME_RECORD_STRING_VALUE = "string in record";
private static final List<MyNestedRecord> SOME_RECORD_ARRAY_VALUE = Collections.singletonList(MyNestedRecord.newBuilder()
.setNestedString(
SOME_RECORD_STRING_VALUE)
.build());
private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]");

private final ObjectMapper jsonMapper = new ObjectMapper();
Expand Down Expand Up @@ -221,7 +225,7 @@ public void testParse() throws SchemaValidationException, IOException

InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);

assertInputRowCorrect(inputRow, DIMENSIONS);
assertInputRowCorrect(inputRow, DIMENSIONS, false);
}

@Test
Expand Down Expand Up @@ -262,11 +266,11 @@ public void testParseSchemaless() throws SchemaValidationException, IOException

InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);

assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS);
assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false);
}
}

static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions)
static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions, boolean isFromPigAvro)
{
Assert.assertEquals(expectedDimensions, inputRow.getDimensions());
Assert.assertEquals(1543698L, inputRow.getTimestampFromEpoch());
Expand All @@ -279,14 +283,25 @@ static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimens
inputRow.getDimension(SOME_OTHER_ID)
);
Assert.assertEquals(Collections.singletonList(String.valueOf(true)), inputRow.getDimension(IS_VALID));
Assert.assertEquals(
Lists.transform(SOME_INT_ARRAY_VALUE, String::valueOf),
inputRow.getDimension("someIntArray")
);
Assert.assertEquals(
Lists.transform(SOME_STRING_ARRAY_VALUE, String::valueOf),
inputRow.getDimension("someStringArray")
);

// someRecordArray represents a record generated from Pig using AvroStorage
// as it implicitly converts array elements to a record
if (isFromPigAvro) {
Assert.assertEquals(
Collections.singletonList(SOME_RECORD_ARRAY_VALUE.get(0).getNestedString()),
inputRow.getDimension("someRecordArray")
);
} else {
Assert.assertEquals(
Lists.transform(SOME_INT_ARRAY_VALUE, String::valueOf),
inputRow.getDimension("someIntArray")
);
Assert.assertEquals(
Lists.transform(SOME_STRING_ARRAY_VALUE, String::valueOf),
inputRow.getDimension("someStringArray")
);

}
// towards Map avro field as druid dimension, need to convert its toString() back to HashMap to check equality
Assert.assertEquals(1, inputRow.getDimension("someIntValueMap").size());
Assert.assertEquals(
Expand Down Expand Up @@ -358,6 +373,7 @@ public static SomeAvroDatum buildSomeAvroDatum()
.setSomeNull(null)
.setSomeEnum(MyEnum.ENUM1)
.setSomeRecord(SOME_RECORD_VALUE)
.setSomeRecordArray(SOME_RECORD_ARRAY_VALUE)
.build();
}
}
Loading