Currently the Kafka indexing task creates a segment partition corresponding to each Kafka partition. However, in many cases the Kafka partitioning does not directly reflect the amount of data that ends up in Druid segments, so the tasks often end up creating a lot of small segments.
Therefore, this proposal is to decouple segment partitioning from Kafka partitioning, allowing Kafka partitions and Druid segment partitions to be tuned independently. A Kafka indexing task would then create a single segment per segment interval (while still consuming from multiple Kafka partitions), and the number of concurrently running tasks would determine the number of segment partitions.
Any thoughts?
Edit:
After discussing with @cheddar and @himanshug, we came up with a few ideas:
- As soon as a task hits the maxRowsInSegment limit, it signals the Supervisor, which pauses all replica tasks, gets the highest offset for each partition, sets that offset on all replicas, asks them to publish their segments and shut down, and starts a new set of tasks to consume from the subsequent offsets. In this case it is a little hard to reason about the number of concurrently running tasks and the task duration. Also, the taskDuration config will be ignored.
- As soon as a task hits the maxRowsInSegment limit, it signals the Supervisor, which pauses all replica tasks, gets the highest offset for each partition, and sends these partition-offset tuples as a checkpoint to all tasks. If a task fetches a record whose offset in any partition is beyond this checkpoint, the task creates a new appenderator driver with a new appenderator and hands that record to the new appenderator. For each record pulled from Kafka, the task checks the offset and routes the record to the relevant appenderator based on the checkpoints it holds. Once the task has consumed past all offsets for all partitions of a given checkpoint, it can close the corresponding appenderator and hand off the segments that appenderator managed. When taskDuration is reached, the task publishes (or waits for) the hand-off of segments for each checkpoint in FIFO order before shutting down.
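The per-record routing in the second approach could be sketched roughly as below. Note this is only an illustrative sketch: `Checkpoint`, `CheckpointRouter`, and the minimal `Appenderator` stand-in here are hypothetical names, not actual Druid classes, and a checkpoint is modeled simply as a map of Kafka partition to the highest offset it covers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Minimal stand-in for an appenderator; illustrative only.
class Appenderator {
    final String id;
    Appenderator(String id) { this.id = id; }
}

// A checkpoint owns one appenderator and covers records up to the
// recorded end offset (inclusive) in each partition.
class Checkpoint {
    final Map<Integer, Long> endOffsets; // partition -> highest covered offset
    final Appenderator appenderator;

    Checkpoint(Map<Integer, Long> endOffsets, Appenderator appenderator) {
        this.endOffsets = endOffsets;
        this.appenderator = appenderator;
    }

    boolean covers(int partition, long offset) {
        Long end = endOffsets.get(partition);
        return end != null && offset <= end;
    }
}

class CheckpointRouter {
    // Checkpoints kept in FIFO order; the last one is the currently
    // open-ended checkpoint with no end offsets yet.
    private final List<Checkpoint> checkpoints = new ArrayList<>();

    void addCheckpoint(Checkpoint cp) { checkpoints.add(cp); }

    // Route a record to the oldest checkpoint whose end offsets cover it;
    // records beyond every sealed checkpoint go to the newest appenderator.
    Appenderator route(int partition, long offset) {
        for (Checkpoint cp : checkpoints) {
            if (cp.covers(partition, offset)) {
                return cp.appenderator;
            }
        }
        return checkpoints.get(checkpoints.size() - 1).appenderator;
    }
}
```

Once all offsets covered by the head checkpoint have been consumed, its appenderator can be closed and its segments handed off, which matches the FIFO publish order described above.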
The second approach looks better, and I will try to implement it if it sounds good.