From 9c5900ca87f168a1aa2faeeaa0e9b2684b7d848b Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 24 Dec 2019 05:05:04 +0530 Subject: [PATCH 1/5] Add config and header support for confluent schema registry. --- extensions-core/avro-extensions/pom.xml | 2 +- .../avro/SchemaRegistryBasedAvroBytesDecoder.java | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml index f84c0dae010c..5b2ca2b78d61 100644 --- a/extensions-core/avro-extensions/pom.xml +++ b/extensions-core/avro-extensions/pom.xml @@ -36,7 +36,7 @@ 0.1.3 - 3.0.1 + 5.2.0 diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 6ff97c41857a..6613afc51074 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -31,7 +31,10 @@ import org.apache.avro.io.DecoderFactory; import org.apache.druid.java.util.common.parsers.ParseException; +import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import java.util.Objects; public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder @@ -41,11 +44,18 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder @JsonCreator public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("url") String url, - @JsonProperty("capacity") Integer capacity + @JsonProperty("capacity") Integer capacity, + @JsonProperty("urls") List urls, + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers ) { int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; - this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity); + if (!url.isEmpty()) { + this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers); + } else { + this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers); + } } //For UT only From 5e88ab9e8dc32d62284e6538db18f10f0fbdd695 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 24 Dec 2019 12:06:01 +0530 Subject: [PATCH 2/5] add json nullable property to urls. --- .../data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 6613afc51074..eedafeada024 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -45,7 +45,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("url") String url, @JsonProperty("capacity") Integer capacity, - @JsonProperty("urls") List urls, + @JsonProperty("urls") @Nullable List urls, @JsonProperty("config") @Nullable Map config, @JsonProperty("headers") @Nullable Map headers ) From bc864385d24b62bf39448e9f8d4d59f84ee9c1ec Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 24 Dec 2019 12:43:03 +0530 Subject: [PATCH 3/5] add alternative to the deprecative function. --- .../data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index eedafeada024..b2a4bb96e156 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -73,7 +73,7 @@ public GenericRecord parse(ByteBuffer bytes) int id = bytes.getInt(); // extract schema registry id int length = bytes.limit() - 1 - 4; int offset = bytes.position() + bytes.arrayOffset(); - Schema schema = registry.getByID(id); + Schema schema = registry.getById(id); DatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); } From badb9139c967a438ab50f7d88100b25a1f69fee5 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 24 Dec 2019 12:47:36 +0530 Subject: [PATCH 4/5] add support to deprecated function. --- .../input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index f5f1776a36f8..a26592052458 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -52,7 +52,7 @@ public void setUp() public void testParse() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); + Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); @@ -68,7 +68,7 @@ public void testParse() throws Exception public void testParseCorrupted() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); + Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); @@ -81,7 +81,7 @@ public void testParseCorrupted() throws Exception public void testParseWrongId() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); + Mockito.when(registry.getById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); From 354cb4f97de871c6985668e9f206112cb64f3950 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 24 Dec 2019 18:35:55 +0530 Subject: [PATCH 5/5] updated read doc. --- docs/development/extensions-core/avro.md | 32 ++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/docs/development/extensions-core/avro.md b/docs/development/extensions-core/avro.md index da11af606acc..9774b148f174 100644 --- a/docs/development/extensions-core/avro.md +++ b/docs/development/extensions-core/avro.md @@ -169,9 +169,16 @@ Details can be found in Schema Registry [documentation](http://docs.confluent.io | Field | Type | Description | Required | |-------|------|-------------|----------| | type | String | This should say `schema_registry`. | no | -| url | String | Specifies the url endpoint of the Schema Registry. | yes | -| capacity | Integer | Specifies the max size of the cache (default == Integer.MAX_VALUE). | no | +| url | String | Specifies the url endpoint of the Schema Registry. | yes | +| capacity | Integer | Specifies the max size of the cache (default == Integer.MAX_VALUE). | no +| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | +| config | Json | To send additional configurations, configured for Schema Registry | no | +| headers | Json | To send headers to the Schema Registry | no | + +For a single schema registry instance, use Field `url` or `urls` for multi instances. + +Single Instance: ```json ... "avroBytesDecoder" : { @@ -181,6 +188,27 @@ Details can be found in Schema Registry [documentation](http://docs.confluent.io ... ``` +Multiple Instances: +```json +... +"avroBytesDecoder" : { + "type" : "schema_registry", + "urls" : [, , ...], + "config" : { + "schema.registry.basic.auth.credentials.source" : "USER_INFO", + "schema.registry.basic.auth.user.info" : "fred:letmein", + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + + } +} +... +``` + ### Avro Hadoop Parser This is for batch ingestion using the `HadoopDruidIndexer`. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.druid.data.input.avro.AvroValueInputFormat"`. You may want to set Avro reader's schema in `jobProperties` in `tuningConfig`, e.g.: `"avro.schema.input.value.path": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`, if reader's schema is not set, the schema in Avro object container file will be used, see [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution). Make sure to include "org.apache.druid.extensions:druid-avro-extensions" as an extension.