From 56472ed2ca070ada3400e473ddaaf0fe96799446 Mon Sep 17 00:00:00 2001 From: Sergio Spinatelli Date: Thu, 20 Aug 2020 18:17:02 +0200 Subject: [PATCH 1/9] Add config and header support for confluent schema registry. (porting code from https://github.com/apache/druid/pull/9096) --- docs/ingestion/data-formats.md | 27 ++++++++++ extensions-core/avro-extensions/pom.xml | 2 +- .../SchemaRegistryBasedAvroBytesDecoder.java | 18 +++++-- ...hemaRegistryBasedAvroBytesDecoderTest.java | 49 +++++++++++++++++-- integration-tests/pom.xml | 6 +++ licenses.yaml | 6 ++- 6 files changed, 99 insertions(+), 9 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 4ee31812f98c..ad808cc31ac9 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -1010,7 +1010,13 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | type | String | This should say `schema_registry`. | no | | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | +| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | +| config | Json | To send additional configurations, configured for Schema Registry | no | +| headers | Json | To send headers to the Schema Registry | no | +For a single schema registry instance, use Field `url` or `urls` for multi instances. + +Single Instance: ```json ... "avroBytesDecoder" : { @@ -1020,6 +1026,27 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu ... ``` +Multiple Instances: +```json +... +"avroBytesDecoder" : { + "type" : "schema_registry", + "urls" : [, , ...], + "config" : { + "schema.registry.basic.auth.credentials.source" : "USER_INFO", + "schema.registry.basic.auth.user.info" : "fred:letmein", + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + + } +} +... +``` + ### Protobuf Parser > You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser. diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml index de83715501f0..a15f17976393 100644 --- a/extensions-core/avro-extensions/pom.xml +++ b/extensions-core/avro-extensions/pom.xml @@ -36,7 +36,7 @@ 0.1.3 - 3.0.1 + 5.5.1 diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 6ff97c41857a..3bdbc3f6d8b0 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -31,7 +31,10 @@ import org.apache.avro.io.DecoderFactory; import org.apache.druid.java.util.common.parsers.ParseException; +import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import java.util.Objects; public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder @@ -40,12 +43,19 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder @JsonCreator public SchemaRegistryBasedAvroBytesDecoder( - @JsonProperty("url") String url, - @JsonProperty("capacity") Integer capacity + @JsonProperty("url") @Deprecated String url, + @JsonProperty("capacity") Integer capacity, + @JsonProperty("urls") @Nullable List urls, + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers ) { int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; - this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity); + if (url != null && !url.isEmpty()) { + this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers); + } else { + this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers); + } } //For UT only @@ -63,7 +73,7 @@ public GenericRecord parse(ByteBuffer bytes) int id = bytes.getInt(); // extract schema registry id int length = bytes.limit() - 1 - 4; int offset = bytes.position() + bytes.arrayOffset(); - Schema schema = registry.getByID(id); + Schema schema = registry.getById(id); DatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index f5f1776a36f8..44d87be6a1d7 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.avro; +import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -48,11 +49,53 @@ public void setUp() registry = Mockito.mock(SchemaRegistryClient.class); } + @Test + public void testMultipleUrls() throws Exception + { + String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedAvroBytesDecoder decoder; + decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper + .readerFor(AvroBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } + + @Test + public void testUrl() throws Exception + { + String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedAvroBytesDecoder decoder; + decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper + .readerFor(AvroBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } + + @Test + public void testConfig() throws Exception + { + String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedAvroBytesDecoder decoder; + decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper + .readerFor(AvroBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } + @Test public void testParse() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); + Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); @@ -68,7 +111,7 @@ public void testParse() throws Exception public void testParseCorrupted() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); + Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); @@ -81,7 +124,7 @@ public void testParseCorrupted() throws Exception public void testParseWrongId() throws Exception { // Given - Mockito.when(registry.getByID(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); + Mockito.when(registry.getById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 4fee910ac3c6..7df4869d3117 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -74,6 +74,12 @@ commons-codec commons-codec + + javax.ws.rs + javax.ws.rs-api + 2.1.1 + runtime + org.apache.druid druid-core diff --git a/licenses.yaml b/licenses.yaml index b31c5a5aa365..02b9fc4a00bb 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3081,12 +3081,16 @@ notices: --- name: Kafka Schema Registry Client -version: 3.0.1 +version: 5.5.1 license_category: binary module: extensions/druid-avro-extensions license_name: Apache License version 2.0 libraries: - io.confluent: kafka-schema-registry-client + - io.confluent: common-config + - io.confluent: common-utils + - com.101tec: zkclient + - org.apache.kafka: kafka-clients --- From bdf97f6928ae48accc520448cdb72f7d778897d5 Mon Sep 17 00:00:00 2001 From: Sergio Spinatelli Date: Mon, 24 Aug 2020 18:23:34 +0200 Subject: [PATCH 2/9] Add Eclipse Public License 2.0 to license check --- distribution/bin/check-licenses.py | 19 ++++++++++++++++--- website/.spelling | 1 + 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index 5704adc0ba21..020282b970db 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -201,9 +201,12 @@ def set_attr(self, data): raise Exception("Unknown attr_index [{}]".format(self.attr_index)) def set_license(self, data): - if data.upper().find("GPL") < 0: - if self.license != 'Apache License version 2.0': - self.license = self.compatible_license_names[data] + try: + if data.upper().find("GPL") < 0: + if self.license != 'Apache License version 2.0': + self.license = self.compatible_license_names[data] + except KeyError as e: + print(e) def print_log_to_stderr(string): @@ -216,6 +219,7 @@ def build_compatible_license_names(): compatible_licenses['Apache 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache-2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2'] = 'Apache License version 2.0' + compatible_licenses['Apache License 2'] = 'Apache License version 2.0' compatible_licenses['Apache License 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache Software License - Version 2.0'] = 'Apache License version 2.0' compatible_licenses['The Apache License, Version 2.0'] = 'Apache License version 2.0' @@ -223,6 +227,7 @@ def build_compatible_license_names(): compatible_licenses['Apache License Version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License Version 2'] = 'Apache License version 2.0' compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0' + compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0' @@ -259,6 +264,14 @@ def build_compatible_license_names(): compatible_licenses['Eclipse Public License - Version 1.0'] = 'Eclipse Public License 1.0' compatible_licenses['Eclipse Public License, Version 1.0'] = 'Eclipse Public License 1.0' compatible_licenses['Eclipse Public License v1.0'] = 'Eclipse Public License 1.0' + compatible_licenses['EPL 1.0'] = 'Eclipse Public License 1.0' + + compatible_licenses['Eclipse Public License 2.0'] = 'Eclipse Public License 2.0' + compatible_licenses['The Eclipse Public License, Version 2.0'] = 'Eclipse Public License 2.0' + compatible_licenses['Eclipse Public License - Version 2.0'] = 'Eclipse Public License 2.0' + compatible_licenses['Eclipse Public License, Version 2.0'] = 'Eclipse Public License 2.0' + compatible_licenses['Eclipse Public License v2.0'] = 'Eclipse Public License 2.0' + compatible_licenses['EPL 2.0'] = 'Eclipse Public License 2.0' compatible_licenses['Eclipse Distribution License 1.0'] = 'Eclipse Distribution License 1.0' compatible_licenses['Eclipse Distribution License - v 1.0'] = 'Eclipse Distribution License 1.0' diff --git a/website/.spelling b/website/.spelling index a14788519ba2..54d274d385c2 100644 --- a/website/.spelling +++ b/website/.spelling @@ -911,6 +911,7 @@ ctrl jsonLowercase listDelimiter timestampSpec +urls - ../docs/ingestion/data-management.md 1GB IOConfig From 14887008fd107f1e72b28b2592bb87641cf74749 Mon Sep 17 00:00:00 2001 From: Sergio Spinatelli Date: Tue, 25 Aug 2020 09:07:10 +0200 Subject: [PATCH 3/9] Update licenses.yaml, revert changes to check-licenses.py and dependencies for integration-tests --- distribution/bin/check-licenses.py | 9 +-- integration-tests/pom.xml | 6 -- licenses.yaml | 91 +++++++++++++++++++++++++++++- 3 files changed, 93 insertions(+), 13 deletions(-) diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index 020282b970db..871ae868b4fd 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -201,12 +201,9 @@ def set_attr(self, data): raise Exception("Unknown attr_index [{}]".format(self.attr_index)) def set_license(self, data): - try: - if data.upper().find("GPL") < 0: - if self.license != 'Apache License version 2.0': - self.license = self.compatible_license_names[data] - except KeyError as e: - print(e) + if data.upper().find("GPL") < 0: + if self.license != 'Apache License version 2.0': + self.license = self.compatible_license_names[data] def print_log_to_stderr(string): diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 7df4869d3117..4fee910ac3c6 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -74,12 +74,6 @@ commons-codec commons-codec - - javax.ws.rs - javax.ws.rs-api - 2.1.1 - runtime - org.apache.druid druid-core diff --git a/licenses.yaml b/licenses.yaml index 02b9fc4a00bb..383196ec0265 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3089,9 +3089,98 @@ libraries: - io.confluent: kafka-schema-registry-client - io.confluent: common-config - io.confluent: common-utils - - com.101tec: zkclient + +--- + + name: com.101tec zkclient + license_category: binary + version: '0.10' + module: druid-ranger-security + license_name: Apache License version 2.0 + libraries: + - com.101tec: zkclient + +--- + +name: Kafka Client +version: 5.5.1-ccs +license_category: binary +module: extensions/druid-avro-extensions +license_name: Apache License version 2.0 +libraries: - org.apache.kafka: kafka-clients +--- + + name: swagger-annotations + version: 1.6.0 + license_category: binary + module: extensions/druid-avro-extensions + license_name: Apache License version 2.0 + libraries: + - io.swagger: swagger-annotations + +--- + + name: jersey-common + version: '2.30' + license_category: binary + module: extensions/druid-avro-extensions + license_name: Apache License version 2.0 + libraries: + - org.glassfish.jersey.core: jersey-common + skip_dependency_report_check: true + +--- + + name: osgi-resource-locator + version: 1.0.3 + license_category: binary + module: extensions/druid-avro-extensions + license_name: Eclipse Public License 2.0 + libraries: + - org.glassfish.hk2: osgi-resource-locator + +--- + + name: jakarta.inject + version: 2.6.1 + license_category: binary + module: extensions/druid-avro-extensions + license_name: Eclipse Public License 2.0 + libraries: + - org.glassfish.hk2.external: jakarta.inject + +--- + + name: jakarta.annotation + version: 1.3.5 + license_category: binary + module: extensions/druid-avro-extensions + license_name: Eclipse Public License 2.0 + libraries: + - jakarta.annotation: jakarta.annotation-api + +--- + + name: javax.ws.rs-api + version: 2.1.1 + license_category: binary + module: extensions/druid-avro-extensions + license_name: Eclipse Public License 2.0 + libraries: + - javax.ws.rs: javax.ws.rs-api + +--- + + name: jakarta.ws.rs-api + version: 2.1.6 + license_category: binary + module: extensions/druid-avro-extensions + license_name: Eclipse Public License 2.0 + libraries: + - jakarta.ws.rs: jakarta.ws.rs-api + --- name: Apache Velocity Engine From 68cbd0a0f6da6e1b760e9f7263d88d58724a749b Mon Sep 17 00:00:00 2001 From: Sergio Spinatelli Date: Fri, 26 Feb 2021 13:11:11 +0000 Subject: [PATCH 4/9] Add spelling exception and remove unused dependency --- integration-tests/pom.xml | 4 ---- website/.spelling | 1 + 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index fc3b7b50eecb..3664f3473d98 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -240,10 +240,6 @@ kafka-clients ${apache.kafka.version} - - javax.ws.rs - jsr311-api - io.netty netty diff --git a/website/.spelling b/website/.spelling index 004ef65f7744..7b6e38aaa5cc 100644 --- a/website/.spelling +++ b/website/.spelling @@ -417,6 +417,7 @@ untrusted useFilterCNF uptime uris +urls useFieldDiscovery v1 v2 From a29f73ce4a5ba68ee1cc6d6498788a28cc2b8443 Mon Sep 17 00:00:00 2001 From: Sergio Spinatelli Date: Fri, 26 Feb 2021 13:48:47 +0000 Subject: [PATCH 5/9] Use non-deprecated getSchemaById() and remove duplicated license entry --- .../avro/SchemaRegistryBasedAvroBytesDecoder.java | 2 +- licenses.yaml | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 3bdbc3f6d8b0..783a9a6b71fe 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -73,7 +73,7 @@ public GenericRecord parse(ByteBuffer bytes) int id = bytes.getInt(); // extract schema registry id int length = bytes.limit() - 1 - 4; int offset = bytes.position() + bytes.arrayOffset(); - Schema schema = registry.getById(id); + Schema schema = registry.getSchemaById(id).rawSchema(); DatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); } diff --git a/licenses.yaml b/licenses.yaml index 8c4d166ddb16..0a1279eb1a67 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3393,16 +3393,6 @@ libraries: --- -name: com.101tec zkclient -license_category: binary -version: '0.10' -module: druid-ranger-security -license_name: Apache License version 2.0 -libraries: - - com.101tec: zkclient - ---- - name: Kafka Client version: 5.5.1-ccs license_category: binary From 2925492a7d55c7bb1ade4c56145727fc8584fd9c Mon Sep 17 00:00:00 2001 From: spinatelli Date: Fri, 26 Feb 2021 14:49:19 +0100 Subject: [PATCH 6/9] Update docs/ingestion/data-formats.md Co-authored-by: Clint Wylie --- docs/ingestion/data-formats.md | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 3cad2cc1af26..96f5d924b757 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -1036,19 +1036,23 @@ Multiple Instances: ```json ... "avroBytesDecoder" : { - "type" : "schema_registry", - "urls" : [, , ...], + "type" : "schema_registry", + "urls" : [, , ...], "config" : { - "schema.registry.basic.auth.credentials.source" : "USER_INFO", - "schema.registry.basic.auth.user.info" : "fred:letmein", - ... - }, + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "fred:letmein", + "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", + "schema.registry.ssl.truststore.password": "", + "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", + "schema.registry.ssl.keystore.password": "", + "schema.registry.ssl.key.password": "" + ... + }, "headers": { - "traceID" : "b29c5de2-0db4-490b-b421", - "timeStamp" : "1577191871865", - ... - - } + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + } } ... ``` From 26bffa3f2ec8bfba4038e64522f017abfec8bea5 Mon Sep 17 00:00:00 2001 From: Sergio Spinatelli Date: Fri, 26 Feb 2021 14:25:36 +0000 Subject: [PATCH 7/9] Added check for schema being null, as per Confluent code --- .../data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 783a9a6b71fe..05218015d1e7 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -73,7 +73,8 @@ public GenericRecord parse(ByteBuffer bytes) int id = bytes.getInt(); // extract schema registry id int length = bytes.limit() - 1 - 4; int offset = bytes.position() + bytes.arrayOffset(); - Schema schema = registry.getSchemaById(id).rawSchema(); + ParsedSchema parsedSchema = registry.getSchemaById(id); + Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema)parsedSchema).rawSchema() : null; DatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); } From a4cbfe2d778797aa849a2109cabc96d14cf8bfe7 Mon Sep 17 00:00:00 2001 From: Sergio Spinatelli Date: Fri, 26 Feb 2021 17:08:44 +0000 Subject: [PATCH 8/9] Missing imports and whitespace --- .../data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 05218015d1e7..42765fa67787 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.apache.avro.Schema; @@ -74,7 +76,7 @@ public GenericRecord parse(ByteBuffer bytes) int length = bytes.limit() - 1 - 4; int offset = bytes.position() + bytes.arrayOffset(); ParsedSchema parsedSchema = registry.getSchemaById(id); - Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema)parsedSchema).rawSchema() : null; + Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null; DatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); } From f77bc36675737459a425efc934e693af42295fee Mon Sep 17 00:00:00 2001 From: Sergio Spinatelli Date: Sat, 27 Feb 2021 18:11:29 +0000 Subject: [PATCH 9/9] Updated unit tests with AvroSchema --- .../avro/SchemaRegistryBasedAvroBytesDecoderTest.java | 7 ++++--- integration-tests/pom.xml | 4 ++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 44d87be6a1d7..55c7e6bd4528 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.avro; import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -95,7 +96,7 @@ public void testConfig() throws Exception public void testParse() throws Exception { // Given - Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema())); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); @@ -111,7 +112,7 @@ public void testParse() throws Exception public void testParseCorrupted() throws Exception { // Given - Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema())); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); @@ -124,7 +125,7 @@ public void testParseCorrupted() throws Exception public void testParseWrongId() throws Exception { // Given - Mockito.when(registry.getById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); + Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); Schema schema = SomeAvroDatum.getClassSchema(); byte[] bytes = getAvroDatum(schema, someAvroDatum); diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 3664f3473d98..fc3b7b50eecb 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -240,6 +240,10 @@ kafka-clients ${apache.kafka.version} + + javax.ws.rs + jsr311-api + io.netty netty