diff --git a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java index 4c45262aba0d..9c8ae6730b5f 100644 --- a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java +++ b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Properties; public class DynamicConfigProviderUtils { @@ -62,7 +63,29 @@ public static Map extraConfigAndSetObjectMap(Map return newConfig; } - private static Map extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper) + public static void extraConfigAndSetProperty(Map config, String dynamicConfigProviderKey, ObjectMapper mapper, Properties properties) + { + if (config != null) { + for (Map.Entry entry : config.entrySet()) { + if (!dynamicConfigProviderKey.equals(entry.getKey())) { + properties.setProperty(entry.getKey(), String.valueOf(entry.getValue())); + } + } + extraDynamicConfigAndSetProperty(config, dynamicConfigProviderKey, mapper, properties); + } + } + + public static void extraDynamicConfigAndSetProperty(Map config, String dynamicConfigProviderKey, ObjectMapper mapper, Properties properties) + { + if (config != null) { + Map dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper); + for (Map.Entry entry : dynamicConfig.entrySet()) { + properties.setProperty(entry.getKey(), entry.getValue()); + } + } + } + + public static Map extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper) { if (dynamicConfigProviderJson != null) { DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); diff --git a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java index 496acfabef3b..4e4c4ccd7cde 100644 --- a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java +++ b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java @@ -31,6 +31,7 @@ import org.junit.runner.RunWith; import java.util.Map; +import java.util.Properties; @RunWith(Enclosed.class) public class DynamicConfigProviderUtilsTest @@ -80,5 +81,42 @@ public void testExtraConfigAndSetObjectMap() Assert.assertEquals("value1", res.get("prop1").toString()); Assert.assertEquals("value2", res.get("prop2").toString()); } + + @Test + public void testExtraConfigAndSetProperty() + { + Properties props = new Properties(); + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("prop2", "value2") + ); + + Map properties = ImmutableMap.of( + "prop1", "value1", + "prop2", "value3", + DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + DynamicConfigProviderUtils.extraConfigAndSetProperty(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER, props); + + Assert.assertEquals(2, props.size()); + Assert.assertEquals("value1", props.get("prop1").toString()); + Assert.assertEquals("value2", props.get("prop2").toString()); + } + + @Test + public void testExtraDynamicConfigAndSetProperty() + { + Properties props = new Properties(); + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("prop1", "value1") + ); + + Map properties = ImmutableMap.of( + DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + DynamicConfigProviderUtils.extraDynamicConfigAndSetProperty(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER, props); + + Assert.assertEquals(1, props.size()); + Assert.assertEquals("value1", props.get("prop1").toString()); + } } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 09b8031e093d..e0ca05c27018 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.log.RequestLogEvent; +import org.apache.druid.utils.DynamicConfigProviderUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -70,8 +71,8 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) this.jsonMapper = jsonMapper; this.producer = setKafkaProducer(); // same with kafka producer's buffer.memory - long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig() - .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")); + long queueMemoryBound = Long.parseLong(String.valueOf(this.config.getKafkaProducerConfig() + .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"))); this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); @@ -104,8 +105,7 @@ protected Producer setKafkaProducer() props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); - props.putAll(config.getKafkaProducerConfig()); - + DynamicConfigProviderUtils.extraConfigAndSetProperty(config.getKafkaProducerConfig(), KafkaEmitterConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper, props); return new KafkaProducer<>(props); } finally { diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index ed7b9ea0e9d1..ab2d5a6030a6 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -26,23 +26,25 @@ import org.apache.kafka.clients.producer.ProducerConfig; import javax.annotation.Nullable; + import java.util.Map; public class KafkaEmitterConfig { - + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) private final String bootstrapServers; @JsonProperty("metric.topic") private final String metricTopic; @JsonProperty("alert.topic") private final String alertTopic; - @Nullable @JsonProperty("request.topic") + @Nullable + @JsonProperty("request.topic") private final String requestTopic; @JsonProperty private final String clusterName; @JsonProperty("producer.config") - private Map kafkaProducerConfig; + private Map kafkaProducerConfig; @JsonCreator public KafkaEmitterConfig( @@ -51,7 +53,7 @@ public KafkaEmitterConfig( @JsonProperty("alert.topic") String alertTopic, @Nullable @JsonProperty("request.topic") String requestTopic, @JsonProperty("clusterName") String clusterName, - @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig + @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig ) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); @@ -93,7 +95,7 @@ public String getRequestTopic() } @JsonProperty - public Map getKafkaProducerConfig() + public Map getKafkaProducerConfig() { return kafkaProducerConfig; } @@ -127,6 +129,7 @@ public boolean equals(Object o) if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } + return getKafkaProducerConfig().equals(that.getKafkaProducerConfig()); } @@ -146,12 +149,12 @@ public int hashCode() public String toString() { return "KafkaEmitterConfig{" + - "bootstrap.servers='" + bootstrapServers + '\'' + - ", metric.topic='" + metricTopic + '\'' + - ", alert.topic='" + alertTopic + '\'' + - ", request.topic='" + requestTopic + '\'' + - ", clusterName='" + clusterName + '\'' + - ", Producer.config=" + kafkaProducerConfig + - '}'; + "bootstrap.servers='" + bootstrapServers + '\'' + + ", metric.topic='" + metricTopic + '\'' + + ", alert.topic='" + alertTopic + '\'' + + ", request.topic='" + requestTopic + '\'' + + ", clusterName='" + clusterName + '\'' + + ", Producer.config=" + kafkaProducerConfig + + '}'; } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 55ecdbaeb8a9..aae52245aa8d 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -23,11 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.Map; public class KafkaEmitterConfigTest { @@ -42,10 +45,14 @@ public void setUp() @Test public void testSerDeserKafkaEmitterConfig() throws IOException { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("testKey2", "testValue2") + ); KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", "requestTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").put("druid.dynamic.config.provider", mapper.convertValue(dynamicConfigProvider, Map.class) + ).build() ); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) @@ -53,14 +60,30 @@ public void testSerDeserKafkaEmitterConfig() throws IOException Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } + @Test + public void testSerDeserKafkaEmitterConfigNullDynamicConfigProvider() throws IOException + { + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", + "alertTest", "requestTest", + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").build()); + String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); + Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); + } + @Test public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("testKey2", "testValue2") + ); KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", null, - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() - ); + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").put("druid.dynamic.config.provider", mapper.convertValue(dynamicConfigProvider, Map.class) + ).build()); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) .readValue(kafkaEmitterConfigString); @@ -72,8 +95,7 @@ public void testSerDeNotRequiredKafkaProducerConfig() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", "alertTest", null, - "clusterNameTest", null - ); + "clusterNameTest", null); try { @SuppressWarnings("unused") KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index fe32ffe5ffb6..7eeb2bad64d4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -31,8 +31,8 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.PasswordProvider; +import org.apache.druid.utils.DynamicConfigProviderUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -214,15 +214,8 @@ public static void addConsumerPropertiesFromConfig( } // Additional DynamicConfigProvider based extensible support for all consumer properties - Object dynamicConfigProviderJson = consumerProperties.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY); - if (dynamicConfigProviderJson != null) { - DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); - Map dynamicConfig = dynamicConfigProvider.getConfig(); + DynamicConfigProviderUtils.extraDynamicConfigAndSetProperty(consumerProperties, KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, configMapper, properties); - for (Map.Entry e : dynamicConfig.entrySet()) { - properties.setProperty(e.getKey(), e.getValue()); - } - } } private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)