From 1ac68b5d043369e6a8752d709742bedbc8a4e011 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Fri, 18 Jun 2021 14:41:51 +0800 Subject: [PATCH 1/4] add_DynamicConfigProvider_for_kafka_emitter --- .../druid/emitter/kafka/KafkaEmitter.java | 8 +++++++- .../emitter/kafka/KafkaEmitterConfig.java | 18 +++++++++++++++++- .../emitter/kafka/KafkaEmitterConfigTest.java | 10 +++++++--- .../druid/emitter/kafka/KafkaEmitterTest.java | 2 +- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 09b8031e093d..2ade0aa7e30e 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -105,8 +105,14 @@ protected Producer setKafkaProducer() props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); props.putAll(config.getKafkaProducerConfig()); + if (config.getKafkaProducerConfigProvider() != null) { + Map dynamicConfig = config.getKafkaProducerConfigProvider().getConfig(); + for (Map.Entry e : dynamicConfig.entrySet()) { + props.setProperty(e.getKey(), e.getValue()); + } + } - return new KafkaProducer<>(props); + return new KafkaProducer<>(props).; } finally { Thread.currentThread().setContextClassLoader(currCtxCl); diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index ed7b9ea0e9d1..821dedca1774 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.kafka.clients.producer.ProducerConfig; import javax.annotation.Nullable; @@ -43,6 +44,8 @@ public class KafkaEmitterConfig private final String clusterName; @JsonProperty("producer.config") private Map kafkaProducerConfig; + @JsonProperty("producer.dynamic.config") + private DynamicConfigProvider kafkaProducerConfigProvider; @JsonCreator public KafkaEmitterConfig( @@ -51,7 +54,8 @@ public KafkaEmitterConfig( @JsonProperty("alert.topic") String alertTopic, @Nullable @JsonProperty("request.topic") String requestTopic, @JsonProperty("clusterName") String clusterName, - @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig + @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig, + @JsonProperty("producer.dynamic.config") @Nullable DynamicConfigProvider kafkaProducerConfigProvider ) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); @@ -60,6 +64,7 @@ public KafkaEmitterConfig( this.requestTopic = requestTopic; this.clusterName = clusterName; this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig; + this.kafkaProducerConfigProvider = kafkaProducerConfigProvider; } @JsonProperty @@ -98,6 +103,12 @@ public Map getKafkaProducerConfig() return kafkaProducerConfig; } + @JsonProperty + public DynamicConfigProvider getKafkaProducerConfigProvider() + { + return kafkaProducerConfigProvider; + } + @Override public boolean equals(Object o) { @@ -127,6 +138,10 @@ public boolean equals(Object o) if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } + + if (getKafkaProducerConfigProvider() != null ? !getKafkaProducerConfigProvider().getConfig().equals(that.getKafkaProducerConfigProvider().getConfig()) : that.getKafkaProducerConfigProvider() != null) { + return false; + } return getKafkaProducerConfig().equals(that.getKafkaProducerConfig()); } @@ -139,6 +154,7 @@ public int hashCode() result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); + result = 31 * result + getKafkaProducerConfigProvider().hashCode(); return result; } diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 55ecdbaeb8a9..0f2e19f81931 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -45,7 +46,9 @@ public void testSerDeserKafkaEmitterConfig() throws IOException KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", "requestTest", "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + .put("testKey", "testValue").build(), new MapStringDynamicConfigProvider( + ImmutableMap.of("testKey2", "testValue2") + ) ); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) @@ -59,7 +62,8 @@ public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", null, "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + .put("testKey", "testValue").build(), new MapStringDynamicConfigProvider( + ImmutableMap.of("testKey2", "testValue2")) ); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) @@ -72,7 +76,7 @@ public void testSerDeNotRequiredKafkaProducerConfig() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", "alertTest", null, - "clusterNameTest", null + "clusterNameTest", null, null ); try { @SuppressWarnings("unused") diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index 26d9701cf702..357cc55608a6 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -82,7 +82,7 @@ public void testKafkaEmitter() throws InterruptedException requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents); final KafkaProducer producer = EasyMock.createStrictMock(KafkaProducer.class); final KafkaEmitter kafkaEmitter = new KafkaEmitter( - new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null), + new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null, null), new ObjectMapper() ) { From f0c6806abfbae26e38d9004ed9573b7e0b24fa92 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Fri, 18 Jun 2021 14:51:04 +0800 Subject: [PATCH 2/4] bug fixed --- .../main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 2ade0aa7e30e..2e73bddd0811 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -112,7 +112,7 @@ protected Producer setKafkaProducer() } } - return new KafkaProducer<>(props).; + return new KafkaProducer<>(props); } finally { Thread.currentThread().setContextClassLoader(currCtxCl); From 6d92e26a793882171555be17778a1770638d9236 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 19 Jun 2021 14:09:20 +0800 Subject: [PATCH 3/4] fix test --- .../emitter/kafka/KafkaEmitterConfigTest.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 0f2e19f81931..44a3a4f1d588 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -56,6 +56,20 @@ public void testSerDeserKafkaEmitterConfig() throws IOException Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } + @Test + public void testSerDeserKafkaEmitterConfigNullDynamicConfigProvider() throws IOException + { + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", + "alertTest", "requestTest", + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").build(), null + ); + String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); + Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); + } + @Test public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException { @@ -76,7 +90,8 @@ public void testSerDeNotRequiredKafkaProducerConfig() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", "alertTest", null, - "clusterNameTest", null, null + "clusterNameTest", null, new MapStringDynamicConfigProvider( + ImmutableMap.of("testKey2", "testValue2")) ); try { @SuppressWarnings("unused") From 88bdad98f06810ccfea9e5eabdbb953fbc6d2b74 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 4 Aug 2021 19:51:48 +0800 Subject: [PATCH 4/4] add extraDynamicConfigAndSetProperty --- .../utils/DynamicConfigProviderUtils.java | 25 ++++++++++- .../utils/DynamicConfigProviderUtilsTest.java | 38 +++++++++++++++++ .../druid/emitter/kafka/KafkaEmitter.java | 14 ++----- .../emitter/kafka/KafkaEmitterConfig.java | 41 +++++++------------ .../emitter/kafka/KafkaEmitterConfigTest.java | 31 +++++++------- .../druid/emitter/kafka/KafkaEmitterTest.java | 2 +- .../indexing/kafka/KafkaRecordSupplier.java | 11 +---- 7 files changed, 100 insertions(+), 62 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java index 4c45262aba0d..9c8ae6730b5f 100644 --- a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java +++ b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Properties; public class DynamicConfigProviderUtils { @@ -62,7 +63,29 @@ public static Map extraConfigAndSetObjectMap(Map return newConfig; } - private static Map extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper) + public static void extraConfigAndSetProperty(Map config, String dynamicConfigProviderKey, ObjectMapper mapper, Properties properties) + { + if (config != null) { + for (Map.Entry entry : config.entrySet()) { + if (!dynamicConfigProviderKey.equals(entry.getKey())) { + properties.setProperty(entry.getKey(), String.valueOf(entry.getValue())); + } + } + extraDynamicConfigAndSetProperty(config, dynamicConfigProviderKey, mapper, properties); + } + } + + public static void extraDynamicConfigAndSetProperty(Map config, String dynamicConfigProviderKey, ObjectMapper mapper, Properties properties) + { + if (config != null) { + Map dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper); + for (Map.Entry entry : dynamicConfig.entrySet()) { + properties.setProperty(entry.getKey(), entry.getValue()); + } + } + } + + public static Map extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper) { if (dynamicConfigProviderJson != null) { DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); diff --git a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java index 496acfabef3b..4e4c4ccd7cde 100644 --- a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java +++ b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java @@ -31,6 +31,7 @@ import org.junit.runner.RunWith; import java.util.Map; +import java.util.Properties; @RunWith(Enclosed.class) public class DynamicConfigProviderUtilsTest @@ -80,5 +81,42 @@ public void testExtraConfigAndSetObjectMap() Assert.assertEquals("value1", res.get("prop1").toString()); Assert.assertEquals("value2", res.get("prop2").toString()); } + + @Test + public void testExtraConfigAndSetProperty() + { + Properties props = new Properties(); + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("prop2", "value2") + ); + + Map properties = ImmutableMap.of( + "prop1", "value1", + "prop2", "value3", + DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + DynamicConfigProviderUtils.extraConfigAndSetProperty(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER, props); + + Assert.assertEquals(2, props.size()); + Assert.assertEquals("value1", props.get("prop1").toString()); + Assert.assertEquals("value2", props.get("prop2").toString()); + } + + @Test + public void testExtraDynamicConfigAndSetProperty() + { + Properties props = new Properties(); + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("prop1", "value1") + ); + + Map properties = ImmutableMap.of( + DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + DynamicConfigProviderUtils.extraDynamicConfigAndSetProperty(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER, props); + + Assert.assertEquals(1, props.size()); + Assert.assertEquals("value1", props.get("prop1").toString()); + } } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 2e73bddd0811..e0ca05c27018 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.log.RequestLogEvent; +import org.apache.druid.utils.DynamicConfigProviderUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -70,8 +71,8 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) this.jsonMapper = jsonMapper; this.producer = setKafkaProducer(); // same with kafka producer's buffer.memory - long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig() - .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")); + long queueMemoryBound = Long.parseLong(String.valueOf(this.config.getKafkaProducerConfig() + .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"))); this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); @@ -104,14 +105,7 @@ protected Producer setKafkaProducer() props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); - props.putAll(config.getKafkaProducerConfig()); - if (config.getKafkaProducerConfigProvider() != null) { - Map dynamicConfig = config.getKafkaProducerConfigProvider().getConfig(); - for (Map.Entry e : dynamicConfig.entrySet()) { - props.setProperty(e.getKey(), e.getValue()); - } - } - + DynamicConfigProviderUtils.extraConfigAndSetProperty(config.getKafkaProducerConfig(), KafkaEmitterConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper, props); return new KafkaProducer<>(props); } finally { diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index 821dedca1774..ab2d5a6030a6 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -23,29 +23,28 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.kafka.clients.producer.ProducerConfig; import javax.annotation.Nullable; + import java.util.Map; public class KafkaEmitterConfig { - + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) private final String bootstrapServers; @JsonProperty("metric.topic") private final String metricTopic; @JsonProperty("alert.topic") private final String alertTopic; - @Nullable @JsonProperty("request.topic") + @Nullable + @JsonProperty("request.topic") private final String requestTopic; @JsonProperty private final String clusterName; @JsonProperty("producer.config") - private Map kafkaProducerConfig; - @JsonProperty("producer.dynamic.config") - private DynamicConfigProvider kafkaProducerConfigProvider; + private Map kafkaProducerConfig; @JsonCreator public KafkaEmitterConfig( @@ -54,8 +53,7 @@ public KafkaEmitterConfig( @JsonProperty("alert.topic") String alertTopic, @Nullable @JsonProperty("request.topic") String requestTopic, @JsonProperty("clusterName") String clusterName, - @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig, - @JsonProperty("producer.dynamic.config") @Nullable DynamicConfigProvider kafkaProducerConfigProvider + @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig ) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); @@ -64,7 +62,6 @@ public KafkaEmitterConfig( this.requestTopic = requestTopic; this.clusterName = clusterName; this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig; - this.kafkaProducerConfigProvider = kafkaProducerConfigProvider; } @JsonProperty @@ -98,17 +95,11 @@ public String getRequestTopic() } @JsonProperty - public Map getKafkaProducerConfig() + public Map getKafkaProducerConfig() { return kafkaProducerConfig; } - @JsonProperty - public DynamicConfigProvider getKafkaProducerConfigProvider() - { - return kafkaProducerConfigProvider; - } - @Override public boolean equals(Object o) { @@ -139,9 +130,6 @@ public boolean equals(Object o) return false; } - if (getKafkaProducerConfigProvider() != null ? !getKafkaProducerConfigProvider().getConfig().equals(that.getKafkaProducerConfigProvider().getConfig()) : that.getKafkaProducerConfigProvider() != null) { - return false; - } return getKafkaProducerConfig().equals(that.getKafkaProducerConfig()); } @@ -154,7 +142,6 @@ public int hashCode() result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); - result = 31 * result + getKafkaProducerConfigProvider().hashCode(); return result; } @@ -162,12 +149,12 @@ public int hashCode() public String toString() { return "KafkaEmitterConfig{" + - "bootstrap.servers='" + bootstrapServers + '\'' + - ", metric.topic='" + metricTopic + '\'' + - ", alert.topic='" + alertTopic + '\'' + - ", request.topic='" + requestTopic + '\'' + - ", clusterName='" + clusterName + '\'' + - ", Producer.config=" + kafkaProducerConfig + - '}'; + "bootstrap.servers='" + bootstrapServers + '\'' + + ", metric.topic='" + metricTopic + '\'' + + ", alert.topic='" + alertTopic + '\'' + + ", request.topic='" + requestTopic + '\'' + + ", clusterName='" + clusterName + '\'' + + ", Producer.config=" + kafkaProducerConfig + + '}'; } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 44a3a4f1d588..aae52245aa8d 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -23,12 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.Map; public class KafkaEmitterConfigTest { @@ -43,12 +45,14 @@ public void setUp() @Test public void testSerDeserKafkaEmitterConfig() throws IOException { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("testKey2", "testValue2") + ); KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", "requestTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build(), new MapStringDynamicConfigProvider( - ImmutableMap.of("testKey2", "testValue2") - ) + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").put("druid.dynamic.config.provider", mapper.convertValue(dynamicConfigProvider, Map.class) + ).build() ); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) @@ -61,9 +65,8 @@ public void testSerDeserKafkaEmitterConfigNullDynamicConfigProvider() throws IOE { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", "requestTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build(), null - ); + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").build()); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) .readValue(kafkaEmitterConfigString); @@ -73,12 +76,14 @@ public void testSerDeserKafkaEmitterConfigNullDynamicConfigProvider() throws IOE @Test public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("testKey2", "testValue2") + ); KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", null, - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build(), new MapStringDynamicConfigProvider( - ImmutableMap.of("testKey2", "testValue2")) - ); + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").put("druid.dynamic.config.provider", mapper.convertValue(dynamicConfigProvider, Map.class) + ).build()); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) .readValue(kafkaEmitterConfigString); @@ -90,9 +95,7 @@ public void testSerDeNotRequiredKafkaProducerConfig() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", "alertTest", null, - "clusterNameTest", null, new MapStringDynamicConfigProvider( - ImmutableMap.of("testKey2", "testValue2")) - ); + "clusterNameTest", null); try { @SuppressWarnings("unused") KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper); diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index 357cc55608a6..26d9701cf702 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -82,7 +82,7 @@ public void testKafkaEmitter() throws InterruptedException requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents); final KafkaProducer producer = EasyMock.createStrictMock(KafkaProducer.class); final KafkaEmitter kafkaEmitter = new KafkaEmitter( - new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null, null), + new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null), new ObjectMapper() ) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index fe32ffe5ffb6..7eeb2bad64d4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -31,8 +31,8 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.PasswordProvider; +import org.apache.druid.utils.DynamicConfigProviderUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -214,15 +214,8 @@ public static void addConsumerPropertiesFromConfig( } // Additional DynamicConfigProvider based extensible support for all consumer properties - Object dynamicConfigProviderJson = consumerProperties.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY); - if (dynamicConfigProviderJson != null) { - DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); - Map dynamicConfig = dynamicConfigProvider.getConfig(); + DynamicConfigProviderUtils.extraDynamicConfigAndSetProperty(consumerProperties, KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, configMapper, properties); - for (Map.Entry e : dynamicConfig.entrySet()) { - properties.setProperty(e.getKey(), e.getValue()); - } - } } private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)