23 changes: 16 additions & 7 deletions docs/development/extensions-core/kafka-supervisor-reference.md
@@ -37,7 +37,8 @@ This topic contains configuration reference information for the Apache Kafka sup

|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic patterns are not supported.|yes|
|`topic`|String|The Kafka topic to read from. Must be a specific topic. Use this setting when you want to ingest from a single kafka topic.|yes|
Nit: caps

Suggested change
|`topic`|String|The Kafka topic to read from. Must be a specific topic. Use this setting when you want to ingest from a single kafka topic.|yes|
|`topic`|String|The Kafka topic to read from. Must be a specific topic. Use this setting when you want to ingest from a single Kafka topic.|yes|

|`topicPattern`|String|A regex pattern that can be used to select multiple Kafka topics to ingest data from. Either this or `topic` can be used in a spec. See [Ingesting from multiple topics](#ingesting-from-multiple-topics) for more details.|yes|
|`inputFormat`|Object|`inputFormat` to define input data parsing. See [Specifying data format](#specifying-data-format) for details about specifying the input format.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to pass to the Kafka consumer. See [More on consumer properties](#more-on-consumerproperties).|yes|
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
@@ -53,7 +54,6 @@ This topic contains configuration reference information for the Apache Kafka sup
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
|`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple Kafka topics using a single supervisor. See [Ingesting from multiple topics](#ingesting-from-multiple-topics) for more details.|no (default == false)|

## Task Autoscaler Properties

@@ -138,11 +138,20 @@ The following example demonstrates supervisor spec with `lagBased` autoScaler an
}
```
## Ingesting from multiple topics
To ingest from multiple topics, you have to set `multiTopic` in the supervisor IO config to `true`. Multiple topics
can be passed as a regex pattern as the value for `topic` in the IO config. For example, to ingest data from clicks and
impressions, you will set `topic` to `clicks|impressions` in the IO config. If new topics are added to the cluster that
match the regex, druid will automatically start ingesting from those new topics. If you enable multi-topic
ingestion for a datasource, downgrading will cause the ingestion to fail for that datasource.

To ingest data from multiple topics, set `topicPattern` in the supervisor IO config and leave `topic` unset.
Pass a regex pattern as the value for `topicPattern` to select multiple topics. For example, to
ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in the IO config.
Similarly, use `metrics-.*` as the value for `topicPattern` to ingest from all topics that
start with `metrics-`. If new topics that match the regex are added to the cluster, Druid automatically starts
ingesting from them. If you enable multi-topic ingestion for a datasource, downgrading to a version
earlier than 28.0.0 causes ingestion for that datasource to fail.
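For illustration, a minimal `ioConfig` fragment using `topicPattern` might look like the following sketch. The broker address and input format are hypothetical placeholders, not values from this PR:

```json
{
  "type": "kafka",
  "topicPattern": "metrics-.*",
  "consumerProperties": {
    "bootstrap.servers": "localhost:9092"
  },
  "inputFormat": {
    "type": "json"
  }
}
```

Note that `topic` is omitted entirely; per the validation added in this PR, supplying both `topic` and `topicPattern` is rejected.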

When ingesting data from multiple topics, the partitions are assigned based on the hashcode of topic and the id of the
Suggested change
When ingesting data from multiple topics, the partitions are assigned based on the hashcode of topic and the id of the
When ingesting data from multiple topics, the partitions are assigned based on the hashcode of the topic name and the id of the

partition within that topic. The partition assignment might not be uniform across all the tasks. It is also assumed
that partitions across individual topics have similar load. If you ingest from both high-load and low-load topics in
the same supervisor, it is recommended that the high-load topic have a higher number of partitions and the low-load
topic a lower number of partitions.
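The hash-based assignment described above can be sketched as follows. This is an illustrative model only, assuming a simple combined hash of topic name and partition id taken modulo the task count; Druid's actual assignment logic (in `KafkaTopicPartition` and the supervisor) may differ in detail:

```java
// Hypothetical sketch: map a (topic, partitionId) pair to one of taskCount task
// groups by hashing the topic name together with the partition id. Because the
// hash mixes both values, partitions of different topics spread across tasks,
// but the spread is not guaranteed to be uniform.
public class MultiTopicAssignmentSketch
{
  public static int taskGroupFor(String topicName, int partitionId, int taskCount)
  {
    int hash = 31 * topicName.hashCode() + partitionId;
    // floorMod keeps the result in [0, taskCount) even for negative hashes
    return Math.floorMod(hash, taskCount);
  }

  public static void main(String[] args)
  {
    System.out.println(taskGroupFor("clicks", 0, 3));
    System.out.println(taskGroupFor("impressions", 0, 3));
  }
}
```

The assignment is deterministic, so a task restarted after failover recomputes the same partition ownership.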

## More on consumerProperties

@@ -231,7 +231,7 @@ public Set<KafkaTopicPartition> getPartitionIds(String stream)
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build("Topic [%s] is not found."
+ "Check that the topic exists in Kafka cluster", stream);
+ " Check that the topic exists in Kafka cluster", stream);
}
}
return allPartitions.stream()
@@ -169,7 +169,7 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
return new KafkaSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getTopic(),
ioConfig.getStream(),
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
@@ -204,8 +204,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
baseSequenceName,
null,
null,
new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getTopic(), startPartitions, Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getTopic(), endPartitions),
new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(), startPartitions, Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(), endPartitions),
kafkaIoConfig.getConsumerProperties(),
kafkaIoConfig.getPollTimeout(),
true,
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
@@ -48,11 +49,13 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
private final Map<String, Object> consumerProperties;
private final long pollTimeout;
private final KafkaConfigOverrides configOverrides;
private final boolean multiTopic;
private final String topic;
private final String topicPattern;

@JsonCreator
public KafkaSupervisorIOConfig(
@JsonProperty("topic") String topic,
@JsonProperty("topicPattern") String topicPattern,
@JsonProperty("inputFormat") InputFormat inputFormat,
@JsonProperty("replicas") Integer replicas,
@JsonProperty("taskCount") Integer taskCount,
@@ -69,12 +72,11 @@ public KafkaSupervisorIOConfig(
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
@JsonProperty("idleConfig") IdleConfig idleConfig,
@JsonProperty("multiTopic") Boolean multiTopic,
@JsonProperty("stopTaskCount") Integer stopTaskCount
)
{
super(
Preconditions.checkNotNull(topic, "topic"),
checkTopicArguments(topic, topicPattern),
inputFormat,
replicas,
taskCount,
@@ -98,13 +100,26 @@ public KafkaSupervisorIOConfig(
);
this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
this.configOverrides = configOverrides;
this.multiTopic = multiTopic != null ? multiTopic : DEFAULT_IS_MULTI_TOPIC;
this.topic = topic;
this.topicPattern = topicPattern;
}

/**
* Only used in testing or serialization/deserialization
*/
@JsonProperty
public String getTopic()
{
return getStream();
return topic;
}

/**
* Only used in testing or serialization/deserialization
*/
@JsonProperty
public String getTopicPattern()
{
return topicPattern;
}

@JsonProperty
@@ -131,17 +146,17 @@ public KafkaConfigOverrides getConfigOverrides()
return configOverrides;
}

@JsonProperty
public boolean isMultiTopic()
{
return multiTopic;
return topicPattern != null;
}

@Override
public String toString()
{
return "KafkaSupervisorIOConfig{" +
"topic='" + getTopic() + '\'' +
"topicPattern='" + getTopicPattern() + '\'' +
", replicas=" + getReplicas() +
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
@@ -157,7 +172,23 @@ public String toString()
", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
", configOverrides=" + getConfigOverrides() +
", idleConfig=" + getIdleConfig() +
", stopTaskCount=" + getStopTaskCount() +
'}';
}

private static String checkTopicArguments(String topic, String topicPattern)
{
if (topic == null && topicPattern == null) {
throw InvalidInput.exception("Either topic or topicPattern must be specified");
}
if (topic != null && topicPattern != null) {
throw InvalidInput.exception(
"Only one of topic [%s] or topicPattern [%s] must be specified",
topic,
topicPattern
);
}
return topic != null ? topic : topicPattern;
}

}
@@ -114,7 +114,7 @@ public Set<ResourceAction> getInputSourceResources()
@Override
public String getSource()
{
return getIoConfig() != null ? getIoConfig().getTopic() : null;
return getIoConfig() != null ? getIoConfig().getStream() : null;
}

@Override