diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md
index 15c975bab4d8..255e83ebc09a 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -41,6 +41,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter.
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
+|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty, request logs are not sent to Kafka.|no|none|
|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
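For reference, the properties above combine into an emitter configuration along these lines; a minimal sketch for `common.runtime.properties`, with placeholder broker addresses and topic names (omit `request.topic` to keep request logs out of Kafka):

```properties
# Load the extension and select the Kafka emitter.
druid.extensions.loadList=["kafka-emitter"]
druid.emitter=kafka

# Required settings.
druid.emitter.kafka.bootstrap.servers=broker1:9092,broker2:9092
druid.emitter.kafka.metric.topic=druid-metrics
druid.emitter.kafka.alert.topic=druid-alerts

# Optional: the request log topic added by this change.
druid.emitter.kafka.request.topic=druid-requests

# Optional: cluster label and extra Kafka producer properties (JSON).
druid.emitter.kafka.clusterName=my-cluster
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```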
diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml
index 4540cba87005..28ae860b7d81 100644
--- a/extensions-contrib/kafka-emitter/pom.xml
+++ b/extensions-contrib/kafka-emitter/pom.xml
@@ -45,6 +45,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
+<dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-server</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index ceb21c3975c9..09b8031e093d 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
@@ -30,6 +31,7 @@
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -51,6 +53,7 @@ public class KafkaEmitter implements Emitter
private static final int DEFAULT_RETRIES = 3;
private final AtomicLong metricLost;
private final AtomicLong alertLost;
+ private final AtomicLong requestLost;
private final AtomicLong invalidLost;
private final KafkaEmitterConfig config;
@@ -58,6 +61,7 @@ public class KafkaEmitter implements Emitter
private final ObjectMapper jsonMapper;
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
+ private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
private final ScheduledExecutorService scheduler;
public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
@@ -70,9 +74,11 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
.getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"));
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
- this.scheduler = Executors.newScheduledThreadPool(3);
+ this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
+ this.scheduler = Executors.newScheduledThreadPool(4);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
+ this.requestLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}
private Callback setProducerCallback(AtomicLong lostCounter)
};
}
- private Producer<String, String> setKafkaProducer()
+ @VisibleForTesting
+ protected Producer<String, String> setKafkaProducer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
@@ -111,9 +118,13 @@ public void start()
{
scheduler.schedule(this::sendMetricToKafka, 10, TimeUnit.SECONDS);
scheduler.schedule(this::sendAlertToKafka, 10, TimeUnit.SECONDS);
+ if (config.getRequestTopic() != null) {
+ scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS);
+ }
scheduler.scheduleWithFixedDelay(() -> {
- log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]",
- metricLost.get(), alertLost.get(), invalidLost.get());
+ log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
+ metricLost.get(), alertLost.get(), requestLost.get(), invalidLost.get()
+ );
}, 5, 5, TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
}
@@ -128,7 +139,13 @@ private void sendAlertToKafka()
sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
}
- private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
+ private void sendRequestToKafka()
+ {
+ sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost));
+ }
+
+ @VisibleForTesting
+ protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
try {
@@ -166,6 +183,10 @@ public void emit(final Event event)
if (!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
+ } else if (event instanceof RequestLogEvent) {
+ if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
+ requestLost.incrementAndGet();
+ }
} else {
invalidLost.incrementAndGet();
}
@@ -189,4 +210,24 @@ public void close()
scheduler.shutdownNow();
producer.close();
}
+
+ public long getMetricLostCount()
+ {
+ return metricLost.get();
+ }
+
+ public long getAlertLostCount()
+ {
+ return alertLost.get();
+ }
+
+ public long getRequestLostCount()
+ {
+ return requestLost.get();
+ }
+
+ public long getInvalidLostCount()
+ {
+ return invalidLost.get();
+ }
}
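The queueing pattern relied on above is worth spelling out: each `offer` is non-blocking, so when a queue's memory bound is hit the event is dropped and the matching `*Lost` counter is incremented instead of stalling the emitting thread. A self-contained sketch of that drop-counting idea, independent of Druid's `MemoryBoundLinkedBlockingQueue` (class and method names here are illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Toy version of the drop-counting pattern: offer() is non-blocking, and a
 * failed reservation increments a lost counter instead of blocking the caller.
 */
class DropCountingQueue
{
  private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
  private final AtomicLong usedBytes = new AtomicLong();
  private final AtomicLong lost = new AtomicLong();
  private final long budgetBytes;

  DropCountingQueue(long budgetBytes)
  {
    this.budgetBytes = budgetBytes;
  }

  void emit(String json)
  {
    byte[] payload = json.getBytes(StandardCharsets.UTF_8);
    // Reserve bytes up front; on overflow, roll the reservation back and count the drop.
    if (usedBytes.addAndGet(payload.length) > budgetBytes || !queue.offer(payload)) {
      usedBytes.addAndGet(-payload.length);
      lost.incrementAndGet();
    }
  }

  byte[] take() throws InterruptedException
  {
    byte[] payload = queue.take();
    usedBytes.addAndGet(-payload.length);
    return payload;
  }

  long lostCount()
  {
    return lost.get();
  }
}
```

The periodic `log.info` of the lost counters then gives operators a cheap signal that the queue budget or Kafka throughput needs attention.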
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index fe71b21c4cd9..ed7b9ea0e9d1 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -37,6 +37,8 @@ public class KafkaEmitterConfig
private final String metricTopic;
@JsonProperty("alert.topic")
private final String alertTopic;
+ @Nullable @JsonProperty("request.topic")
+ private final String requestTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
@@ -47,6 +49,7 @@ public KafkaEmitterConfig(
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers,
@JsonProperty("metric.topic") String metricTopic,
@JsonProperty("alert.topic") String alertTopic,
+ @Nullable @JsonProperty("request.topic") String requestTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map kafkaProducerConfig
)
@@ -54,6 +57,7 @@ public KafkaEmitterConfig(
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null");
this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null");
+ this.requestTopic = requestTopic;
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
}
@@ -82,6 +86,12 @@ public String getClusterName()
return clusterName;
}
+ @Nullable
+ public String getRequestTopic()
+ {
+ return requestTopic;
+ }
+
@JsonProperty
public Map<String, String> getKafkaProducerConfig()
{
@@ -109,6 +119,11 @@ public boolean equals(Object o)
if (!getAlertTopic().equals(that.getAlertTopic())) {
return false;
}
+
+ if (getRequestTopic() != null ? !getRequestTopic().equals(that.getRequestTopic()) : that.getRequestTopic() != null) {
+ return false;
+ }
+
if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
return false;
}
@@ -121,6 +136,7 @@ public int hashCode()
int result = getBootstrapServers().hashCode();
result = 31 * result + getMetricTopic().hashCode();
result = 31 * result + getAlertTopic().hashCode();
+ result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
return result;
@@ -133,6 +149,7 @@ public String toString()
"bootstrap.servers='" + bootstrapServers + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
+ ", request.topic='" + requestTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';
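The null-guarded `equals`/`hashCode` additions follow the file's existing ternary style. If the class were written fresh, `java.util.Objects` would express the same contract more compactly; a hedged sketch of the equivalent methods inside `KafkaEmitterConfig` (an idiomatic alternative, not what the patch does):

```java
import java.util.Objects;

@Override
public boolean equals(Object o)
{
  if (this == o) {
    return true;
  }
  if (!(o instanceof KafkaEmitterConfig)) {
    return false;
  }
  KafkaEmitterConfig that = (KafkaEmitterConfig) o;
  return bootstrapServers.equals(that.bootstrapServers)
         && metricTopic.equals(that.metricTopic)
         && alertTopic.equals(that.alertTopic)
         && Objects.equals(requestTopic, that.requestTopic)   // null-safe
         && Objects.equals(clusterName, that.clusterName)     // null-safe
         && kafkaProducerConfig.equals(that.kafkaProducerConfig);
}

@Override
public int hashCode()
{
  // Objects.hash tolerates nulls, matching the manual "x != null ? x.hashCode() : 0" chain.
  return Objects.hash(bootstrapServers, metricTopic, alertTopic, requestTopic, clusterName, kafkaProducerConfig);
}
```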
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 89e75fc28089..55ecdbaeb8a9 100644
--- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -43,13 +43,27 @@ public void setUp()
public void testSerDeserKafkaEmitterConfig() throws IOException
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
- "alertTest", "clusterNameTest",
- ImmutableMap.<String, String>builder()
- .put("testKey", "testValue").build()
+ "alertTest", "requestTest",
+ "clusterNameTest", ImmutableMap.builder()
+ .put("testKey", "testValue").build()
);
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
- .readValue(kafkaEmitterConfigString);
+ .readValue(kafkaEmitterConfigString);
+ Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
+ }
+
+ @Test
+ public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
+ {
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
+ "alertTest", null,
+ "clusterNameTest", ImmutableMap.builder()
+ .put("testKey", "testValue").build()
+ );
+ String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
+ KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
+ .readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}
@@ -57,8 +71,8 @@ public void testSerDeserKafkaEmitterConfig() throws IOException
public void testSerDeNotRequiredKafkaProducerConfig()
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest",
- "alertTest", "clusterNameTest",
- null
+ "alertTest", null,
+ "clusterNameTest", null
);
try {
@SuppressWarnings("unused")
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
new file mode 100644
index 000000000000..26d9701cf702
--- /dev/null
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.QueryStats;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory;
+import org.apache.druid.server.log.RequestLogEvent;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+@RunWith(Parameterized.class)
+public class KafkaEmitterTest
+{
+ @Parameterized.Parameter
+ public String requestTopic;
+
+ @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+ public static Object[] data()
+ {
+ return new Object[] {
+ "requests",
+ null
+ };
+ }
+
+ // there is a 10-second wait in the Kafka emitter before it starts sending events to the broker, so set a 15-second timeout
+ @Test(timeout = 15_000)
+ public void testKafkaEmitter() throws InterruptedException
+ {
+ final List<Event> serviceMetricEvents = ImmutableList.of(
+ ServiceMetricEvent.builder().build("m1", 1).build("service", "host")
+ );
+
+ final List<Event> alertEvents = ImmutableList.of(
+ new AlertEvent("service", "host", "description")
+ );
+
+ final List<Event> requestLogEvents = ImmutableList.of(
+ DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder("requests",
+ RequestLogLine.forSql("", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of()))
+ ).build("service", "host")
+ );
+
+ int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size();
+ int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size();
+
+ final CountDownLatch countDownSentEvents = new CountDownLatch(
+ requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
+ final KafkaProducer<String, String> producer = EasyMock.createStrictMock(KafkaProducer.class);
+ final KafkaEmitter kafkaEmitter = new KafkaEmitter(
+ new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null),
+ new ObjectMapper()
+ )
+ {
+ @Override
+ protected Producer<String, String> setKafkaProducer()
+ {
+ return producer;
+ }
+
+ @Override
+ protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue,
+ Callback callback
+ )
+ {
+ countDownSentEvents.countDown();
+ super.sendToKafka(topic, recordQueue, callback);
+ }
+ };
+
+ EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null)
+ .times(requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
+ EasyMock.replay(producer);
+ kafkaEmitter.start();
+
+ for (Event event : serviceMetricEvents) {
+ kafkaEmitter.emit(event);
+ }
+ for (Event event : alertEvents) {
+ kafkaEmitter.emit(event);
+ }
+ for (Event event : requestLogEvents) {
+ kafkaEmitter.emit(event);
+ }
+ countDownSentEvents.await();
+
+ Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
+ Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
+ Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount());
+ Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
+
+ while (true) {
+ try {
+ EasyMock.verify(producer);
+ break;
+ }
+ catch (Throwable e) {
+ // although the latch may have counted down, producer.send may not have been called yet in KafkaEmitter,
+ // so wait for some time before verifying the mock
+ Thread.sleep(100);
+ // just continue
+ }
+ }
+ }
+}
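One note on the verification loop at the end of the test: it spins until `EasyMock.verify` succeeds and relies on `@Test(timeout = 15_000)` to abort a hung run. If you prefer the wait to be self-limiting, a sketch with an explicit deadline could replace the loop inside the same test method (the 5-second bound is arbitrary):

```java
// Sketch: retry EasyMock.verify with an explicit deadline instead of an unbounded loop.
long deadline = System.currentTimeMillis() + 5_000;
Throwable lastFailure = null;
while (System.currentTimeMillis() < deadline) {
  try {
    EasyMock.verify(producer);
    lastFailure = null;
    break;
  }
  catch (Throwable t) {
    // producer.send may still be in flight on the emitter threads; back off and retry
    lastFailure = t;
    Thread.sleep(100);
  }
}
if (lastFailure != null) {
  throw new AssertionError("producer.send was never called as expected", lastFailure);
}
```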
diff --git a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
index 2224e62ddb11..4744b2f6d09f 100644
--- a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
+++ b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
@@ -28,6 +28,7 @@
import org.apache.druid.server.RequestLogLine;
import org.joda.time.DateTime;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -56,7 +57,21 @@ public final class DefaultRequestLogEvent implements RequestLogEvent
@Override
public Map<String, Object> toMap()
{
- return ImmutableMap.of();
+ final Map<String, Object> map = new HashMap<>();
+ map.put("feed", getFeed());
+ map.put("timestamp", getCreatedTime());
+ map.put("service", getService());
+ map.put("host", getHost());
+ if (getQuery() != null) {
+ map.put("query", getQuery());
+ }
+ if (getSql() != null) {
+ map.put("sql", getSql());
+ map.put("sqlQueryContext", getSqlQueryContext());
+ }
+ map.put("remoteAddr", getRemoteAddr());
+ map.put("queryStats", getQueryStats());
+ return map;
}
@Override
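With `toMap()` populated, a SQL request log event pushed to the request topic serializes to JSON shaped roughly like the following (all values below are hypothetical; for native queries a `query` field appears instead of `sql`/`sqlQueryContext`):

```json
{
  "feed": "request",
  "timestamp": "2019-12-12T03:01:00.000Z",
  "service": "druid/broker",
  "host": "127.0.0.1:8082",
  "sql": "SELECT COUNT(*) FROM wikipedia",
  "sqlQueryContext": {},
  "remoteAddr": "127.0.0.1",
  "queryStats": {
    "sqlQuery/time": 13,
    "success": true,
    "identity": "allowAll"
  }
}
```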
diff --git a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
index fe7899903ac1..0a515ee78a69 100644
--- a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
+++ b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
@@ -26,15 +26,20 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
+import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
public class DefaultRequestLogEventTest
{
private ObjectMapper objectMapper = new DefaultObjectMapper();
@@ -68,4 +73,81 @@ public void testDefaultRequestLogEventSerde() throws Exception
String expected = "{\"feed\":\"feed\",\"query\":{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]},\"descending\":true,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[],\"postAggregations\":[],\"limit\":5,\"context\":{\"key\":\"value\"}},\"host\":\"127.0.0.1\",\"timestamp\":\"2019-12-12T03:01:00.000Z\",\"service\":\"druid-service\",\"sql\":null,\"sqlQueryContext\":{},\"remoteAddr\":\"127.0.0.1\",\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}";
Assert.assertEquals(objectMapper.readTree(expected), objectMapper.readTree(logEventJson));
}
+
+ @Test
+ public void testDefaultRequestLogEventToMap()
+ {
+ final String feed = "test";
+ final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1);
+ final String service = "druid-service";
+ final String host = "127.0.0.1";
+ final Query query = new TimeseriesQuery(
+ new TableDataSource("dummy"),
+ new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))),
+ true,
+ VirtualColumns.EMPTY,
+ null,
+ Granularities.ALL,
+ ImmutableList.of(),
+ ImmutableList.of(),
+ 5,
+ ImmutableMap.of("key", "value"));
+ final QueryStats queryStats = new QueryStats(
+ ImmutableMap.of("query/time", 13L, "query/bytes", 10L, "success", true, "identity", "allowAll"));
+ RequestLogLine nativeLine = RequestLogLine.forNative(
+ query,
+ timestamp,
+ host,
+ queryStats
+ );
+
+ DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent(
+ ImmutableMap.of("service", service, "host", host), feed, nativeLine
+ );
+ final Map<String, Object> expected = new HashMap<>();
+ expected.put("feed", feed);
+ expected.put("timestamp", timestamp);
+ expected.put("service", service);
+ expected.put("host", host);
+ expected.put("query", query);
+ expected.put("remoteAddr", host);
+ expected.put("queryStats", queryStats);
+
+ Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
+ }
+
+ @Test
+ public void testDefaultRequestLogEventToMapSQL()
+ {
+ final String feed = "test";
+ final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1);
+ final String service = "druid-service";
+ final String host = "127.0.0.1";
+ final String sql = "select * from 1337";
+ final QueryStats queryStats = new QueryStats(
+ ImmutableMap.of("sqlQuery/time", 13L, "sqlQuery/bytes", 10L, "success", true, "identity", "allowAll"));
+
+ RequestLogLine sqlLine = RequestLogLine.forSql(
+ sql,
+ ImmutableMap.of(),
+ timestamp,
+ host,
+ queryStats
+ );
+
+ DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent(
+ ImmutableMap.of("service", service, "host", host), feed, nativeLine
+ );
+ final Map<String, Object> expected = new HashMap<>();
+ expected.put("feed", feed);
+ expected.put("timestamp", timestamp);
+ expected.put("service", service);
+ expected.put("host", host);
+ expected.put("sql", sql);
+ expected.put("sqlQueryContext", ImmutableMap.of());
+ expected.put("remoteAddr", host);
+ expected.put("queryStats", queryStats);
+
+ Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
+ }
}