diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md
index 3457c249c718..40b63ca73afd 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -36,20 +36,26 @@ to monitor the status of your Druid cluster with this extension.
All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`.
-|property|description|required?|default|
-|--------|-----------|---------|-------|
-|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
-|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
-|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
-|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none|
-|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
-|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
+| Property | Description | Required | Default |
+|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------|
+| `druid.emitter.kafka.bootstrap.servers` | Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`) | yes | none |
+| `druid.emitter.kafka.event.types`                  | Comma-separated event types. Supported types are `alerts`, `metrics`, `requests`, and `segment_metadata`.                                   | no        | `["metrics", "alerts"]` |
+| `druid.emitter.kafka.metric.topic` | Kafka topic name for emitter's target to emit service metrics. If `event.types` contains `metrics`, this field cannot be empty. | no | none |
+| `druid.emitter.kafka.alert.topic`                  | Kafka topic name for emitter's target to emit alerts. If `event.types` contains `alerts`, this field cannot be empty.                       | no        | none                  |
+| `druid.emitter.kafka.request.topic` | Kafka topic name for emitter's target to emit request logs. If `event.types` contains `requests`, this field cannot be empty. | no | none |
+| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segment metadata. If `event.types` contains `segment_metadata`, this field cannot be empty. | no | none |
+| `druid.emitter.kafka.producer.config` | JSON configuration to set additional properties to Kafka producer. | no | none |
+| `druid.emitter.kafka.clusterName` | Optional value to specify the name of your Druid cluster. It can help make groups in your monitoring environment. | no | none |
### Example
```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
+druid.emitter.kafka.event.types=["metrics", "alerts", "requests", "segment_metadata"]
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.alert.topic=druid-alert
+druid.emitter.kafka.request.topic=druid-request-logs
+druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```
+
diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
index 905b6cffc013..11dea07585db 100644
--- a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
+++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
@@ -26,6 +26,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -137,6 +138,8 @@ public void emit(Event event)
for (Emitter emitter : emitterList) {
emitter.emit(event);
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ // do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
diff --git a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
index 5baa1b5da245..e22c373f89fd 100644
--- a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
+++ b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
@@ -33,6 +33,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.util.LinkedHashMap;
@@ -127,6 +128,8 @@ public void emit(Event event)
for (Emitter emitter : alertEmitters) {
emitter.emit(event);
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ // do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
diff --git a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
index b3739ab9d15f..10bfe1e869fc 100644
--- a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
+++ b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
@@ -139,6 +140,8 @@ public void emit(Event event)
"The following alert is dropped, description is [%s], severity is [%s]",
alertEvent.getDescription(), alertEvent.getSeverity()
);
+ } else if (event instanceof SegmentMetadataEvent) {
+ // do nothing. Ignore this event type
} else {
log.error("unknown event type [%s]", event.getClass());
}
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 129a374b5849..dd8f3665f537 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -30,6 +31,7 @@
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
@@ -40,6 +42,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -55,6 +58,7 @@ public class KafkaEmitter implements Emitter
private final AtomicLong metricLost;
private final AtomicLong alertLost;
private final AtomicLong requestLost;
+ private final AtomicLong segmentMetadataLost;
private final AtomicLong invalidLost;
private final KafkaEmitterConfig config;
@@ -63,6 +67,7 @@ public class KafkaEmitter implements Emitter
private final MemoryBoundLinkedBlockingQueue metricQueue;
private final MemoryBoundLinkedBlockingQueue alertQueue;
private final MemoryBoundLinkedBlockingQueue requestQueue;
+ private final MemoryBoundLinkedBlockingQueue segmentMetadataQueue;
private final ScheduledExecutorService scheduler;
protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS;
@@ -78,10 +83,12 @@ public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
+ this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(4);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.requestLost = new AtomicLong(0L);
+ this.segmentMetadataLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}
@@ -119,17 +126,25 @@ protected Producer setKafkaProducer()
@Override
public void start()
{
- scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS);
- scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
- if (config.getRequestTopic() != null) {
+ Set eventTypes = config.getEventTypes();
+ if (eventTypes.contains(EventType.METRICS)) {
+ scheduler.schedule(this::sendMetricToKafka, sendInterval, TimeUnit.SECONDS);
+ }
+ if (eventTypes.contains(EventType.ALERTS)) {
+ scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
+ }
+ if (eventTypes.contains(EventType.REQUESTS)) {
scheduler.schedule(this::sendRequestToKafka, sendInterval, TimeUnit.SECONDS);
}
+ if (eventTypes.contains(EventType.SEGMENT_METADATA)) {
+ scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval, TimeUnit.SECONDS);
+ }
scheduler.scheduleWithFixedDelay(() -> {
- log.info(
- "Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
+ log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], segmentMetadataLost=[%d], invalidLost=[%d]",
metricLost.get(),
alertLost.get(),
requestLost.get(),
+ segmentMetadataLost.get(),
invalidLost.get()
);
}, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, TimeUnit.MINUTES);
@@ -151,6 +166,11 @@ private void sendRequestToKafka()
sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost));
}
+ private void sendSegmentMetadataToKafka()
+ {
+ sendToKafka(config.getSegmentMetadataTopic(), segmentMetadataQueue, setProducerCallback(segmentMetadataLost));
+ }
+
private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, Callback callback)
{
ObjectContainer objectToSend;
@@ -183,24 +203,31 @@ public void emit(final Event event)
resultJson,
StringUtils.toUtf8(resultJson).length
);
+
+ Set eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
- if (!metricQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
- if (!alertQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.ALERTS) || !alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
- if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.REQUESTS) || !requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ if (!eventTypes.contains(EventType.SEGMENT_METADATA) || !segmentMetadataQueue.offer(objectContainer)) {
+ segmentMetadataLost.incrementAndGet();
+ }
} else {
invalidLost.incrementAndGet();
}
}
catch (JsonProcessingException e) {
invalidLost.incrementAndGet();
+ log.warn(e, "Exception while serializing event");
}
}
}
@@ -238,4 +265,9 @@ public long getInvalidLostCount()
{
return invalidLost.get();
}
+
+ public long getSegmentMetadataLostCount()
+ {
+ return segmentMetadataLost.get();
+ }
}
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index ed7b9ea0e9d1..019edd095ea4 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -21,53 +21,108 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
public class KafkaEmitterConfig
{
+ public enum EventType
+ {
+ METRICS,
+ ALERTS,
+ REQUESTS,
+ SEGMENT_METADATA;
+
+ @JsonValue
+ @Override
+ public String toString()
+ {
+ return StringUtils.toLowerCase(this.name());
+ }
+ @JsonCreator
+ public static EventType fromString(String name)
+ {
+ return valueOf(StringUtils.toUpperCase(name));
+ }
+ }
+
+ public static final Set DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
private final String bootstrapServers;
- @JsonProperty("metric.topic")
+ @Nullable @JsonProperty("event.types")
+ private final Set eventTypes;
+ @Nullable @JsonProperty("metric.topic")
private final String metricTopic;
- @JsonProperty("alert.topic")
+ @Nullable @JsonProperty("alert.topic")
private final String alertTopic;
@Nullable @JsonProperty("request.topic")
private final String requestTopic;
+ @Nullable @JsonProperty("segmentMetadata.topic")
+ private final String segmentMetadataTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
- private Map kafkaProducerConfig;
+ private final Map kafkaProducerConfig;
@JsonCreator
public KafkaEmitterConfig(
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers,
- @JsonProperty("metric.topic") String metricTopic,
- @JsonProperty("alert.topic") String alertTopic,
+ @Nullable @JsonProperty("event.types") Set eventTypes,
+ @Nullable @JsonProperty("metric.topic") String metricTopic,
+ @Nullable @JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
+ @Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map kafkaProducerConfig
)
{
- this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
- this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null");
- this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null");
- this.requestTopic = requestTopic;
+ this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "druid.emitter.kafka.bootstrap.servers can not be null");
+ this.eventTypes = maybeUpdateEventTypes(eventTypes, requestTopic);
+ this.metricTopic = this.eventTypes.contains(EventType.METRICS) ? Preconditions.checkNotNull(metricTopic, "druid.emitter.kafka.metric.topic can not be null") : null;
+ this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ? Preconditions.checkNotNull(alertTopic, "druid.emitter.kafka.alert.topic can not be null") : null;
+ this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can not be null") : null;
+ this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
}
+ private Set maybeUpdateEventTypes(Set eventTypes, String requestTopic)
+ {
+ // Unless explicitly overridden, kafka emitter will always emit metrics and alerts
+ if (eventTypes == null) {
+ Set defaultEventTypes = new HashSet<>(DEFAULT_EVENT_TYPES);
+ // To maintain backwards compatibility, if eventTypes is not set, then requests are sent out or not
+ // based on the `request.topic` config
+ if (requestTopic != null) {
+ defaultEventTypes.add(EventType.REQUESTS);
+ }
+ return defaultEventTypes;
+ }
+ return eventTypes;
+ }
+
@JsonProperty
public String getBootstrapServers()
{
return bootstrapServers;
}
+ @JsonProperty
+ public Set getEventTypes()
+ {
+ return eventTypes;
+ }
+
@JsonProperty
public String getMetricTopic()
{
@@ -92,6 +147,12 @@ public String getRequestTopic()
return requestTopic;
}
+ @Nullable
+ public String getSegmentMetadataTopic()
+ {
+ return segmentMetadataTopic;
+ }
+
@JsonProperty
public Map getKafkaProducerConfig()
{
@@ -113,10 +174,16 @@ public boolean equals(Object o)
if (!getBootstrapServers().equals(that.getBootstrapServers())) {
return false;
}
- if (!getMetricTopic().equals(that.getMetricTopic())) {
+
+ if (getEventTypes() != null ? !getEventTypes().equals(that.getEventTypes()) : that.getEventTypes() != null) {
+ return false;
+ }
+
+ if (getMetricTopic() != null ? !getMetricTopic().equals(that.getMetricTopic()) : that.getMetricTopic() != null) {
return false;
}
- if (!getAlertTopic().equals(that.getAlertTopic())) {
+
+ if (getAlertTopic() != null ? !getAlertTopic().equals(that.getAlertTopic()) : that.getAlertTopic() != null) {
return false;
}
@@ -124,6 +191,10 @@ public boolean equals(Object o)
return false;
}
+ if (getSegmentMetadataTopic() != null ? !getSegmentMetadataTopic().equals(that.getSegmentMetadataTopic()) : that.getSegmentMetadataTopic() != null) {
+ return false;
+ }
+
if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
return false;
}
@@ -134,9 +205,11 @@ public boolean equals(Object o)
public int hashCode()
{
int result = getBootstrapServers().hashCode();
- result = 31 * result + getMetricTopic().hashCode();
- result = 31 * result + getAlertTopic().hashCode();
+ result = 31 * result + (getEventTypes() != null ? getEventTypes().hashCode() : 0);
+ result = 31 * result + (getMetricTopic() != null ? getMetricTopic().hashCode() : 0);
+ result = 31 * result + (getAlertTopic() != null ? getAlertTopic().hashCode() : 0);
result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
+ result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0);
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
return result;
@@ -147,9 +220,11 @@ public String toString()
{
return "KafkaEmitterConfig{" +
"bootstrap.servers='" + bootstrapServers + '\'' +
+ ", event.types='" + eventTypes + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
", request.topic='" + requestTopic + '\'' +
+ ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 55ecdbaeb8a9..c4d5811bcb53 100644
--- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -19,15 +19,18 @@
package org.apache.druid.emitter.kafka;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
public class KafkaEmitterConfigTest
{
@@ -42,8 +45,8 @@ public void setUp()
@Test
public void testSerDeserKafkaEmitterConfig() throws IOException
{
- KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
- "alertTest", "requestTest",
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest",
+ "alertTest", "requestTest", "metadataTest",
"clusterNameTest", ImmutableMap.builder()
.put("testKey", "testValue").build()
);
@@ -56,8 +59,24 @@ public void testSerDeserKafkaEmitterConfig() throws IOException
@Test
public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
{
- KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
- "alertTest", null,
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest",
+ "alertTest", null, "metadataTest",
+ "clusterNameTest", ImmutableMap.builder()
+ .put("testKey", "testValue").build()
+ );
+ String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
+ KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
+ .readValue(kafkaEmitterConfigString);
+ Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
+ }
+
+ @Test
+ public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws IOException
+ {
+ Set eventTypeSet = new HashSet();
+ eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA);
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", eventTypeSet, null,
+ null, null, "metadataTest",
"clusterNameTest", ImmutableMap.builder()
.put("testKey", "testValue").build()
);
@@ -70,8 +89,8 @@ public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
@Test
public void testSerDeNotRequiredKafkaProducerConfig()
{
- KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest",
- "alertTest", null,
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest",
+ "alertTest", null, "metadataTest",
"clusterNameTest", null
);
try {
@@ -83,6 +102,14 @@ public void testSerDeNotRequiredKafkaProducerConfig()
}
}
+ @Test
+ public void testDeserializeEventTypesWithDifferentCase() throws JsonProcessingException
+ {
+ Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA, mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class));
+ Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class));
+ Assert.assertThrows(ValueInstantiationException.class, () -> mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class));
+ }
+
@Test
public void testJacksonModules()
{
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index 422d18a7f153..b40da9bd9e08 100644
--- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
@@ -37,7 +38,10 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static org.mockito.ArgumentMatchers.any;
@@ -47,20 +51,23 @@
@RunWith(Parameterized.class)
public class KafkaEmitterTest
{
- @Parameterized.Parameter
+ @Parameterized.Parameter(0)
+ public Set eventsType;
+
+ @Parameterized.Parameter(1)
public String requestTopic;
- @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+ @Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic - {1}")
public static Object[] data()
{
- return new Object[] {
- "requests",
- null
+ return new Object[][] {
+ {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENT_METADATA)), "requests"},
+ {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENT_METADATA)), null}
};
}
- // there is 1 seconds wait in kafka emitter before it starts sending events to broker, set a timeout for 5 seconds
- @Test(timeout = 5_000)
+ // there is a 1-second wait in kafka emitter before it starts sending events to broker, set a timeout for 10 seconds
+ @Test(timeout = 10_000)
public void testKafkaEmitter() throws InterruptedException
{
final List serviceMetricEvents = ImmutableList.of(
@@ -77,14 +84,26 @@ public void testKafkaEmitter() throws InterruptedException
).build("service", "host")
);
- int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size();
+ final List segmentMetadataEvents = ImmutableList.of(
+ new SegmentMetadataEvent(
+ "dummy_datasource",
+ DateTimes.of("2001-01-01T00:00:00.000Z"),
+ DateTimes.of("2001-01-02T00:00:00.000Z"),
+ DateTimes.of("2001-01-03T00:00:00.000Z"),
+ "dummy_version",
+ true
+ )
+ );
+
+ int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size() + segmentMetadataEvents.size();
int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size();
final CountDownLatch countDownSentEvents = new CountDownLatch(
requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
+
final KafkaProducer producer = mock(KafkaProducer.class);
final KafkaEmitter kafkaEmitter = new KafkaEmitter(
- new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null),
+ new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null),
new ObjectMapper()
)
{
@@ -113,10 +132,14 @@ protected Producer setKafkaProducer()
for (Event event : requestLogEvents) {
kafkaEmitter.emit(event);
}
+ for (Event event : segmentMetadataEvents) {
+ kafkaEmitter.emit(event);
+ }
countDownSentEvents.await();
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
+ Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 233739eb7b77..a0567dce04bf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -33,10 +33,13 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -257,11 +260,29 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
segment.getShardSpec() == null ? null : segment.getShardSpec().getType()
);
toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
+ // Emit the segment related metadata using the configured emitters.
+ // There is a possibility that some segments' metadata event might get missed if the
+ // server crashes after committing the segment but before emitting the event.
+ this.emitSegmentMetadata(segment, toolbox);
}
return retVal;
}
+ private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox toolbox)
+ {
+ SegmentMetadataEvent event = new SegmentMetadataEvent(
+ segment.getDataSource(),
+ DateTime.now(DateTimeZone.UTC),
+ segment.getInterval().getStart(),
+ segment.getInterval().getEnd(),
+ segment.getVersion(),
+ segment.getLastCompactionState() != null
+ );
+
+ toolbox.getEmitter().emit(event);
+ }
+
private void checkWithSegmentLock()
{
final Map> oldSegmentsMap = groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
new file mode 100644
index 000000000000..bc3769b62361
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.service;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.joda.time.DateTime;
+
+/**
+ * The event that gets generated whenever a segment is committed
+ */
+public class SegmentMetadataEvent implements Event
+{
+ public static final String FEED = "feed";
+ public static final String DATASOURCE = "dataSource";
+ public static final String CREATED_TIME = "createdTime";
+ public static final String START_TIME = "startTime";
+ public static final String END_TIME = "endTime";
+ public static final String VERSION = "version";
+ public static final String IS_COMPACTED = "isCompacted";
+
+ /**
+ * Time at which the segment metadata event is created
+ */
+ private final DateTime createdTime;
+ /**
+ * Datasource for which the segment is committed
+ */
+ private final String dataSource;
+ /**
+ * Start interval of the committed segment
+ */
+ private final DateTime startTime;
+ /**
+ * End interval of the committed segment
+ */
+ private final DateTime endTime;
+ /**
+ * Version of the committed segment
+ */
+ private final String version;
+ /**
+ * Whether the committed segment is a compacted segment or not
+ */
+ private final boolean isCompacted;
+
+ public SegmentMetadataEvent(
+ String dataSource,
+ DateTime createdTime,
+ DateTime startTime,
+ DateTime endTime,
+ String version,
+ boolean isCompacted
+ )
+ {
+ this.dataSource = dataSource;
+ this.createdTime = createdTime;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.version = version;
+ this.isCompacted = isCompacted;
+ }
+
+ @Override
+ public String getFeed()
+ {
+ return "segment_metadata";
+ }
+ @Override
+ @JsonValue
+ public EventMap toMap()
+ {
+
+ return EventMap.builder()
+ .put(FEED, getFeed())
+ .put(DATASOURCE, dataSource)
+ .put(CREATED_TIME, createdTime)
+ .put(START_TIME, startTime)
+ .put(END_TIME, endTime)
+ .put(VERSION, version)
+ .put(IS_COMPACTED, isCompacted)
+ .build();
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
new file mode 100644
index 000000000000..83a4fcba7dc5
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.service;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SegmentMetadataEventTest
+{
+ @Test
+ public void testBasicEvent()
+ {
+ SegmentMetadataEvent event = new SegmentMetadataEvent(
+ "dummy_datasource",
+ DateTimes.of("2001-01-01T00:00:00.000Z"),
+ DateTimes.of("2001-01-02T00:00:00.000Z"),
+ DateTimes.of("2001-01-03T00:00:00.000Z"),
+ "dummy_version",
+ true
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.builder()
+ .put(SegmentMetadataEvent.FEED, "segment_metadata")
+ .put(SegmentMetadataEvent.DATASOURCE, "dummy_datasource")
+ .put(SegmentMetadataEvent.CREATED_TIME, DateTimes.of("2001-01-01T00:00:00.000Z"))
+ .put(SegmentMetadataEvent.START_TIME, DateTimes.of("2001-01-02T00:00:00.000Z"))
+ .put(SegmentMetadataEvent.END_TIME, DateTimes.of("2001-01-03T00:00:00.000Z"))
+ .put(SegmentMetadataEvent.VERSION, "dummy_version")
+ .put(SegmentMetadataEvent.IS_COMPACTED, true)
+ .build(),
+ event.toMap()
+ );
+ }
+}