Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/development/extensions-contrib/kafka-emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter.
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segment metadata. If `event.types` contains `segment_metadata`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.producer.config` | JSON configuration to set additional properties to Kafka producer. | no | none |
| `druid.emitter.kafka.clusterName` | Optional value to specify the name of your Druid cluster. It can help make groups in your monitoring environment. | no | none |
| `druid.emitter.kafka.extra.dimensions` | Optional JSON configuration to specify a map of extra string dimensions for the events emitted. These can help make groups in your monitoring environment. | no | none |

### Example

Expand All @@ -57,5 +58,6 @@ druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.request.topic=druid-request-logs
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.producer.config={"max.block.ms":10000}
druid.emitter.kafka.extra.dimensions={"region":"us-east-1","environment":"preProd"}
```

Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,7 @@ public void emit(final Event event)
if (event != null) {
try {
EventMap map = event.toMap();
if (config.getClusterName() != null) {
map = map.asBuilder()
.put("clusterName", config.getClusterName())
.build();
}
map = addExtraDimensionsToEvent(map);

String resultJson = jsonMapper.writeValueAsString(map);

Expand Down Expand Up @@ -239,6 +235,21 @@ public void emit(final Event event)
}
}

private EventMap addExtraDimensionsToEvent(EventMap map)
{
if (config.getClusterName() != null || config.getExtraDimensions() != null) {
EventMap.Builder eventMapBuilder = map.asBuilder();
if (config.getClusterName() != null) {
eventMapBuilder.put("clusterName", config.getClusterName());
}
if (config.getExtraDimensions() != null) {
eventMapBuilder.putAll(config.getExtraDimensions());
}
map = eventMapBuilder.build();
}
return map;
}

@Override
public void flush()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static EventType fromString(String name)
public static final Set<EventType> DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
private final String bootstrapServers;
@Nullable @JsonProperty("event.types")
@Nonnull @JsonProperty("event.types")
Comment thread
TSFenwick marked this conversation as resolved.
private final Set<EventType> eventTypes;
@Nullable @JsonProperty("metric.topic")
private final String metricTopic;
Expand All @@ -72,8 +72,10 @@ public static EventType fromString(String name)
private final String requestTopic;
@Nullable @JsonProperty("segmentMetadata.topic")
private final String segmentMetadataTopic;
@JsonProperty
@Nullable @JsonProperty
private final String clusterName;
@Nullable @JsonProperty("extra.dimensions")
private final Map<String, String> extraDimensions;
@JsonProperty("producer.config")
private final Map<String, String> kafkaProducerConfig;
@JsonProperty("producer.hiddenProperties")
Expand All @@ -87,7 +89,8 @@ public KafkaEmitterConfig(
@Nullable @JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
@Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic,
@JsonProperty("clusterName") String clusterName,
@Nullable @JsonProperty("clusterName") String clusterName,
@Nullable @JsonProperty("extra.dimensions") Map<String, String> extraDimensions,
@JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig,
@JsonProperty("producer.hiddenProperties") @Nullable DynamicConfigProvider<String> kafkaProducerSecrets
)
Expand All @@ -99,10 +102,12 @@ public KafkaEmitterConfig(
this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can not be null") : null;
this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
this.clusterName = clusterName;
this.extraDimensions = extraDimensions;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
}

@Nonnull
private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, String requestTopic)
{
// Unless explicitly overridden, kafka emitter will always emit metrics and alerts
Expand Down Expand Up @@ -143,12 +148,18 @@ public String getAlertTopic()
return alertTopic;
}

@JsonProperty
@Nullable @JsonProperty
public String getClusterName()
{
return clusterName;
}

@Nullable
public Map<String, String> getExtraDimensions()
{
return extraDimensions;
}

@Nullable
public String getRequestTopic()
{
Expand Down Expand Up @@ -228,6 +239,7 @@ public int hashCode()
result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0);
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
result = 31 * result + (getExtraDimensions() != null ? getExtraDimensions().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
return result;
Expand All @@ -244,6 +256,7 @@ public String toString()
", request.topic='" + requestTopic + '\'' +
", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", extra.dimensions='" + extraDimensions + '\'' +
", producer.config=" + kafkaProducerConfig + '\'' +
", producer.hiddenProperties=" + kafkaProducerSecrets +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void testSerDeserKafkaEmitterConfig() throws IOException
"requestTest",
"metadataTest",
"clusterNameTest",
ImmutableMap.of("env", "preProd"),
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
DEFAULT_PRODUCER_SECRETS
Expand All @@ -79,6 +80,7 @@ public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
null,
"metadataTest",
"clusterNameTest",
null,
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
DEFAULT_PRODUCER_SECRETS
Expand All @@ -102,6 +104,7 @@ public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws IOException
null,
"metadataTest",
"clusterNameTest",
null,
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
DEFAULT_PRODUCER_SECRETS
Expand All @@ -117,7 +120,7 @@ public void testSerDeNotRequiredKafkaProducerConfigOrKafkaSecretProducer()
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest",
"alertTest", null, "metadataTest",
"clusterNameTest", null, null
"clusterNameTest", null, null, null
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For code coverage, maybe make this non-null or add a new test with a non-null extra dimension map?

);
try {
@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testKafkaEmitter() throws InterruptedException
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JodaModule());
final KafkaEmitter kafkaEmitter = new KafkaEmitter(
new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null, null),
new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", ImmutableMap.of("clusterId", "cluster-101"), null, null),
mapper
)
{
Expand Down