Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class DynamicConfigProviderUtils
{
Expand Down Expand Up @@ -62,7 +63,29 @@ public static Map<String, Object> extraConfigAndSetObjectMap(Map<String, Object>
return newConfig;
}

private static Map<String, String> extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper)
public static void extraConfigAndSetProperty(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper, Properties properties)
{
if (config != null) {
for (Map.Entry<String, Object> entry : config.entrySet()) {
if (!dynamicConfigProviderKey.equals(entry.getKey())) {
properties.setProperty(entry.getKey(), String.valueOf(entry.getValue()));
}
}
extraDynamicConfigAndSetProperty(config, dynamicConfigProviderKey, mapper, properties);
}
}

public static void extraDynamicConfigAndSetProperty(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper, Properties properties)
{
if (config != null) {
Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
}

public static Map<String, String> extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper)
{
if (dynamicConfigProviderJson != null) {
DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.runner.RunWith;

import java.util.Map;
import java.util.Properties;

@RunWith(Enclosed.class)
public class DynamicConfigProviderUtilsTest
Expand Down Expand Up @@ -80,5 +81,42 @@ public void testExtraConfigAndSetObjectMap()
Assert.assertEquals("value1", res.get("prop1").toString());
Assert.assertEquals("value2", res.get("prop2").toString());
}

@Test
public void testExtraConfigAndSetProperty()
{
Properties props = new Properties();
DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
ImmutableMap.of("prop2", "value2")
);

Map<String, Object> properties = ImmutableMap.of(
"prop1", "value1",
"prop2", "value3",
DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class)
);
DynamicConfigProviderUtils.extraConfigAndSetProperty(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER, props);

Assert.assertEquals(2, props.size());
Assert.assertEquals("value1", props.get("prop1").toString());
Assert.assertEquals("value2", props.get("prop2").toString());
}

@Test
public void testExtraDynamicConfigAndSetProperty()
{
Properties props = new Properties();
DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
ImmutableMap.of("prop1", "value1")
);

Map<String, Object> properties = ImmutableMap.of(
DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class)
);
DynamicConfigProviderUtils.extraDynamicConfigAndSetProperty(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER, props);

Assert.assertEquals(1, props.size());
Assert.assertEquals("value1", props.get("prop1").toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -70,8 +71,8 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
this.jsonMapper = jsonMapper;
this.producer = setKafkaProducer();
// same with kafka producer's buffer.memory
long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig()
.getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"));
long queueMemoryBound = Long.parseLong(String.valueOf(this.config.getKafkaProducerConfig()
.getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")));
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
Expand Down Expand Up @@ -104,8 +105,7 @@ protected Producer<String, String> setKafkaProducer()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES);
props.putAll(config.getKafkaProducerConfig());

DynamicConfigProviderUtils.extraConfigAndSetProperty(config.getKafkaProducerConfig(), KafkaEmitterConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, jsonMapper, props);
return new KafkaProducer<>(props);
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,25 @@
import org.apache.kafka.clients.producer.ProducerConfig;

import javax.annotation.Nullable;

import java.util.Map;

public class KafkaEmitterConfig
{

public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
private final String bootstrapServers;
@JsonProperty("metric.topic")
private final String metricTopic;
@JsonProperty("alert.topic")
private final String alertTopic;
@Nullable @JsonProperty("request.topic")
@Nullable
@JsonProperty("request.topic")
private final String requestTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
private Map<String, String> kafkaProducerConfig;
private Map<String, Object> kafkaProducerConfig;

@JsonCreator
public KafkaEmitterConfig(
Expand All @@ -51,7 +53,7 @@ public KafkaEmitterConfig(
@JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig
@JsonProperty("producer.config") @Nullable Map<String, Object> kafkaProducerConfig
)
{
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
Expand Down Expand Up @@ -93,7 +95,7 @@ public String getRequestTopic()
}

@JsonProperty
public Map<String, String> getKafkaProducerConfig()
public Map<String, Object> getKafkaProducerConfig()
{
return kafkaProducerConfig;
}
Expand Down Expand Up @@ -127,6 +129,7 @@ public boolean equals(Object o)
if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
return false;
}

return getKafkaProducerConfig().equals(that.getKafkaProducerConfig());
}

Expand All @@ -146,12 +149,12 @@ public int hashCode()
public String toString()
{
return "KafkaEmitterConfig{" +
"bootstrap.servers='" + bootstrapServers + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
", request.topic='" + requestTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';
"bootstrap.servers='" + bootstrapServers + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
", request.topic='" + requestTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;

public class KafkaEmitterConfigTest
{
Expand All @@ -42,25 +45,45 @@ public void setUp()
@Test
public void testSerDeserKafkaEmitterConfig() throws IOException
{
DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
ImmutableMap.of("testKey2", "testValue2")
);
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
"alertTest", "requestTest",
"clusterNameTest", ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
"clusterNameTest", ImmutableMap.<String, Object>builder()
.put("testKey", "testValue").put("druid.dynamic.config.provider", mapper.convertValue(dynamicConfigProvider, Map.class)
).build()
);
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
.readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}

@Test
public void testSerDeserKafkaEmitterConfigNullDynamicConfigProvider() throws IOException
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
"alertTest", "requestTest",
"clusterNameTest", ImmutableMap.<String, Object>builder()
.put("testKey", "testValue").build());
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
.readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}

@Test
public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
{
DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
ImmutableMap.of("testKey2", "testValue2")
);
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
"alertTest", null,
"clusterNameTest", ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
);
"clusterNameTest", ImmutableMap.<String, Object>builder()
.put("testKey", "testValue").put("druid.dynamic.config.provider", mapper.convertValue(dynamicConfigProvider, Map.class)
).build());
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
.readValue(kafkaEmitterConfigString);
Expand All @@ -72,8 +95,7 @@ public void testSerDeNotRequiredKafkaProducerConfig()
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest",
"alertTest", null,
"clusterNameTest", null
);
"clusterNameTest", null);
try {
@SuppressWarnings("unused")
KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
Expand Down Expand Up @@ -214,15 +214,8 @@ public static void addConsumerPropertiesFromConfig(
}

// Additional DynamicConfigProvider based extensible support for all consumer properties
Object dynamicConfigProviderJson = consumerProperties.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY);
if (dynamicConfigProviderJson != null) {
DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
Map<String, String> dynamicConfig = dynamicConfigProvider.getConfig();
DynamicConfigProviderUtils.extraDynamicConfigAndSetProperty(consumerProperties, KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, configMapper, properties);

for (Map.Entry<String, String> e : dynamicConfig.entrySet()) {
properties.setProperty(e.getKey(), e.getValue());
}
}
}

private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
Expand Down