51 changes: 14 additions & 37 deletions docs/content/development/extensions-core/kafka-ingestion.md
@@ -311,40 +311,17 @@ In this way, configuration changes can be applied without requiring any pause in

### On the Subject of Segments

The Kafka indexing service may generate a very large number of segments, which over time will cause query
performance issues if not properly managed. One important characteristic to understand is that the Kafka indexing task
will generate a Druid partition in each segment granularity interval for each partition in the Kafka topic. As an
example, if you are ingesting realtime data and your segment granularity is 15 minutes with 10 partitions in the Kafka
topic, you would generate a minimum of 40 segments an hour. This is a limitation imposed by the Kafka architecture,
which guarantees delivery order within a partition but not across partitions. Therefore, as a consumer of Kafka, in
order to generate segments deterministically (and be able to provide exactly-once ingestion semantics), partitions need
to be handled separately.
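The arithmetic above can be sketched as follows (values from the example; the helper name is illustrative, not part of Druid):

```python
# Minimum segments generated per hour: one Druid partition per Kafka
# partition per segment granularity interval.
def min_segments_per_hour(kafka_partitions, segment_granularity_minutes):
    intervals_per_hour = 60 // segment_granularity_minutes
    return kafka_partitions * intervals_per_hour

# 10 Kafka partitions with 15-minute segment granularity:
print(min_segments_per_hour(10, 15))  # 40
```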

Compounding this, if your taskDuration was also set to 15 minutes, you would actually generate 80 segments an hour since
any given 15 minute interval would be handled by two tasks. For an example of this behavior, let's say we started the
supervisor at 9:05 with a 15 minute segment granularity. The first task would create a segment for 9:00-9:15 and a
segment for 9:15-9:30 before stopping at 9:20. A second task would be created at 9:20 which would create another segment
for 9:15-9:30 and a segment for 9:30-9:45 before stopping at 9:35. Hence, if taskDuration and segmentGranularity are the
same duration, you will get two tasks generating a segment for each segment granularity interval.
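The doubling described above can be checked with a short calculation (the helper name is illustrative):

```python
# When taskDuration equals segmentGranularity, each granularity interval is
# covered by two overlapping tasks, so the per-hour segment count doubles.
def segments_per_hour(kafka_partitions, segment_granularity_minutes,
                      tasks_per_interval):
    intervals_per_hour = 60 // segment_granularity_minutes
    return kafka_partitions * intervals_per_hour * tasks_per_interval

print(segments_per_hour(10, 15, tasks_per_interval=1))  # 40 (single task)
print(segments_per_hour(10, 15, tasks_per_interval=2))  # 80 (overlapping tasks)
```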

Understanding this behavior is the first step to managing the number of segments produced. Some recommendations for
keeping the number of segments low are:

* Keep the number of Kafka partitions to the minimum needed to sustain the required throughput for your event streams.
* Increase segment granularity and task duration so that more events are written into the same segment. One
consideration here is that segments are only handed off to historical nodes after the task duration has elapsed.
Since workers tend to be configured with less query-serving resources than historical nodes, query performance may
suffer if tasks run excessively long without handing off segments.

In many production installations which have been ingesting events for a long period of time, these suggestions alone
will not be sufficient to keep the number of segments at an optimal level. It is recommended that scheduled re-indexing
tasks be run to merge segments together into new segments of an ideal size (in the range of ~500-700 MB per segment).
Currently, the recommended way of doing this is by running periodic Hadoop batch ingestion jobs and using a `dataSource`
inputSpec to read from the segments generated by the Kafka indexing tasks. Details on how to do this can be found under
['Updating Existing Data'](../../ingestion/update-existing-data.html). Note that the Merge Task and Append Task described
[here](../../ingestion/tasks.html) will not work, as they require unsharded segments while Kafka indexing tasks always
generate sharded segments.

There is ongoing work to support automatic segment compaction of sharded segments as well as compaction not requiring
Hadoop (see [here](https://github.com/druid-io/druid/pull/1998) and [here](https://github.com/druid-io/druid/pull/3611)
for related PRs).
Each Kafka indexing task puts events consumed from the Kafka partitions assigned to it into a single segment for each
segment granularity interval until the `maxRowsPerSegment` limit is reached; at that point, a new partition for that
segment granularity interval is created for further events. The Kafka indexing task also performs incremental hand-offs,
which means that the segments created by a task are not all held until the task duration is over. As soon as the
`maxRowsPerSegment` limit is hit, all the segments held by the task at that point in time are handed off and a new set
of segments is created for further events. This means that a task can run for longer durations without accumulating old
segments locally on Middle Manager nodes, and it is encouraged to do so.
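The incremental hand-off behavior can be sketched with a simplified model (the function and data layout are illustrative, not Druid's actual implementation):

```python
# Simplified model of incremental hand-off: a task accumulates rows per
# segment-granularity interval and, whenever any open segment reaches
# maxRowsPerSegment, hands off *all* segments it currently holds.
def run_task(events, max_rows_per_segment):
    open_segments = {}   # interval -> row count currently held by the task
    handed_off = []      # (interval, row count) pairs already handed off
    for interval in events:
        open_segments[interval] = open_segments.get(interval, 0) + 1
        if open_segments[interval] >= max_rows_per_segment:
            # Hand off everything held at this point, not just the full segment.
            handed_off.extend(open_segments.items())
            open_segments = {}
    return handed_off, open_segments

# Two events in interval "09:00" trigger a hand-off; "10:00" stays open.
handed, still_open = run_task(["09:00", "09:00", "10:00"], max_rows_per_segment=2)
print(handed)      # [('09:00', 2)]
print(still_open)  # {'10:00': 1}
```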

The Kafka indexing service may still produce some small segments. For example, suppose the task duration is 4 hours,
segment granularity is set to an HOUR, and the supervisor was started at 9:10. Then after 4 hours, at 13:10, a new set
of tasks will be started, and events for the interval 13:00 - 14:00 may be split across the previous and new sets of
tasks. If this becomes a problem, you can schedule re-indexing tasks to merge segments together into new segments of an
ideal size (in the range of ~500-700 MB per segment).
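The timing in the example above can be verified with a short calculation (times are expressed as minutes from midnight; the helper is illustrative):

```python
# Supervisor started at 9:10 with a 4-hour taskDuration and HOUR granularity:
# the task rollover falls mid-interval, so 13:00-14:00 is split across tasks.
start = 9 * 60 + 10                 # supervisor started at 9:10
rollover = start + 4 * 60           # first task rollover, 4 hours later
print(divmod(rollover, 60))         # (13, 10) -> tasks roll over at 13:10

# 13:10 falls strictly inside the 13:00-14:00 hourly interval, so that
# interval's events are split between the old and new task sets.
assert 13 * 60 < rollover < 14 * 60
```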
There is also ongoing work to support automatic compaction of sharded segments, as well as compaction not requiring
Hadoop (see [here](https://github.com/druid-io/druid/pull/5102)).