diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java index b5ca3ce096de..2b73f59b6dcc 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.apache.kafka.clients.producer.ProducerConfig; +import javax.annotation.Nullable; import java.util.Map; public class KafkaEmitterConfig @@ -46,14 +48,14 @@ public KafkaEmitterConfig( @JsonProperty("metric.topic") String metricTopic, @JsonProperty("alert.topic") String alertTopic, @JsonProperty("clusterName") String clusterName, - @JsonProperty("producer.config") Map kafkaProducerConfig + @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig ) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); this.clusterName = clusterName; - this.kafkaProducerConfig = kafkaProducerConfig; + this.kafkaProducerConfig = kafkaProducerConfig == null? ImmutableMap.of() : kafkaProducerConfig; } @JsonProperty @@ -110,9 +112,7 @@ public boolean equals(Object o) if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } - return getKafkaProducerConfig() != null - ? getKafkaProducerConfig().equals(that.getKafkaProducerConfig()) - : that.getKafkaProducerConfig() == null; + return getKafkaProducerConfig().equals(that.getKafkaProducerConfig()); } @Override @@ -122,7 +122,7 @@ public int hashCode() result = 31 * result + getMetricTopic().hashCode(); result = 31 * result + getAlertTopic().hashCode(); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); - result = 31 * result + (getKafkaProducerConfig() != null ? getKafkaProducerConfig().hashCode() : 0); + result = 31 * result + getKafkaProducerConfig().hashCode(); return result; } diff --git a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java index 37a8c1133486..3b91706fbf02 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -52,4 +52,19 @@ public void testSerDeserKafkaEmitterConfig() throws IOException .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } + + @Test + public void testSerDeNotRequiredKafkaProducerConfig() throws IOException + { + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", + "alertTest", "clusterNameTest", + null + ); + try { + KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper); + } + catch (NullPointerException e) { + Assert.fail(); + } + } }