From 6bc0c5ef37b5dcbc029ad07368fe1135934dd645 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Wed, 17 Mar 2021 15:38:24 +0530 Subject: [PATCH 1/8] request logs through kafka emitter --- .../extensions-contrib/kafka-emitter.md | 1 + extensions-contrib/kafka-emitter/pom.xml | 6 ++ .../druid/emitter/kafka/KafkaEmitter.java | 19 ++++- .../emitter/kafka/KafkaEmitterConfig.java | 17 ++++ .../emitter/kafka/KafkaEmitterConfigTest.java | 4 +- .../server/log/DefaultRequestLogEvent.java | 17 +++- .../log/DefaultRequestLogEventTest.java | 81 +++++++++++++++++++ 7 files changed, 141 insertions(+), 4 deletions(-) diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md index 15c975bab4d8..cd3ac9f3a021 100644 --- a/docs/development/extensions-contrib/kafka-emitter.md +++ b/docs/development/extensions-contrib/kafka-emitter.md @@ -41,6 +41,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter. |`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none| |`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none| |`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none| +|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to kafka topic.|no|none| |`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none| |`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml index 4540cba87005..28ae860b7d81 100644 --- a/extensions-contrib/kafka-emitter/pom.xml +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -45,6 +45,12 @@ ${project.parent.version} provided + + org.apache.druid + druid-server + ${project.parent.version} + provided + com.google.code.findbugs jsr305 diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index ceb21c3975c9..11584f8036da 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -30,6 +30,7 @@ import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.server.log.RequestLogEvent; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -51,6 +52,7 @@ public class KafkaEmitter implements Emitter private static final int DEFAULT_RETRIES = 3; private final AtomicLong metricLost; private final AtomicLong alertLost; + private final AtomicLong requestLost; private final AtomicLong invalidLost; private final KafkaEmitterConfig config; @@ -58,6 +60,7 @@ public class KafkaEmitter implements Emitter private final ObjectMapper jsonMapper; private final MemoryBoundLinkedBlockingQueue metricQueue; private final MemoryBoundLinkedBlockingQueue alertQueue; + private final MemoryBoundLinkedBlockingQueue requestQueue; private final ScheduledExecutorService scheduler; public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) @@ -70,9 +73,11 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")); this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); - this.scheduler = Executors.newScheduledThreadPool(3); + this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.scheduler = Executors.newScheduledThreadPool(4); this.metricLost = new AtomicLong(0L); this.alertLost = new AtomicLong(0L); + this.requestLost = new AtomicLong(0L); this.invalidLost = new AtomicLong(0L); } @@ -111,6 +116,9 @@ public void start() { scheduler.schedule(this::sendMetricToKafka, 10, TimeUnit.SECONDS); scheduler.schedule(this::sendAlertToKafka, 10, TimeUnit.SECONDS); + if (config.getRequestTopic() != null) { + scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS); + } scheduler.scheduleWithFixedDelay(() -> { log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]", metricLost.get(), alertLost.get(), invalidLost.get()); @@ -128,6 +136,11 @@ private void sendAlertToKafka() sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost)); } + private void sendRequestToKafka() + { + sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost)); + } + private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, Callback callback) { ObjectContainer objectToSend; @@ -166,6 +179,10 @@ public void emit(final Event event) if (!alertQueue.offer(objectContainer)) { alertLost.incrementAndGet(); } + } else if (event instanceof RequestLogEvent && config.getRequestTopic() != null) { + if (!requestQueue.offer(objectContainer)) { + requestLost.incrementAndGet(); + } } else { invalidLost.incrementAndGet(); } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index fe71b21c4cd9..ed7b9ea0e9d1 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -37,6 +37,8 @@ public class KafkaEmitterConfig private final String metricTopic; @JsonProperty("alert.topic") private final String alertTopic; + @Nullable @JsonProperty("request.topic") + private final String requestTopic; @JsonProperty private final String clusterName; @JsonProperty("producer.config") @@ -47,6 +49,7 @@ public KafkaEmitterConfig( @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, @JsonProperty("metric.topic") String metricTopic, @JsonProperty("alert.topic") String alertTopic, + @Nullable @JsonProperty("request.topic") String requestTopic, @JsonProperty("clusterName") String clusterName, @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig ) @@ -54,6 +57,7 @@ public KafkaEmitterConfig( this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); + this.requestTopic = requestTopic; this.clusterName = clusterName; this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig; } @@ -82,6 +86,12 @@ public String getClusterName() return clusterName; } + @Nullable + public String getRequestTopic() + { + return requestTopic; + } + @JsonProperty public Map getKafkaProducerConfig() { @@ -109,6 +119,11 @@ public boolean equals(Object o) if (!getAlertTopic().equals(that.getAlertTopic())) { return false; } + + if (getRequestTopic() != null ? !getRequestTopic().equals(that.getRequestTopic()) : that.getRequestTopic() != null) { + return false; + } + if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } @@ -121,6 +136,7 @@ public int hashCode() int result = getBootstrapServers().hashCode(); result = 31 * result + getMetricTopic().hashCode(); result = 31 * result + getAlertTopic().hashCode(); + result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); return result; @@ -133,6 +149,7 @@ public String toString() "bootstrap.servers='" + bootstrapServers + '\'' + ", metric.topic='" + metricTopic + '\'' + ", alert.topic='" + alertTopic + '\'' + + ", request.topic='" + requestTopic + '\'' + ", clusterName='" + clusterName + '\'' + ", Producer.config=" + kafkaProducerConfig + '}'; diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 89e75fc28089..23f19c483267 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -44,7 +44,7 @@ public void testSerDeserKafkaEmitterConfig() throws IOException { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", "alertTest", "clusterNameTest", - ImmutableMap.builder() + null, ImmutableMap.builder() .put("testKey", "testValue").build() ); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); @@ -58,7 +58,7 @@ public void testSerDeNotRequiredKafkaProducerConfig() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", "alertTest", "clusterNameTest", - null + null, null ); try { @SuppressWarnings("unused") diff --git a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java index 2224e62ddb11..99424294a13b 100644 --- a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java +++ b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java @@ -28,6 +28,7 @@ import org.apache.druid.server.RequestLogLine; import org.joda.time.DateTime; +import java.util.HashMap; import java.util.Map; /** @@ -56,7 +57,21 @@ public final class DefaultRequestLogEvent implements RequestLogEvent @Override public Map toMap() { - return ImmutableMap.of(); + final Map map = new HashMap<>(); + map.put("feed", getFeed()); + map.put("timestamp", getCreatedTime()); + map.put("service", getService()); + map.put("host", getHost()); + if (getQuery() != null) { + map.put("query", getQuery()); + } + if (getSql() != null) { + map.put("sql", getSql()); + } + map.put("sqlQueryContext", getSqlQueryContext()); + map.put("remoteAddr", getRemoteAddr()); + map.put("queryStats", getQueryStats()); + return map; } @Override diff --git a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java index fe7899903ac1..ab57d7c339b1 100644 --- a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java +++ b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java @@ -26,15 +26,20 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Query; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + public class DefaultRequestLogEventTest { private ObjectMapper objectMapper = new DefaultObjectMapper(); @@ -68,4 +73,80 @@ public void testDefaultRequestLogEventSerde() throws Exception String expected = "{\"feed\":\"feed\",\"query\":{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]},\"descending\":true,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[],\"postAggregations\":[],\"limit\":5,\"context\":{\"key\":\"value\"}},\"host\":\"127.0.0.1\",\"timestamp\":\"2019-12-12T03:01:00.000Z\",\"service\":\"druid-service\",\"sql\":null,\"sqlQueryContext\":{},\"remoteAddr\":\"127.0.0.1\",\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; Assert.assertEquals(objectMapper.readTree(expected), objectMapper.readTree(logEventJson)); } + + @Test + public void testDefaultRequestLogEventToMap() throws Exception + { + final String feed = "test"; + final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1); + final String service = "druid-service"; + final String host = "127.0.0.1"; + final Query query = new TimeseriesQuery( + new TableDataSource("dummy"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), + true, + VirtualColumns.EMPTY, + null, + Granularities.ALL, + ImmutableList.of(), + ImmutableList.of(), + 5, + ImmutableMap.of("key", "value")); + final QueryStats queryStats = new QueryStats(ImmutableMap.of("query/time", 13L, "query/bytes", 10L, "success", true, "identity", "allowAll")); + RequestLogLine nativeLine = RequestLogLine.forNative( + query, + timestamp, + host, + queryStats + ); + + DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent( + ImmutableMap.of("service", service, "host", host), feed, nativeLine + ); + final Map expected = new HashMap<>(); + expected.put("feed", feed); + expected.put("timestamp", timestamp); + expected.put("service", service); + expected.put("host", host); + expected.put("query", query); + expected.put("sqlQueryContext", ImmutableMap.of()); + expected.put("remoteAddr", host); + expected.put("queryStats", queryStats); + + Assert.assertEquals(expected, defaultRequestLogEvent.toMap()); + } + + @Test + public void testDefaultRequestLogEventToMapSQL() throws Exception + { + final String feed = "test"; + final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1); + final String service = "druid-service"; + final String host = "127.0.0.1"; + final String sql = "select * from 1337"; + final QueryStats queryStats = new QueryStats(ImmutableMap.of("sqlQuery/time", 13L, "sqlQuery/bytes", 10L, "success", true, "identity", "allowAll")); + + RequestLogLine nativeLine = RequestLogLine.forSql( + sql, + ImmutableMap.of(), + timestamp, + host, + queryStats + ); + + DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent( + ImmutableMap.of("service", service, "host", host), feed, nativeLine + ); + final Map expected = new HashMap<>(); + expected.put("feed", feed); + expected.put("timestamp", timestamp); + expected.put("service", service); + expected.put("host", host); + expected.put("sql", sql); + expected.put("sqlQueryContext", ImmutableMap.of()); + expected.put("remoteAddr", host); + expected.put("queryStats", queryStats); + + Assert.assertEquals(expected, defaultRequestLogEvent.toMap()); + } } From ede0e8f24aff74890851ab6185166599c568f7f6 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Sat, 27 Mar 2021 12:10:36 +0530 Subject: [PATCH 2/8] travis fixes --- docs/development/extensions-contrib/kafka-emitter.md | 2 +- .../druid/server/log/DefaultRequestLogEventTest.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md index cd3ac9f3a021..255e83ebc09a 100644 --- a/docs/development/extensions-contrib/kafka-emitter.md +++ b/docs/development/extensions-contrib/kafka-emitter.md @@ -41,7 +41,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter. |`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none| |`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none| |`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none| -|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to kafka topic.|no|none| +|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none| |`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none| |`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| diff --git a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java index ab57d7c339b1..598940888a8b 100644 --- a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java +++ b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java @@ -75,7 +75,7 @@ public void testDefaultRequestLogEventSerde() throws Exception } @Test - public void testDefaultRequestLogEventToMap() throws Exception + public void testDefaultRequestLogEventToMap() { final String feed = "test"; final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1); @@ -92,7 +92,8 @@ public void testDefaultRequestLogEventToMap() throws Exception ImmutableList.of(), 5, ImmutableMap.of("key", "value")); - final QueryStats queryStats = new QueryStats(ImmutableMap.of("query/time", 13L, "query/bytes", 10L, "success", true, "identity", "allowAll")); + final QueryStats queryStats = new QueryStats( + ImmutableMap.of("query/time", 13L, "query/bytes", 10L, "success", true, "identity", "allowAll")); RequestLogLine nativeLine = RequestLogLine.forNative( query, timestamp, @@ -117,14 +118,15 @@ public void testDefaultRequestLogEventToMap() throws Exception } @Test - public void testDefaultRequestLogEventToMapSQL() throws Exception + public void testDefaultRequestLogEventToMapSQL() { final String feed = "test"; final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1); final String service = "druid-service"; final String host = "127.0.0.1"; final String sql = "select * from 1337"; - final QueryStats queryStats = new QueryStats(ImmutableMap.of("sqlQuery/time", 13L, "sqlQuery/bytes", 10L, "success", true, "identity", "allowAll")); + final QueryStats queryStats = new QueryStats( + ImmutableMap.of("sqlQuery/time", 13L, "sqlQuery/bytes", 10L, "success", true, "identity", "allowAll")); RequestLogLine nativeLine = RequestLogLine.forSql( sql, From f44a2ffda43785cb3d212f6d494cda93a0538f83 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Mon, 29 Mar 2021 11:21:00 +0530 Subject: [PATCH 3/8] review comments --- .../java/org/apache/druid/emitter/kafka/KafkaEmitter.java | 6 ++++-- .../org/apache/druid/server/log/DefaultRequestLogEvent.java | 2 +- .../apache/druid/server/log/DefaultRequestLogEventTest.java | 1 - 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 11584f8036da..fc03ba88596f 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -120,8 +120,10 @@ public void start() scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS); } scheduler.scheduleWithFixedDelay(() -> { - log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]", - metricLost.get(), alertLost.get(), invalidLost.get()); + log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%s], invalidLost=[%d]", + metricLost.get(), alertLost.get(), config.getRequestTopic() != null ? requestLost.get() : "N/A", + invalidLost.get() + ); }, 5, 5, TimeUnit.MINUTES); log.info("Starting Kafka Emitter."); } diff --git a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java index 99424294a13b..4744b2f6d09f 100644 --- a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java +++ b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java @@ -67,8 +67,8 @@ public Map toMap() } if (getSql() != null) { map.put("sql", getSql()); + map.put("sqlQueryContext", getSqlQueryContext()); } - map.put("sqlQueryContext", getSqlQueryContext()); map.put("remoteAddr", getRemoteAddr()); map.put("queryStats", getQueryStats()); return map; diff --git a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java index 598940888a8b..0a515ee78a69 100644 --- a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java +++ b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java @@ -110,7 +110,6 @@ public void testDefaultRequestLogEventToMap() expected.put("service", service); expected.put("host", host); expected.put("query", query); - expected.put("sqlQueryContext", ImmutableMap.of()); expected.put("remoteAddr", host); expected.put("queryStats", queryStats); From 9573062c3910351376a7c1140bab98f4f1e012e9 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Tue, 30 Mar 2021 20:24:56 +0530 Subject: [PATCH 4/8] kafka emitter unit test --- .../druid/emitter/kafka/KafkaEmitter.java | 27 +++- .../druid/emitter/kafka/KafkaEmitterTest.java | 135 ++++++++++++++++++ 2 files changed, 160 insertions(+), 2 deletions(-) create mode 100644 extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index fc03ba88596f..97993ba68c6c 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer; import org.apache.druid.java.util.common.StringUtils; @@ -91,7 +92,8 @@ private Callback setProducerCallback(AtomicLong lostCouter) }; } - private Producer setKafkaProducer() + @VisibleForTesting + protected Producer setKafkaProducer() { ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); try { @@ -143,7 +145,8 @@ private void sendRequestToKafka() sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost)); } - private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, Callback callback) + @VisibleForTesting + protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, Callback callback) { ObjectContainer objectToSend; try { @@ -208,4 +211,24 @@ public void close() scheduler.shutdownNow(); producer.close(); } + + public long getMetricLostCount() + { + return metricLost.get(); + } + + public long getAlertLostCount() + { + return alertLost.get(); + } + + public long getRequestLostCount() + { + return requestLost.get(); + } + + public long getInvalidLostCount() + { + return invalidLost.get(); + } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java new file mode 100644 index 000000000000..da8051f64162 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.server.QueryStats; +import org.apache.druid.server.RequestLogLine; +import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory; +import org.apache.druid.server.log.RequestLogEvent; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +@RunWith(Parameterized.class) +public class KafkaEmitterTest +{ + @Parameterized.Parameter + public String requestTopic; + + @Parameterized.Parameters(name = "{index}: requestTopic - {0}") + public static Object[] data() + { + return new Object[] { "requests", null }; + } + + // there is 10 seconds wait in kafka emitter before it starts sending events to broker, so set a timeout for 15 seconds + @Test(timeout = 15_000) + public void testKafkaEmitter() throws InterruptedException + { + final List serviceMetricEvents = ImmutableList.of( + ServiceMetricEvent.builder().build("m1", 1).build("service", "host") + ); + + final List alertEvents = ImmutableList.of( + new AlertEvent("service", "host", "description") + ); + + final List requestLogEvents = ImmutableList.of( + DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder("requests", + RequestLogLine.forSql("", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of())) + ).build("service", "host") + ); + + int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size(); + int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size(); + + final CountDownLatch countDownSentEvents = new CountDownLatch( + requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents); + final KafkaProducer producer = EasyMock.createStrictMock(KafkaProducer.class); + final KafkaEmitter kafkaEmitter = new KafkaEmitter( + new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null), + new ObjectMapper() + ) + { + @Override + protected Producer setKafkaProducer() + { + return producer; + } + + @Override + protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, + Callback callback + ) + { + countDownSentEvents.countDown(); + super.sendToKafka(topic, recordQueue, callback); + } + }; + + EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null) + .times(requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents); + EasyMock.replay(producer); + kafkaEmitter.start(); + + for (Event event : serviceMetricEvents) { + kafkaEmitter.emit(event); + } + for (Event event : alertEvents) { + kafkaEmitter.emit(event); + } + for (Event event : requestLogEvents) { + kafkaEmitter.emit(event); + } + countDownSentEvents.await(); + + Assert.assertEquals(0, kafkaEmitter.getMetricLostCount()); + Assert.assertEquals(0, kafkaEmitter.getAlertLostCount()); + Assert.assertEquals(0, kafkaEmitter.getRequestLostCount()); + Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getInvalidLostCount()); + + while (true) { + try { + EasyMock.verify(producer); + break; + } catch (Throwable e) { + // although the latch may have count down, producer.send may not have been called yet in KafkaEmitter + // so wait for sometime before verifying the mock + Thread.sleep(100); + // just continue + } + } + } +} \ No newline at end of file From a0734beced1372fb815ba7a9ae25a103baf1da8a Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Tue, 30 Mar 2021 20:26:52 +0530 Subject: [PATCH 5/8] new line --- .../java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index da8051f64162..7494a17b6ee1 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -132,4 +132,4 @@ protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue Date: Wed, 31 Mar 2021 11:59:01 +0530 Subject: [PATCH 6/8] travis checks --- .../emitter/kafka/KafkaEmitterConfigTest.java | 29 ++++++++++++++----- .../druid/emitter/kafka/KafkaEmitterTest.java | 8 +++-- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 23f19c483267..8d653146ff63 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -43,13 +43,27 @@ public void setUp() public void testSerDeserKafkaEmitterConfig() throws IOException { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", - "alertTest", "clusterNameTest", - null, ImmutableMap.builder() - .put("testKey", "testValue").build() + "alertTest", "requestTest", + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").build() ); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) - .readValue(kafkaEmitterConfigString); + .readValue(kafkaEmitterConfigString); + Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); + } + + @Test + public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException + { + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", + "alertTest", null, + "clusterNameTest", ImmutableMap.builder() + .put("testKey", "testValue").build() + ); + String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } @@ -57,14 +71,13 @@ public void testSerDeserKafkaEmitterConfig() throws IOException public void testSerDeNotRequiredKafkaProducerConfig() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", - "alertTest", "clusterNameTest", - null, null + "alertTest", null, + "clusterNameTest", null ); try { @SuppressWarnings("unused") KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper); - } - catch (NullPointerException e) { + } catch (NullPointerException e) { Assert.fail(); } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index 7494a17b6ee1..2795bd8ccc1b 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -51,7 +51,10 @@ public class KafkaEmitterTest @Parameterized.Parameters(name = "{index}: requestTopic - {0}") public static Object[] data() { - return new Object[] { "requests", null }; + return new Object[] { + "requests", + null + }; } // there is 10 seconds wait in kafka emitter before it starts sending events to broker, so set a timeout for 15 seconds @@ -124,7 +127,8 @@ protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue Date: Wed, 31 Mar 2021 14:01:22 +0530 Subject: [PATCH 7/8] checkstyle fix --- .../org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 8d653146ff63..55ecdbaeb8a9 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -77,7 +77,8 @@ public void testSerDeNotRequiredKafkaProducerConfig() try { @SuppressWarnings("unused") KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper); - } catch (NullPointerException e) { + } + catch (NullPointerException e) { Assert.fail(); } } From 707b3877eba22158b69bd0593775b8c6e18fad78 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Thu, 1 Apr 2021 04:36:45 +0530 Subject: [PATCH 8/8] count request lost when request topic is null --- .../org/apache/druid/emitter/kafka/KafkaEmitter.java | 9 ++++----- .../org/apache/druid/emitter/kafka/KafkaEmitterTest.java | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 97993ba68c6c..09b8031e093d 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -122,9 +122,8 @@ public void start() scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS); } scheduler.scheduleWithFixedDelay(() -> { - log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%s], invalidLost=[%d]", - metricLost.get(), alertLost.get(), config.getRequestTopic() != null ? requestLost.get() : "N/A", - invalidLost.get() + log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]", + metricLost.get(), alertLost.get(), requestLost.get(), invalidLost.get() ); }, 5, 5, TimeUnit.MINUTES); log.info("Starting Kafka Emitter."); @@ -184,8 +183,8 @@ public void emit(final Event event) if (!alertQueue.offer(objectContainer)) { alertLost.incrementAndGet(); } - } else if (event instanceof RequestLogEvent && config.getRequestTopic() != null) { - if (!requestQueue.offer(objectContainer)) { + } else if (event instanceof RequestLogEvent) { + if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) { requestLost.incrementAndGet(); } } else { diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index 2795bd8ccc1b..26d9701cf702 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -120,8 +120,8 @@ protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue