The Kafka indexer with a JSON `flattenSpec` specifying a `path` field does not flatten Avro maps contained in a `GenericRecord`.
A detailed description of the problem follows.
Druid Version: 0.13.0-SNAPSHOT (latest master and earlier versions)
The Ingestion Spec
```json
{
  "type" : "kafka",
  "dataSchema" : {
    "dataSource" : "<DS>",
    "parser" : {
      "type" : "avro_stream",
      "avroBytesDecoder" : {
        "type" : "schema_registry",
        "url" : "http://<Masked>"
      },
      "parseSpec" : {
        "format" : "avro",
        "flattenSpec" : {
          "useFieldDiscovery" : true,
          "fields" : [
            {
              "type" : "path",
              "name" : "browsername",
              "expr" : "$.userBrowser..name"
            },
            ................
  "tuningConfig" : {
    "type" : "kafka",
    "resetOffsetAutomatically" : true
  }
}
```
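For illustration, here is a hypothetical decoded event matching the `userBrowser` map field (the `name` and `version` entries are assumptions, not taken from the actual data). The path expression `$.userBrowser..name` should extract `"Chrome"` into the `browsername` dimension:

```json
{
  "userBrowser": {
    "name": "Chrome",
    "version": "68.0"
  }
}
```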
Reference Schema

```json
{ "schema": "{ \"namespace\": \"com.<masked>\", \"type\": \"record\", \"name\": \"AtomicEvent\", \"fields\": [ { ..... { \"name\": \"userBrowser\", \"type\": [ \"null\", { \"type\": \"map\", \"values\": \"string\" } ], \"default\": null }, .... }
```
When events are ingested through the Kafka indexer, the path expression succeeds, but the task log shows that no values were indexed for the field, i.e., the dimension cardinality is [0].
Root Cause of the Problem
In the avro-extensions module, the `GenericAvroJsonProvider#getMapValue()` method looks up entries in the decoded `GenericRecord` map using keys taken from the `flattenSpec`, which are `java.lang.String` instances, while the keys in the Avro-decoded map are of Avro's `Utf8` type. Even though the key text is identical, the lookup fails because `String` and `Utf8` are different types and are never `equals` to each other.
Solution:

`GenericAvroJsonProvider#getMapValue()` should be enhanced to transform the lookup key to match the decoded key type. A snippet of the proposed fix (with a guard added for the empty-map case, where the original snippet's `iterator().next()` would throw):

```java
private Object transformAvroMapKey(String keyToTransform, Map avroMapObj)
{
  if (avroMapObj.isEmpty()) {
    // Nothing to inspect; the lookup will miss either way.
    return keyToTransform;
  }
  // Inspect one existing key to learn the decoded key type.
  Object key = avroMapObj.keySet().iterator().next();
  if (key instanceof Utf8) {
    return new Utf8(keyToTransform);
  }
  return keyToTransform;
}
```
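To show the failure and the fix end to end, here is a self-contained, runnable sketch. This is not the actual Druid source: the `Utf8` class below is a minimal stand-in for `org.apache.avro.util.Utf8` (equal only to other `Utf8` instances, never to a `String`), and `getMapValue` is a simplified model of the provider method.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MapKeyFixDemo {
  // Minimal stand-in for org.apache.avro.util.Utf8: equal only to other
  // Utf8 instances, never to java.lang.String.
  static final class Utf8 {
    private final byte[] bytes;
    Utf8(String s) { bytes = s.getBytes(StandardCharsets.UTF_8); }
    @Override public int hashCode() { return Arrays.hashCode(bytes); }
    @Override public boolean equals(Object o) {
      return o instanceof Utf8 && Arrays.equals(bytes, ((Utf8) o).bytes);
    }
  }

  // Proposed key transformation: match the decoded key type of the map.
  static Object transformAvroMapKey(String keyToTransform, Map<?, ?> avroMapObj) {
    if (avroMapObj.isEmpty()) {
      return keyToTransform;
    }
    Object key = avroMapObj.keySet().iterator().next();
    return (key instanceof Utf8) ? new Utf8(keyToTransform) : keyToTransform;
  }

  // Simplified model of GenericAvroJsonProvider#getMapValue().
  static Object getMapValue(Map<?, ?> map, String key) {
    return map.get(transformAvroMapKey(key, map));
  }

  public static void main(String[] args) {
    Map<Object, Object> decoded = new HashMap<>();
    decoded.put(new Utf8("name"), "Chrome");

    // Current behavior: a plain-String probe misses the Utf8 key.
    System.out.println(decoded.get("name"));          // null
    // With the fix: the probe is transformed to Utf8 first.
    System.out.println(getMapValue(decoded, "name")); // Chrome
  }
}
```

The key design point is that the transformation inspects an existing key at lookup time, so it adapts to whatever type the Avro decoder actually produced instead of assuming `Utf8`.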
@gianm @jihoonson - I can contribute the fix if you are okay with this solution.