diff --git a/docs/development/extensions-core/avro.md b/docs/development/extensions-core/avro.md index da11af606acc..9774b148f174 100644 --- a/docs/development/extensions-core/avro.md +++ b/docs/development/extensions-core/avro.md @@ -169,9 +169,16 @@ Details can be found in Schema Registry [documentation](http://docs.confluent.io | Field | Type | Description | Required | |-------|------|-------------|----------| | type | String | This should say `schema_registry`. | no | -| url | String | Specifies the url endpoint of the Schema Registry. | yes | -| capacity | Integer | Specifies the max size of the cache (default == Integer.MAX_VALUE). | no | +| url | String | Specifies the url endpoint of the Schema Registry. | yes | +| capacity | Integer | Specifies the max size of the cache (default == Integer.MAX_VALUE). | no | +| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes (if `url` is not provided) | +| config | Json | Additional configurations to set on the Schema Registry client. | no | +| headers | Json | Headers to send to the Schema Registry. | no | + +For a single Schema Registry instance, use the `url` field; for multiple instances, use the `urls` field. + +Single Instance: ```json ... "avroBytesDecoder" : { @@ -181,6 +188,27 @@ Details can be found in Schema Registry [documentation](http://docs.confluent.io ... ``` +Multiple Instances: +```json +... +"avroBytesDecoder" : { + "type" : "schema_registry", + "urls" : ["<url1>", "<url2>", ...], + "config" : { + "schema.registry.basic.auth.credentials.source" : "USER_INFO", + "schema.registry.basic.auth.user.info" : "fred:letmein", + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + + } +} +... +``` + ### Avro Hadoop Parser This is for batch ingestion using the `HadoopDruidIndexer`. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.druid.data.input.avro.AvroValueInputFormat"`. 
You may want to set Avro reader's schema in `jobProperties` in `tuningConfig`, e.g.: `"avro.schema.input.value.path": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`, if reader's schema is not set, the schema in Avro object container file will be used, see [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution). Make sure to include "org.apache.druid.extensions:druid-avro-extensions" as an extension. diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml index f84c0dae010c..5b2ca2b78d61 100644 --- a/extensions-core/avro-extensions/pom.xml +++ b/extensions-core/avro-extensions/pom.xml @@ -36,7 +36,7 @@ 0.1.3 - 3.0.1 + 5.2.0 diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 6ff97c41857a..b2a4bb96e156 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -31,7 +31,10 @@ import org.apache.avro.io.DecoderFactory; import org.apache.druid.java.util.common.parsers.ParseException; +import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import java.util.Objects; public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder @@ -41,11 +44,18 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder @JsonCreator public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("url") String url, - @JsonProperty("capacity") Integer capacity + @JsonProperty("capacity") Integer capacity, + @JsonProperty("urls") @Nullable List urls, + @JsonProperty("config") @Nullable Map config, + 
@JsonProperty("headers") @Nullable Map headers ) { int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; - this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity); + if (!url.isEmpty()) { + this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers); + } else { + this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers); + } } //For UT only @@ -63,7 +73,7 @@ public GenericRecord parse(ByteBuffer bytes) int id = bytes.getInt(); // extract schema registry id int length = bytes.limit() - 1 - 4; int offset = bytes.position() + bytes.arrayOffset(); - Schema schema = registry.getByID(id); + Schema schema = registry.getById(id); DatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index f5f1776a36f8..a26592052458 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -52,7 +52,7 @@ public void setUp() public void testParse() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); + Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); @@ -68,7 +68,7 @@ public void testParse() throws Exception public void 
testParseCorrupted() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); + Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); @@ -81,7 +81,7 @@ public void testParseCorrupted() throws Exception public void testParseWrongId() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); + Mockito.when(registry.getById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum);