From b685bca945fab9fe566b5f2ed8152a10d828fc0f Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 13 Jun 2021 17:58:02 +0800 Subject: [PATCH 01/10] add_DynamicConfigProvider_for_schema_registry --- .../SchemaRegistryBasedAvroBytesDecoder.java | 69 ++++++++++++++++--- ...hemaRegistryBasedAvroBytesDecoderTest.java | 40 +++++++++++ ...hemaRegistryBasedProtobufBytesDecoder.java | 68 +++++++++++++++--- ...RegistryBasedProtobufBytesDecoderTest.java | 40 +++++++++++ 4 files changed, 201 insertions(+), 16 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 4b3da38c49c9..289bd5e9b998 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; @@ -34,10 +35,13 @@ import org.apache.avro.io.DecoderFactory; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.metadata.DynamicConfigProvider; import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -48,16 +52,18 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder private final String url; private final int capacity; private final List urls; - private final Map config; - private final Map headers; + private final Map config; + private final Map headers; + private final ObjectMapper mapper; + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonCreator public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("url") @Deprecated String url, @JsonProperty("capacity") Integer capacity, @JsonProperty("urls") @Nullable List urls, - @JsonProperty("config") @Nullable Map config, - @JsonProperty("headers") @Nullable Map headers + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers ) { this.url = url; @@ -65,10 +71,11 @@ public SchemaRegistryBasedAvroBytesDecoder( this.urls = urls; this.config = config; this.headers = headers; + this.mapper = new ObjectMapper(); if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, this.config, this.headers); + this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, createRegistryConfig(), createRegistryHeader()); } else { - this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, this.config, this.headers); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, createRegistryConfig(), createRegistryHeader()); } } @@ -91,17 +98,62 @@ public List getUrls() } @JsonProperty - public Map getConfig() + public Map getConfig() { return config; } @JsonProperty - public Map getHeaders() + public Map getHeaders() { return headers; } + protected Map createRegistryHeader() + { + HashMap registryHeader = new HashMap<>(); + if (headers != null) { + for (String key : headers.keySet()) { + if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) { + registryHeader.put(key, headers.get(key).toString()); + } + } + Map dynamicConfig = extraConfigFromProvider(headers.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); + for (String key : dynamicConfig.keySet()) { + registryHeader.put(key, dynamicConfig.get(key)); + } + } + return registryHeader; + } + + protected Map createRegistryConfig() + { + HashMap registryConfig = new HashMap<>(); + if (config != null) { + for (String key : config.keySet()) { + if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) { + registryConfig.put(key, config.get(key)); + } + } + if (config.containsKey(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)) { + Map dynamicConfig = extraConfigFromProvider(config.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); + for (String key : dynamicConfig.keySet()) { + registryConfig.put(key, dynamicConfig.get(key)); + } + } + } + return registryConfig; + } + + private Map extraConfigFromProvider(Object dynamicConfigProviderJson) + { + if (dynamicConfigProviderJson != null) { + DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + return dynamicConfigProvider.getConfig(); + } + return Collections.emptyMap(); + } + //For UT only @VisibleForTesting SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry) @@ -112,6 +164,7 @@ public Map getHeaders() this.config = null; this.headers = null; this.registry = registry; + this.mapper = new ObjectMapper(); } @Override diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 3eb643934fec..34ebb2b65f6a 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.avro; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; @@ -41,6 +42,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Map; public class SchemaRegistryBasedAvroBytesDecoderTest { @@ -163,4 +165,42 @@ private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws I writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); return out.toByteArray(); } + + @Test + public void testParseHeader() throws JsonProcessingException + { + String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedAvroBytesDecoder decoder; + decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper + .readerFor(AvroBytesDecoder.class) + .readValue(json); + + Map heaeder = decoder.createRegistryHeader(); + + // Then + Assert.assertEquals(3, heaeder.size()); + Assert.assertEquals("value.1", heaeder.get("registry.header.prop.1")); + Assert.assertEquals("value.2", heaeder.get("registry.header.prop.2")); + Assert.assertEquals("value.3", heaeder.get("registry.header.prop.3")); + } + + @Test + public void testParseConfig() throws JsonProcessingException + { + String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedAvroBytesDecoder decoder; + decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper + .readerFor(AvroBytesDecoder.class) + .readValue(json); + + Map heaeder = decoder.createRegistryConfig(); + + // Then + Assert.assertEquals(3, heaeder.size()); + Assert.assertEquals("value.1", heaeder.get("registry.config.prop.1")); + Assert.assertEquals("value.2", heaeder.get("registry.config.prop.2")); + Assert.assertEquals("value.3", heaeder.get("registry.config.prop.3")); + } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index 2d4cc8dd5556..1150680657f8 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -31,12 +32,14 @@ import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.metadata.DynamicConfigProvider; import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -50,16 +53,18 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec private final String url; private final int capacity; private final List urls; - private final Map config; - private final Map headers; + private final Map config; + private final Map headers; + private final ObjectMapper mapper; + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonCreator public SchemaRegistryBasedProtobufBytesDecoder( @JsonProperty("url") @Deprecated String url, @JsonProperty("capacity") Integer capacity, @JsonProperty("urls") @Nullable List urls, - @JsonProperty("config") @Nullable Map config, - @JsonProperty("headers") @Nullable Map headers + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers ) { this.url = url; @@ -67,10 +72,11 @@ public SchemaRegistryBasedProtobufBytesDecoder( this.urls = urls; this.config = config; this.headers = headers; + this.mapper = new ObjectMapper(); if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers); + this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), createRegistryConfig(), createRegistryHeader()); } else { - this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), createRegistryConfig(), createRegistryHeader()); } } @@ -93,13 +99,13 @@ public List getUrls() } @JsonProperty - public Map getConfig() + public Map getConfig() { return config; } @JsonProperty - public Map getHeaders() + public Map getHeaders() { return headers; } @@ -119,6 +125,52 @@ int getIdentityMapCapacity() this.config = null; this.headers = null; this.registry = registry; + this.mapper = new ObjectMapper(); + } + + protected Map createRegistryHeader() + { + HashMap registryHeader = new HashMap<>(); + if (headers != null) { + for (String key : headers.keySet()) { + if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) { + registryHeader.put(key, headers.get(key).toString()); + } + } + Map dynamicConfig = extraConfigFromProvider(headers.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); + for (String key : dynamicConfig.keySet()) { + registryHeader.put(key, dynamicConfig.get(key)); + } + } + return registryHeader; + } + + protected Map createRegistryConfig() + { + HashMap registryConfig = new HashMap<>(); + if (config != null) { + for (String key : config.keySet()) { + if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) { + registryConfig.put(key, config.get(key)); + } + } + if (config.containsKey(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)) { + Map dynamicConfig = extraConfigFromProvider(config.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); + for (String key : dynamicConfig.keySet()) { + registryConfig.put(key, dynamicConfig.get(key)); + } + } + } + return registryConfig; + } + + private Map extraConfigFromProvider(Object dynamicConfigProviderJson) + { + if (dynamicConfigProviderJson != null) { + DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + return dynamicConfigProvider.getConfig(); + } + return Collections.emptyMap(); } @Override diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index 0d77b11ec4a2..05ae7057bc40 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.protobuf; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.protobuf.DynamicMessage; @@ -40,6 +41,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.Map; public class SchemaRegistryBasedProtobufBytesDecoderTest { @@ -158,6 +160,44 @@ public void testConfig() throws Exception Assert.assertNotEquals(decoder.hashCode(), 0); } + @Test + public void testParseHeader() throws JsonProcessingException + { + String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + Map heaeder = decoder.createRegistryHeader(); + + // Then + Assert.assertEquals(3, heaeder.size()); + Assert.assertEquals("value.1", heaeder.get("registry.header.prop.1")); + Assert.assertEquals("value.2", heaeder.get("registry.header.prop.2")); + Assert.assertEquals("value.3", heaeder.get("registry.header.prop.3")); + } + + @Test + public void testParseConfig() throws JsonProcessingException + { + String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + Map heaeder = decoder.createRegistryConfig(); + + // Then + Assert.assertEquals(3, heaeder.size()); + Assert.assertEquals("value.1", heaeder.get("registry.config.prop.1")); + Assert.assertEquals("value.2", heaeder.get("registry.config.prop.2")); + Assert.assertEquals("value.3", heaeder.get("registry.config.prop.3")); + } + private ProtobufSchema parseProtobufSchema() throws IOException { // Given From 9acc33a78db86210ebba008ca43c5898dba2a06b Mon Sep 17 00:00:00 2001 From: yuanyi Date: Fri, 18 Jun 2021 15:13:30 +0800 Subject: [PATCH 02/10] bug fixed --- .../avro/SchemaRegistryBasedAvroBytesDecoder.java | 10 +++++----- .../SchemaRegistryBasedProtobufBytesDecoder.java | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 289bd5e9b998..c2dfaf34c56b 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -130,15 +130,15 @@ protected Map createRegistryConfig() { HashMap registryConfig = new HashMap<>(); if (config != null) { - for (String key : config.keySet()) { - if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) { - registryConfig.put(key, config.get(key)); + for (Map.Entry entry : config.entrySet()) { + if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) { + registryConfig.put(entry.getKey(), entry.getValue()); } } if (config.containsKey(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)) { Map dynamicConfig = extraConfigFromProvider(config.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); - for (String key : dynamicConfig.keySet()) { - registryConfig.put(key, dynamicConfig.get(key)); + for (Map.Entry entry : dynamicConfig.entrySet()) { + registryConfig.put(entry.getKey(), entry.getValue()); } } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index 1150680657f8..a3d32ffc85f1 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -149,15 +149,15 @@ protected Map createRegistryConfig() { HashMap registryConfig = new HashMap<>(); if (config != null) { - for (String key : config.keySet()) { - if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) { - registryConfig.put(key, config.get(key)); + for (Map.Entry entry : config.entrySet()) { + if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) { + registryConfig.put(entry.getKey(), entry.getValue()); } } if (config.containsKey(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)) { Map dynamicConfig = extraConfigFromProvider(config.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); - for (String key : dynamicConfig.keySet()) { - registryConfig.put(key, dynamicConfig.get(key)); + for (Map.Entry entry : dynamicConfig.entrySet()) { + registryConfig.put(entry.getKey(), entry.getValue()); } } } From eaf1cf676849423693f48f4814f4ded81809b307 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 22 Jun 2021 16:27:54 +0800 Subject: [PATCH 03/10] add document --- docs/development/extensions-core/avro.md | 2 +- docs/ingestion/data-formats.md | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/development/extensions-core/avro.md b/docs/development/extensions-core/avro.md index 75704cfcc0c9..1727501bbc46 100644 --- a/docs/development/extensions-core/avro.md +++ b/docs/development/extensions-core/avro.md @@ -26,7 +26,7 @@ title: "Apache Avro" This Apache Druid extension enables Druid to ingest and understand the Apache Avro data format. This extension provides two Avro Parsers for stream ingestion and Hadoop batch ingestion. -See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/data-formats.md#avro-stream-parser) +See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/c.md#avro-stream-parser) for more details about how to use these in an ingestion spec. Additionally, it provides an InputFormat for reading Avro OCF files when using diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index d431fead16e1..ef1c9d02ef9e 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -380,8 +380,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry | no | -| headers | Json | To send headers to the Schema Registry | no | +| config | Json | To send additional configurations, configured for Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "", ...}` in json. | no | +| headers | Json | To send headers to the Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "", ...}` in json. | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. @@ -1223,8 +1223,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry | no | -| headers | Json | To send headers to the Schema Registry | no | +| config | Json | To send additional configurations, configured for Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "", ...}` in json. | no | +| headers | Json | To send headers to the Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "", ...}` in json. | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. From 42c1f168c50bf0f846bd60b03d6a601bbb0f0e0b Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 22 Jun 2021 20:00:52 +0800 Subject: [PATCH 04/10] fix document --- docs/development/extensions-core/avro.md | 2 +- docs/ingestion/data-formats.md | 25 +++++++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/docs/development/extensions-core/avro.md b/docs/development/extensions-core/avro.md index 1727501bbc46..75704cfcc0c9 100644 --- a/docs/development/extensions-core/avro.md +++ b/docs/development/extensions-core/avro.md @@ -26,7 +26,7 @@ title: "Apache Avro" This Apache Druid extension enables Druid to ingest and understand the Apache Avro data format. This extension provides two Avro Parsers for stream ingestion and Hadoop batch ingestion. -See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/c.md#avro-stream-parser) +See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/data-formats.md#avro-stream-parser) for more details about how to use these in an ingestion spec. Additionally, it provides an InputFormat for reading Avro OCF files when using diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index ef1c9d02ef9e..16384ec1910a 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -380,8 +380,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "", ...}` in json. | no | -| headers | Json | To send headers to the Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "", ...}` in json. | no | +| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md) | no | +| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md) | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. @@ -408,12 +408,20 @@ Multiple Instances: "schema.registry.ssl.truststore.password": "", "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", "schema.registry.ssl.keystore.password": "", - "schema.registry.ssl.key.password": "" + "schema.registry.ssl.key.password": "", + "schema.registry.ssl.key.password", ... }, "headers": { "traceID" : "b29c5de2-0db4-490b-b421", "timeStamp" : "1577191871865", + "druid.dynamic.config.provider":{ + "type":"mapString", + "config":{ + "registry.header.prop.1":"value.1", + "registry.header.prop.2":"value.2" + } + } ... } } @@ -1223,8 +1231,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "", ...}` in json. | no | -| headers | Json | To send headers to the Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "", ...}` in json. | no | +| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md). | no | +| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md) | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. @@ -1259,6 +1267,13 @@ Multiple Instances: "headers": { "traceID" : "b29c5de2-0db4-490b-b421", "timeStamp" : "1577191871865", + "druid.dynamic.config.provider":{ + "type":"mapString", + "config":{ + "registry.header.prop.1":"value.1", + "registry.header.prop.2":"value.2" + } + } ... } } From ba11f172df0e9ef28d745fcb7914774f7fd0af30 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 22 Jun 2021 20:33:04 +0800 Subject: [PATCH 05/10] fix spot bug --- .../avro/SchemaRegistryBasedAvroBytesDecoder.java | 10 +++++----- .../SchemaRegistryBasedProtobufBytesDecoder.java | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index c2dfaf34c56b..f5c311d75b26 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -113,14 +113,14 @@ protected Map createRegistryHeader() { HashMap registryHeader = new HashMap<>(); if (headers != null) { - for (String key : headers.keySet()) { - if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) { - registryHeader.put(key, headers.get(key).toString()); + for (Map.Entry entry : headers.entrySet()) { + if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) { + registryHeader.put(entry.getKey(), entry.getValue().toString()); } } Map dynamicConfig = extraConfigFromProvider(headers.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); - for (String key : dynamicConfig.keySet()) { - registryHeader.put(key, dynamicConfig.get(key)); + for (Map.Entry entry : dynamicConfig.entrySet()) { + registryHeader.put(entry.getKey(), entry.getValue()); } } return registryHeader; diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index a3d32ffc85f1..212609d2c3ba 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -132,14 +132,14 @@ protected Map createRegistryHeader() { HashMap registryHeader = new HashMap<>(); if (headers != null) { - for (String key : headers.keySet()) { - if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) { - registryHeader.put(key, headers.get(key).toString()); + for (Map.Entry entry : headers.entrySet()) { + if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) { + registryHeader.put(entry.getKey(), entry.getValue().toString()); } } Map dynamicConfig = extraConfigFromProvider(headers.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); - for (String key : dynamicConfig.keySet()) { - registryHeader.put(key, dynamicConfig.get(key)); + for (Map.Entry entry : dynamicConfig.entrySet()) { + registryHeader.put(entry.getKey(), entry.getValue()); } } return registryHeader; From 7adb0fa289d8860d1ff5b93a72e7d52f9c7f3a7f Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 23 Jun 2021 13:16:16 +0800 Subject: [PATCH 06/10] fix document --- docs/ingestion/data-formats.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 16384ec1910a..2cdf056f89b8 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -380,8 +380,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md) | no | -| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md) | no | +| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | +| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. @@ -1231,8 +1231,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md). | no | -| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md) | no | +| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md). | no | +| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. From bf178728352d0244525155da8c6bb4001e0f8183 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 23 Jun 2021 14:00:50 +0800 Subject: [PATCH 07/10] inject ObjectMapper --- .../SchemaRegistryBasedAvroBytesDecoder.java | 13 +++++--- .../data/input/AvroStreamInputFormatTest.java | 6 +++- ...hemaRegistryBasedAvroBytesDecoderTest.java | 27 +++++++++++++--- ...hemaRegistryBasedProtobufBytesDecoder.java | 13 +++++--- .../protobuf/ProtobufInputFormatTest.java | 6 +++- ...RegistryBasedProtobufBytesDecoderTest.java | 31 ++++++++++++++----- 6 files changed, 72 insertions(+), 24 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index f5c311d75b26..a36b2d6bfd9e 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.avro; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -33,6 +34,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; +import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.metadata.DynamicConfigProvider; @@ -54,7 +56,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder private final List urls; private final Map config; private final Map headers; - private final ObjectMapper mapper; + private final ObjectMapper jsonMapper; public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonCreator @@ -63,7 +65,8 @@ public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("capacity") Integer capacity, @JsonProperty("urls") @Nullable List urls, @JsonProperty("config") @Nullable Map config, - @JsonProperty("headers") @Nullable Map headers + @JsonProperty("headers") @Nullable Map headers, + @JacksonInject @Json ObjectMapper jsonMapper ) { this.url = url; @@ -71,7 +74,7 @@ public SchemaRegistryBasedAvroBytesDecoder( this.urls = urls; this.config = config; this.headers = headers; - this.mapper = new ObjectMapper(); + this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, createRegistryConfig(), createRegistryHeader()); } else { @@ -148,7 +151,7 @@ protected Map createRegistryConfig() private Map extraConfigFromProvider(Object dynamicConfigProviderJson) { if (dynamicConfigProviderJson != null) { - DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + DynamicConfigProvider dynamicConfigProvider = jsonMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); return dynamicConfigProvider.getConfig(); } return Collections.emptyMap(); @@ -164,7 +167,7 @@ private Map extraConfigFromProvider(Object dynamicConfigProvider this.config = null; this.headers = null; this.registry = registry; - this.mapper = new ObjectMapper(); + this.jsonMapper = new ObjectMapper(); } @Override diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java index 918aa0abb513..d5ec4ea10870 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; @@ -109,6 +110,9 @@ public void before() for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) { jsonMapper.registerModule(jacksonModule); } + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); } @Test @@ -134,7 +138,7 @@ public void testSerdeForSchemaRegistry() throws IOException { AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( flattenSpec, - new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null), + new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null), false, false ); diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 34ebb2b65f6a..5d457de74552 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.avro; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; @@ -31,6 +32,7 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; import org.junit.Assert; @@ -58,7 +60,10 @@ public void setUp() public void testMultipleUrls() throws Exception { String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedAvroBytesDecoder decoder; decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper .readerFor(AvroBytesDecoder.class) @@ -72,7 +77,10 @@ public void testMultipleUrls() throws Exception public void testUrl() throws Exception { String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedAvroBytesDecoder decoder; decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper .readerFor(AvroBytesDecoder.class) @@ -86,7 +94,10 @@ public void testUrl() throws Exception public void testConfig() throws Exception { String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedAvroBytesDecoder decoder; decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper .readerFor(AvroBytesDecoder.class) @@ -170,7 +181,10 @@ private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws I public void testParseHeader() throws JsonProcessingException { String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedAvroBytesDecoder decoder; decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper .readerFor(AvroBytesDecoder.class) @@ -189,7 +203,10 @@ public void testParseHeader() throws JsonProcessingException public void testParseConfig() throws JsonProcessingException { String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedAvroBytesDecoder decoder; decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper .readerFor(AvroBytesDecoder.class) diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index 212609d2c3ba..7301c7a54d0a 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.protobuf; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -30,6 +31,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; +import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.metadata.DynamicConfigProvider; @@ -55,7 +57,7 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec private final List urls; private final Map config; private final Map headers; - private final ObjectMapper mapper; + private final ObjectMapper jsonMapper; public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonCreator @@ -64,7 +66,8 @@ public SchemaRegistryBasedProtobufBytesDecoder( @JsonProperty("capacity") Integer capacity, @JsonProperty("urls") @Nullable List urls, @JsonProperty("config") @Nullable Map config, - @JsonProperty("headers") @Nullable Map headers + @JsonProperty("headers") @Nullable Map headers, + @JacksonInject @Json ObjectMapper jsonMapper ) { this.url = url; @@ -72,7 +75,7 @@ public SchemaRegistryBasedProtobufBytesDecoder( this.urls = urls; this.config = config; this.headers = headers; - this.mapper = new ObjectMapper(); + this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), createRegistryConfig(), createRegistryHeader()); } else { @@ -125,7 +128,7 @@ int getIdentityMapCapacity() this.config = null; this.headers = null; this.registry = registry; - this.mapper = new ObjectMapper(); + this.jsonMapper = new ObjectMapper(); } protected Map createRegistryHeader() @@ -167,7 +170,7 @@ protected Map createRegistryConfig() private Map extraConfigFromProvider(Object dynamicConfigProviderJson) { if (dynamicConfigProviderJson != null) { - DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + DynamicConfigProvider dynamicConfigProvider = jsonMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); return dynamicConfigProvider.getConfig(); } return Collections.emptyMap(); diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index e9e15ffe2607..0a3e8c7f7d54 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.protobuf; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; @@ -77,6 +78,9 @@ public void setUp() for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) { jsonMapper.registerModule(jacksonModule); } + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); } @Test @@ -99,7 +103,7 @@ public void testSerdeForSchemaRegistry() throws IOException { ProtobufInputFormat inputFormat = new ProtobufInputFormat( flattenSpec, - new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null) + new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null, null) ); NestedInputFormat inputFormat2 = jsonMapper.readValue( jsonMapper.writeValueAsString(inputFormat), diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index 05ae7057bc40..9e3122b7848f 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.protobuf; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.protobuf.DynamicMessage; @@ -27,6 +28,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import org.apache.commons.io.IOUtils; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.parsers.ParseException; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; @@ -95,7 +97,7 @@ public void testParseWrongId() throws Exception public void testDefaultCapacity() { // Given - SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null); + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null, null); // When Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), Integer.MAX_VALUE); } @@ -105,7 +107,7 @@ public void testGivenCapacity() { int capacity = 100; // Given - SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null); + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null, null); // When Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), capacity); } @@ -122,7 +124,10 @@ private ProtoTestEventWrapper.ProtoTestEvent getTestEvent() public void testMultipleUrls() throws Exception { String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedProtobufBytesDecoder decoder; decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper .readerFor(ProtobufBytesDecoder.class) @@ -136,7 +141,10 @@ public void testMultipleUrls() throws Exception public void testUrl() throws Exception { String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedProtobufBytesDecoder decoder; decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper .readerFor(ProtobufBytesDecoder.class) @@ -150,7 +158,10 @@ public void testUrl() throws Exception public void testConfig() throws Exception { String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedProtobufBytesDecoder decoder; decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper .readerFor(ProtobufBytesDecoder.class) @@ -164,7 +175,10 @@ public void testConfig() throws Exception public void testParseHeader() throws JsonProcessingException { String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedProtobufBytesDecoder decoder; decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper .readerFor(ProtobufBytesDecoder.class) @@ -183,7 +197,10 @@ public void testParseHeader() throws JsonProcessingException public void testParseConfig() throws JsonProcessingException { String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedProtobufBytesDecoder decoder; decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper .readerFor(ProtobufBytesDecoder.class) From 254f8e7566b159aefa462d4c40268248fb3d94d2 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 23 Jun 2021 15:10:34 +0800 Subject: [PATCH 08/10] add DynamicConfigProviderUtils --- .../utils/DynamicConfigProviderUtils.java | 73 +++++++++++++++++++ .../SchemaRegistryBasedAvroBytesDecoder.java | 5 +- ...hemaRegistryBasedAvroBytesDecoderTest.java | 13 ++-- ...hemaRegistryBasedProtobufBytesDecoder.java | 52 +------------ ...RegistryBasedProtobufBytesDecoderTest.java | 5 +- 5 files changed, 89 insertions(+), 59 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java diff --git a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java new file mode 100644 index 000000000000..4c45262aba0d --- /dev/null +++ b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.metadata.DynamicConfigProvider; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class DynamicConfigProviderUtils +{ + public static Map extraConfigAndSetStringMap(Map config, String dynamicConfigProviderKey, ObjectMapper mapper) + { + HashMap newConfig = new HashMap<>(); + if (config != null) { + for (Map.Entry entry : config.entrySet()) { + if (!dynamicConfigProviderKey.equals(entry.getKey())) { + newConfig.put(entry.getKey(), entry.getValue().toString()); + } + } + Map dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper); + for (Map.Entry entry : dynamicConfig.entrySet()) { + newConfig.put(entry.getKey(), entry.getValue()); + } + } + return newConfig; + } + + public static Map extraConfigAndSetObjectMap(Map config, String dynamicConfigProviderKey, ObjectMapper mapper) + { + HashMap newConfig = new HashMap<>(); + if (config != null) { + for (Map.Entry entry : config.entrySet()) { + if (!dynamicConfigProviderKey.equals(entry.getKey())) { + newConfig.put(entry.getKey(), entry.getValue()); + } + } + Map dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper); + for (Map.Entry entry : dynamicConfig.entrySet()) { + newConfig.put(entry.getKey(), entry.getValue()); + } + } + return newConfig; + } + + private static Map extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper) + { + if (dynamicConfigProviderJson != null) { + DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + return dynamicConfigProvider.getConfig(); + } + return Collections.emptyMap(); + } +} diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index a36b2d6bfd9e..9b4ca6254281 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.utils.DynamicConfigProviderUtils; import javax.annotation.Nullable; import java.io.IOException; @@ -76,9 +77,9 @@ public SchemaRegistryBasedAvroBytesDecoder( this.headers = headers; this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, createRegistryConfig(), createRegistryHeader()); + this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper)); } else { - this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, createRegistryConfig(), createRegistryHeader()); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper)); } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 5d457de74552..90e0baf89af6 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -35,6 +35,7 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.DynamicConfigProviderUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -190,7 +191,7 @@ public void testParseHeader() throws JsonProcessingException .readerFor(AvroBytesDecoder.class) .readValue(json); - Map heaeder = decoder.createRegistryHeader(); + Map heaeder = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); // Then Assert.assertEquals(3, heaeder.size()); @@ -212,12 +213,12 @@ public void testParseConfig() throws JsonProcessingException .readerFor(AvroBytesDecoder.class) .readValue(json); - Map heaeder = decoder.createRegistryConfig(); + Map config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); // Then - Assert.assertEquals(3, heaeder.size()); - Assert.assertEquals("value.1", heaeder.get("registry.config.prop.1")); - Assert.assertEquals("value.2", heaeder.get("registry.config.prop.2")); - Assert.assertEquals("value.3", heaeder.get("registry.config.prop.3")); + Assert.assertEquals(3, config.size()); + Assert.assertEquals("value.1", config.get("registry.config.prop.1")); + Assert.assertEquals("value.2", config.get("registry.config.prop.2")); + Assert.assertEquals("value.3", config.get("registry.config.prop.3")); } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index 7301c7a54d0a..0927be720687 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -34,14 +34,13 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.utils.DynamicConfigProviderUtils; import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -77,9 +76,9 @@ public SchemaRegistryBasedProtobufBytesDecoder( this.headers = headers; this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), createRegistryConfig(), createRegistryHeader()); + this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper)); } else { - this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), createRegistryConfig(), createRegistryHeader()); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper)); } } @@ -131,51 +130,6 @@ int getIdentityMapCapacity() this.jsonMapper = new ObjectMapper(); } - protected Map createRegistryHeader() - { - HashMap registryHeader = new HashMap<>(); - if (headers != null) { - for (Map.Entry entry : headers.entrySet()) { - if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) { - registryHeader.put(entry.getKey(), entry.getValue().toString()); - } - } - Map dynamicConfig = extraConfigFromProvider(headers.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); - for (Map.Entry entry : dynamicConfig.entrySet()) { - registryHeader.put(entry.getKey(), entry.getValue()); - } - } - return registryHeader; - } - - protected Map createRegistryConfig() - { - HashMap registryConfig = new HashMap<>(); - if (config != null) { - for (Map.Entry entry : config.entrySet()) { - if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) { - registryConfig.put(entry.getKey(), entry.getValue()); - } - } - if (config.containsKey(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)) { - Map dynamicConfig = extraConfigFromProvider(config.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); - for (Map.Entry entry : dynamicConfig.entrySet()) { - registryConfig.put(entry.getKey(), entry.getValue()); - } - } - } - return registryConfig; - } - - private Map extraConfigFromProvider(Object dynamicConfigProviderJson) - { - if (dynamicConfigProviderJson != null) { - DynamicConfigProvider dynamicConfigProvider = jsonMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); - return dynamicConfigProvider.getConfig(); - } - return Collections.emptyMap(); - } - @Override public DynamicMessage parse(ByteBuffer bytes) { diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index 9e3122b7848f..dc36e3a9eb9e 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.DynamicConfigProviderUtils; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; import org.junit.Assert; @@ -184,7 +185,7 @@ public void testParseHeader() throws JsonProcessingException .readerFor(ProtobufBytesDecoder.class) .readValue(json); - Map heaeder = decoder.createRegistryHeader(); + Map heaeder = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); // Then Assert.assertEquals(3, heaeder.size()); @@ -206,7 +207,7 @@ public void testParseConfig() throws JsonProcessingException .readerFor(ProtobufBytesDecoder.class) .readValue(json); - Map heaeder = decoder.createRegistryConfig(); + Map heaeder = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(decoder.getConfig(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); // Then Assert.assertEquals(3, heaeder.size()); From 607740b67607445db9802f2c347fd7179d5777ac Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 26 Jun 2021 13:20:00 +0800 Subject: [PATCH 09/10] add UT --- .../utils/DynamicConfigProviderUtilsTest.java | 84 +++++++++++++++++++ .../SchemaRegistryBasedAvroBytesDecoder.java | 48 ----------- ...hemaRegistryBasedAvroBytesDecoderTest.java | 10 +-- ...RegistryBasedProtobufBytesDecoderTest.java | 10 +-- 4 files changed, 94 insertions(+), 58 deletions(-) create mode 100644 core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java diff --git a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java new file mode 100644 index 000000000000..496acfabef3b --- /dev/null +++ b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; + +import java.util.Map; + +@RunWith(Enclosed.class) +public class DynamicConfigProviderUtilsTest +{ + public static class ThrowIfURLHasNotAllowedPropertiesTest + { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final String DYNAMIC_CONFIG_PROVIDER = "druid.dynamic.config.provider"; + + @Test + public void testExtraConfigAndSetStringMap() + { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("prop2", "value2") + ); + + Map properties = ImmutableMap.of( + "prop1", "value1", + "prop2", "value3", + DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + Map res = DynamicConfigProviderUtils.extraConfigAndSetStringMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER); + + Assert.assertEquals(2, res.size()); + Assert.assertEquals("value1", res.get("prop1")); + Assert.assertEquals("value2", res.get("prop2")); + } + + @Test + public void testExtraConfigAndSetObjectMap() + { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("prop2", "value2") + ); + + Map properties = ImmutableMap.of( + "prop1", "value1", + "prop2", "value3", + DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + Map res = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER); + + Assert.assertEquals(2, res.size()); + Assert.assertEquals("value1", res.get("prop1").toString()); + Assert.assertEquals("value2", res.get("prop2").toString()); + } + } +} diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 9b4ca6254281..48d3e9ff737f 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -37,14 +37,11 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.utils.DynamicConfigProviderUtils; import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -113,51 +110,6 @@ public Map getHeaders() return headers; } - protected Map createRegistryHeader() - { - HashMap registryHeader = new HashMap<>(); - if (headers != null) { - for (Map.Entry entry : headers.entrySet()) { - if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) { - registryHeader.put(entry.getKey(), entry.getValue().toString()); - } - } - Map dynamicConfig = extraConfigFromProvider(headers.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); - for (Map.Entry entry : dynamicConfig.entrySet()) { - registryHeader.put(entry.getKey(), entry.getValue()); - } - } - return registryHeader; - } - - protected Map createRegistryConfig() - { - HashMap registryConfig = new HashMap<>(); - if (config != null) { - for (Map.Entry entry : config.entrySet()) { - if (!DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) { - registryConfig.put(entry.getKey(), entry.getValue()); - } - } - if (config.containsKey(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)) { - Map dynamicConfig = extraConfigFromProvider(config.get(DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)); - for (Map.Entry entry : dynamicConfig.entrySet()) { - registryConfig.put(entry.getKey(), entry.getValue()); - } - } - } - return registryConfig; - } - - private Map extraConfigFromProvider(Object dynamicConfigProviderJson) - { - if (dynamicConfigProviderJson != null) { - DynamicConfigProvider dynamicConfigProvider = jsonMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); - return dynamicConfigProvider.getConfig(); - } - return Collections.emptyMap(); - } - //For UT only @VisibleForTesting SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry) diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 90e0baf89af6..ec073c92750b 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -191,13 +191,13 @@ public void testParseHeader() throws JsonProcessingException .readerFor(AvroBytesDecoder.class) .readValue(json); - Map heaeder = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + Map header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); // Then - Assert.assertEquals(3, heaeder.size()); - Assert.assertEquals("value.1", heaeder.get("registry.header.prop.1")); - Assert.assertEquals("value.2", heaeder.get("registry.header.prop.2")); - Assert.assertEquals("value.3", heaeder.get("registry.header.prop.3")); + Assert.assertEquals(3, header.size()); + Assert.assertEquals("value.1", header.get("registry.header.prop.1")); + Assert.assertEquals("value.2", header.get("registry.header.prop.2")); + Assert.assertEquals("value.3", header.get("registry.header.prop.3")); } @Test diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index dc36e3a9eb9e..009b5a60b076 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -185,13 +185,13 @@ public void testParseHeader() throws JsonProcessingException .readerFor(ProtobufBytesDecoder.class) .readValue(json); - Map heaeder = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + Map header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); // Then - Assert.assertEquals(3, heaeder.size()); - Assert.assertEquals("value.1", heaeder.get("registry.header.prop.1")); - Assert.assertEquals("value.2", heaeder.get("registry.header.prop.2")); - Assert.assertEquals("value.3", heaeder.get("registry.header.prop.3")); + Assert.assertEquals(3, header.size()); + Assert.assertEquals("value.1", header.get("registry.header.prop.1")); + Assert.assertEquals("value.2", header.get("registry.header.prop.2")); + Assert.assertEquals("value.3", header.get("registry.header.prop.3")); } @Test From 9a6d0bd898f48c2d4b0065d9ff15749a87a481b4 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 26 Jun 2021 13:24:14 +0800 Subject: [PATCH 10/10] bug fixed --- .../data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java | 4 ++-- .../protobuf/SchemaRegistryBasedProtobufBytesDecoder.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 48d3e9ff737f..59cb33e35f45 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -74,9 +74,9 @@ public SchemaRegistryBasedAvroBytesDecoder( this.headers = headers; this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper)); + this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); } else { - this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper)); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index 0927be720687..17bb85a59ed9 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -76,9 +76,9 @@ public SchemaRegistryBasedProtobufBytesDecoder( this.headers = headers; this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper)); + this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); } else { - this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper)); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); } }