Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/development/extensions-contrib/kafka-emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter.
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
|`druid.emitter.kafka.request.topic`|Kafka topic name for the emitter's target to emit request logs. If left empty, request logs will not be sent to any Kafka topic.|no|none|
|`druid.emitter.kafka.producer.config`|JSON-formatted configuration with which the user can set additional properties on the Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|

Expand Down
6 changes: 6 additions & 0 deletions extensions-contrib/kafka-emitter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
Expand All @@ -30,6 +31,7 @@
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand All @@ -51,13 +53,15 @@ public class KafkaEmitter implements Emitter
private static final int DEFAULT_RETRIES = 3;
private final AtomicLong metricLost;
private final AtomicLong alertLost;
private final AtomicLong requestLost;
private final AtomicLong invalidLost;

private final KafkaEmitterConfig config;
private final Producer<String, String> producer;
private final ObjectMapper jsonMapper;
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
private final ScheduledExecutorService scheduler;

public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
Expand All @@ -70,9 +74,11 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
.getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"));
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(3);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(4);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.requestLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}

Expand All @@ -86,7 +92,8 @@ private Callback setProducerCallback(AtomicLong lostCouter)
};
}

private Producer<String, String> setKafkaProducer()
@VisibleForTesting
protected Producer<String, String> setKafkaProducer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Expand All @@ -111,9 +118,13 @@ public void start()
{
scheduler.schedule(this::sendMetricToKafka, 10, TimeUnit.SECONDS);
scheduler.schedule(this::sendAlertToKafka, 10, TimeUnit.SECONDS);
if (config.getRequestTopic() != null) {
scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS);
}
scheduler.scheduleWithFixedDelay(() -> {
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]",
metricLost.get(), alertLost.get(), invalidLost.get());
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
metricLost.get(), alertLost.get(), requestLost.get(), invalidLost.get()
);
}, 5, 5, TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
}
Expand All @@ -128,7 +139,13 @@ private void sendAlertToKafka()
sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
}

private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
/**
 * Forwards queued request-log events to the configured request topic.
 * Events that cannot be delivered are counted via the {@code requestLost} callback.
 */
private void sendRequestToKafka()
{
  final Callback onSendComplete = setProducerCallback(requestLost);
  sendToKafka(config.getRequestTopic(), requestQueue, onSendComplete);
}

@VisibleForTesting
protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
try {
Expand Down Expand Up @@ -166,6 +183,10 @@ public void emit(final Event event)
if (!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
} else {
invalidLost.incrementAndGet();
}
Expand All @@ -189,4 +210,24 @@ public void close()
scheduler.shutdownNow();
producer.close();
}

/**
 * @return number of metric events dropped so far (queue full or producer send failure)
 */
public long getMetricLostCount()
{
  return this.metricLost.get();
}

/**
 * @return number of alert events dropped so far (queue full or producer send failure)
 */
public long getAlertLostCount()
{
  return this.alertLost.get();
}

/**
 * @return number of request-log events dropped so far; this also counts events received
 *         while no request topic is configured
 */
public long getRequestLostCount()
{
  return this.requestLost.get();
}

/**
 * @return number of events dropped because their type is not handled by this emitter
 */
public long getInvalidLostCount()
{
  return this.invalidLost.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class KafkaEmitterConfig
private final String metricTopic;
@JsonProperty("alert.topic")
private final String alertTopic;
@Nullable @JsonProperty("request.topic")
private final String requestTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
Expand All @@ -47,13 +49,15 @@ public KafkaEmitterConfig(
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers,
@JsonProperty("metric.topic") String metricTopic,
@JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig
)
{
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null");
this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null");
this.requestTopic = requestTopic;
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
}
Expand Down Expand Up @@ -82,6 +86,12 @@ public String getClusterName()
return clusterName;
}

/**
 * @return the Kafka topic for request logs, or {@code null} if request logging is disabled.
 */
// NOTE(review): unlike the other getters, this one carries no @JsonProperty annotation;
// the JSON name comes from the field's @JsonProperty("request.topic") — confirm that
// serialization round-trips as intended.
@Nullable
public String getRequestTopic()
{
  return this.requestTopic;
}

@JsonProperty
public Map<String, String> getKafkaProducerConfig()
{
Expand Down Expand Up @@ -109,6 +119,11 @@ public boolean equals(Object o)
if (!getAlertTopic().equals(that.getAlertTopic())) {
return false;
}

if (getRequestTopic() != null ? !getRequestTopic().equals(that.getRequestTopic()) : that.getRequestTopic() != null) {
return false;
}

if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
return false;
}
Expand All @@ -121,6 +136,7 @@ public int hashCode()
int result = getBootstrapServers().hashCode();
result = 31 * result + getMetricTopic().hashCode();
result = 31 * result + getAlertTopic().hashCode();
result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
return result;
Expand All @@ -133,6 +149,7 @@ public String toString()
"bootstrap.servers='" + bootstrapServers + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
", request.topic='" + requestTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,36 @@ public void setUp()
public void testSerDeserKafkaEmitterConfig() throws IOException
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
"alertTest", "clusterNameTest",
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
"alertTest", "requestTest",
"clusterNameTest", ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
);
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
.readValue(kafkaEmitterConfigString);
.readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}

/**
 * Round-trips a config whose request.topic is null through Jackson and verifies that
 * the deserialized instance equals the original.
 */
@Test
public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
{
  final KafkaEmitterConfig original = new KafkaEmitterConfig(
      "hostname",
      "metricTest",
      "alertTest",
      null,
      "clusterNameTest",
      ImmutableMap.of("testKey", "testValue")
  );
  final String serialized = mapper.writeValueAsString(original);
  final KafkaEmitterConfig deserialized = mapper.readerFor(KafkaEmitterConfig.class).readValue(serialized);
  Assert.assertEquals(deserialized, original);
}

@Test
public void testSerDeNotRequiredKafkaProducerConfig()
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest",
"alertTest", "clusterNameTest",
null
"alertTest", null,
"clusterNameTest", null
);
try {
@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.emitter.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Tests that KafkaEmitter routes metric, alert, and request-log events to the producer.
 * Parameterized on the request topic: once with a real topic ("requests") and once with
 * null, i.e. request logging disabled.
 */
@RunWith(Parameterized.class)
public class KafkaEmitterTest
{
  // Injected per run: the configured request.topic, or null to disable request logging.
  @Parameterized.Parameter
  public String requestTopic;

  @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
  public static Object[] data()
  {
    return new Object[] {
        "requests",
        null
    };
  }

  // there is 10 seconds wait in kafka emitter before it starts sending events to broker, so set a timeout for 15 seconds
  @Test(timeout = 15_000)
  public void testKafkaEmitter() throws InterruptedException
  {
    // One event of each kind; note each list has exactly one element, so the number of
    // events equals the number of topics the emitter schedules send loops for.
    final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of(
        ServiceMetricEvent.builder().build("m1", 1).build("service", "host")
    );

    final List<AlertEvent> alertEvents = ImmutableList.of(
        new AlertEvent("service", "host", "description")
    );

    final List<RequestLogEvent> requestLogEvents = ImmutableList.of(
        DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder("requests",
            RequestLogLine.forSql("", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of()))
        ).build("service", "host")
    );

    int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size();
    int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size();

    // The latch counts invocations of the overridden sendToKafka below; with one event per
    // topic that count matches the expected number of producer sends.
    final CountDownLatch countDownSentEvents = new CountDownLatch(
        requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
    final KafkaProducer<String, String> producer = EasyMock.createStrictMock(KafkaProducer.class);
    // Anonymous subclass substitutes the mocked producer and counts send-loop starts.
    final KafkaEmitter kafkaEmitter = new KafkaEmitter(
        new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null),
        new ObjectMapper()
    )
    {
      @Override
      protected Producer<String, String> setKafkaProducer()
      {
        return producer;
      }

      @Override
      protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue,
          Callback callback
      )
      {
        // Count down before delegating: super.sendToKafka is the long-running drain loop.
        countDownSentEvents.countDown();
        super.sendToKafka(topic, recordQueue, callback);
      }
    };

    // Expect one producer.send per emitted event; request-log events are excluded when the
    // request topic is null.
    EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null)
            .times(requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
    EasyMock.replay(producer);
    kafkaEmitter.start();

    for (Event event : serviceMetricEvents) {
      kafkaEmitter.emit(event);
    }
    for (Event event : alertEvents) {
      kafkaEmitter.emit(event);
    }
    for (Event event : requestLogEvents) {
      kafkaEmitter.emit(event);
    }
    countDownSentEvents.await();

    Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
    Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
    // With no request topic configured, each request-log event is counted as lost.
    Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount());
    Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());

    while (true) {
      try {
        EasyMock.verify(producer);
        break;
      }
      catch (Throwable e) {
        // although the latch may have count down, producer.send may not have been called yet in KafkaEmitter
        // so wait for sometime before verifying the mock
        Thread.sleep(100);
        // just continue
      }
    }
  }
}
Loading