This proposal is to fix #6136.
Background
Druid's versioning system & segment timeline
Currently in Druid, a segment is tightly coupled with its interval. All timestamps stored in the segment fall into the segment's interval. The segment version is associated with the segment interval. That is, all visible segments in the same interval have the same version.
With the concept of segments and their intervals, we can imagine that a set of segments composes a segment timeline. The segment timeline can be thought of as a timeChunk space. In this space, the segments are lined up by their interval. The segments of the same interval fall into the same timeChunk.
This can be represented by a sorted map of interval and its corresponding segments, which is implemented in VersionedIntervalTimeline. The VersionedIntervalTimeline is responsible for searching for the segments of the highest version in the given interval (lookup()). If there are two or more segments of different versions in the same interval, the segments of the lower version are overshadowed by the ones of the highest version and not visible to other components in Druid. This timeline is being used primarily by coordinators and brokers to update the timeline with new segments and to find queryable segments, respectively. We are also using the timeline in many places to figure out the latest segments from the given set of segments.
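To make the version-based lookup concrete, here is a minimal, self-contained sketch of the idea (not Druid's actual VersionedIntervalTimeline, which also handles overlapping intervals and partition chunks): it keeps segments per timeChunk and returns only those of the highest version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy timeline keyed by the timeChunk's interval string (e.g., "2019-01-01/2019-01-02").
class ToyTimeline
{
  // timeChunk interval -> (version -> segment ids of that version)
  private final Map<String, Map<String, List<String>>> chunks = new HashMap<>();

  public void add(String interval, String version, String segmentId)
  {
    chunks.computeIfAbsent(interval, k -> new HashMap<>())
          .computeIfAbsent(version, k -> new ArrayList<>())
          .add(segmentId);
  }

  // lookup(): only the segments of the highest version in the timeChunk are visible;
  // lower-versioned segments are overshadowed.
  public List<String> lookup(String interval)
  {
    return chunks.getOrDefault(interval, Map.of()).entrySet().stream()
                 .max(Map.Entry.comparingByKey()) // versions are ISO timestamps; lexicographic order works
                 .map(Map.Entry::getValue)
                 .orElse(List.of());
  }
}
```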
TimeChunk locking
In Druid, tasks should get a task lock before reading or writing segments. This task lock is currently for coordinating all accesses to a timeChunk. For example, if a task needs to read segments of a timeChunk of 2019-01-01/2019-01-02, it should get a lock for that timeChunk before reading segments. While this lock is valid, no other tasks can read segments in the same timeChunk or write new segments into it.
Once a task gets a lock, it remains valid until 1) the task is finished or 2) another task of a higher priority revokes the lock (see #4479 for more details).

This figure shows an example of timeChunk locking. IndexTask and KafkaIndexTask got locks for the timeChunks of 2018-01-01/2018-04-01 and 2018-08-01/2018-09-01, respectively.
Note that each task can generate one or more segments. If a task creates multiple segments using the same lock, the created segments would have the same version.
New locks usually have a higher version than old ones. For example, overwriting tasks always get a lock with a higher version than that of existing segments. However, sometimes we need to use the same version across different locks. The well-known use case is tasks in the appending mode, like kafkaIndexTask. Since these tasks append new segments to timeChunks where some segments may already exist, the new segments shouldn't overshadow the existing ones. So, appending tasks get a lock with the same version as the existing segments. This is implemented in TaskLockBox.
Problem & motivation
Since #6136 describes the problem and motivation well, I'm only summarizing the key points here.
Late data problem
The kafkaIndexTask has a higher priority than the compactionTask by default. If some data arrives late, the kafkaSupervisor spawns new kafkaIndexTasks to handle the late data. If a compactionTask is running for any overlapping timeChunk, the new kafkaIndexTask revokes the locks of the running compactionTask. If this happens frequently, the compaction might never complete.
To avoid this problem, compaction should be able to proceed while new data is being ingested into the same dataSource and the same timeChunk. From the perspective of optimization, this is also good because more recent data becomes available for compaction as soon as it gets ingested into Druid.
Requirements for compaction
- A segment should be available for compaction as soon as it is published.
- Overwriting tasks (including compactionTask) and appending tasks shouldn't interfere with each other. More generally, two tasks shouldn't block each other if they read/write from/to different segments.
- This implies the segments generated by appending tasks should always be queryable even if overwriting tasks are running for the same timeChunk.
- Overwriting tasks can change the segment granularity. This means a segment can overshadow another segment even if they belong to different timeChunks.
Public interfaces
Changes for DataSegment
DataSegment has an interval, a version, and a partitionId, as before.
- As before, no two segments can have the same (interval, version, partitionId).
DataSegment has two new sets of partitionIds (each partitionId represents a segment).
- Overshadowed segments: the segments overshadowed by this segment. This represents the overshadowing relation between segments of the same version in the same timeChunk.
- Atomic update group: the segments which should be atomically updated all together in the timeline. This will be further explained in the 'Timeline management' section below.
The DataSegment class would look like below:

```java
public class DataSegment
{
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Map<String, Object> loadSpec;
  private final List<String> dimensions;
  private final List<String> metrics;
  private final ShardSpec shardSpec;
  private final long size;
  private final String identifier;
  // the below two sets can be improved by encoding consecutive partitionIds as a range
  private final Set<Integer> overshadowedSegments;
  private final Set<Integer> atomicUpdateGroup;
}
```
Proposed changes
I propose to add a new lock type, the segment lock. Unlike the timeChunk lock, the segment lock coordinates access (reading/writing) to individual segments. With segment locking, each task should get a lock whenever it reads or writes a segment. If two or more tasks request a lock for the same segment at the same time, only one task gets the lock and the others must wait for it to be released. Since two or more tasks can read or write segments of the same timeChunk, there should be no lock contention between appending tasks (e.g., kafkaIndexTask) and overwriting tasks (e.g., compactionTask).

This figure shows an example of segment locking. A compactionTask should get locks for all input segments before it starts. It should also get locks for output segments before writing them. A kafkaIndexTask gets a new lock whenever it reads an event whose timestamp is not covered by its current locks. Since each task has separate locks for individual segments, there's no lock contention between the kafkaIndexTask and the compactionTask even though they are running for the same dataSource and the same timeChunk.
Please note that the segment lock doesn't replace the existing timeChunk lock. Instead, both can be used in different cases. Details are described in the 'Lock granularity' section below.
The segment lock breaks the current timeline implementation because we must be able to figure out which segments are newer than others even within the same timeChunk. As a result, we need a new mechanism to support this new requirement of segment locking.
The sections below describe the details of the algorithms for segment locking and timeline management.
Lock granularity (segment lock vs timeChunk lock)
Indexing tasks now have two options for locking, i.e., segment lock and timeChunk lock.
Segment lock
All tasks should use the segment lock except when they are overwriting existing segments with a different segmentGranularity.
In segment locking, a task first gets locks for all input segments, if any exist. Whenever the task writes a new row whose timestamp is not covered by its current locks for output segments, it gets a new lock for the new output segment. This lock allocation for new segments involves allocating new segmentIds as well. If the task is overwriting existing segments, the new segments would have the same version as the existing segments but list them in their overshadowedSegments. The detailed changes for segment allocation are described in the 'Segment allocation' section below.
The SegmentLock class would look like below:

```java
public class SegmentLock implements TaskLock
{
  private final TaskLockType type;
  private final String groupId;
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Integer priority;
  private final boolean revoked;
  private final IntSet partitionIds;
}
```
With this change, a giant batch task can get a lot of segment locks instead of a single timeChunk lock.
TimeChunk lock
The timeChunk lock is the same as the current taskLock. Tasks get a lock for a timeChunk (e.g., 2019-01-01/2019-01-02) before reading segments from the timeChunk or writing new segments into it.
The timeChunk lock is used only when overwriting tasks change the segment granularity. For example, a compactionTask with segmentGranularity = DAY should use the timeChunk lock if the original segmentGranularity was HOUR.
When a task requests a timeChunk lock, the overlord allocates a new version for the requested lock which must be higher than the versions of any existing segments in the same timeChunk. All new segments created with this lock would have the new version. Since new segments with a timeChunk lock would always have a higher version than existing segments, they overshadow all existing segments.
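As a minimal sketch of this version allocation (the helper below is hypothetical, not the overlord's actual code): since Druid versions are ISO-8601 timestamp strings compared lexicographically, the overlord can use the current timestamp as the version and only needs to ensure it sorts after every existing version.

```java
import org.joda.time.DateTime;

// Hypothetical helper: picks a lock version strictly higher than every existing
// segment version in the timeChunk.
class VersionAllocator
{
  static String allocateVersion(DateTime now, String maxExistingVersion)
  {
    String candidate = now.toString(); // ISO-8601, e.g., "2019-04-01T10:00:00.000Z"
    if (maxExistingVersion != null && candidate.compareTo(maxExistingVersion) <= 0) {
      // Clock skew: fall back to a string that still sorts after the existing version.
      candidate = maxExistingVersion + "_1";
    }
    return candidate;
  }
}
```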
The TimeChunkLock class would look like below:
```java
public class TimeChunkLock implements TaskLock
{
  private final TaskLockType type;
  private final String groupId;
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Integer priority;
  private final boolean revoked;
}
```
Segment allocation
Segment allocation is the process of allocating segmentIds for new segments. The segment allocation algorithm differs depending on the lock granularity and the rollup mode.
If the lock granularity = segmentLock, segment allocation must be centralized in the overlord so that all segments of the same version in the same timeChunk get different partitionIds. If the lock granularity = timeChunkLock, each task can generate segments with any partitionIds by itself since new segments would have a higher version than existing segments.
If the rollup mode = best-effort rollup, segments can be allocated dynamically whenever a new segment needs to be created. In the perfect rollup mode, all new segments should be allocated with proper shardSpecs at once before writing data into segments.
To sum up,
| Lock granularity | Rollup mode | Segment allocation algorithm |
| --- | --- | --- |
| segment lock | best-effort rollup | Centralized incremental segment allocation. Whenever a task sends a lock request for a new segment, the overlord allocates a new segmentId and creates a lock for the new segment. The segmentId allocation and lock creation are performed atomically. |
| segment lock | perfect rollup | Centralized bulk segment allocation. The task sends lock requests for all new segments before it starts writing segments. |
| timeChunk lock | best-effort rollup | Local incremental segment allocation. The task allocates a new segmentId on its own whenever it needs one. |
| timeChunk lock | perfect rollup | Local bulk segment allocation. The task allocates all new segmentIds on its own before it starts writing segments. |
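As a rough sketch of the first row (centralized incremental allocation), the overlord could keep one counter per (dataSource, interval, version) and hand out the next partitionId; the class and method names here are illustrative, not the actual overlord API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative overlord-side allocator: one counter per (dataSource, interval, version).
class IncrementalSegmentAllocator
{
  private final Map<String, AtomicInteger> nextPartitionId = new ConcurrentHashMap<>();

  // Returns a fresh partitionId. A real implementation would seed the counter from
  // the highest partitionId already in the metadata store, and would do this
  // atomically with creating the segment lock (see 'Changes in TaskLockBox').
  public int allocate(String dataSource, String interval, String version)
  {
    String key = dataSource + "|" + interval + "|" + version;
    return nextPartitionId.computeIfAbsent(key, k -> new AtomicInteger(0)).getAndIncrement();
  }
}
```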
Segment metadata creation
In memory, segment metadata is stored in DataSegment. The two new fields of DataSegment are overshadowedSegments and atomicUpdateGroup.
overshadowedSegments is a set of partitionIds of the input segments. For example, given a task which overwrites the segments foo_2019-01-01T00:00:00.000Z_2019-02-01T00:00:00.000Z_version and foo_2019-01-01T00:00:00.000Z_2019-02-01T00:00:00.000Z_version_1, all its output segments in the timeChunk of 2019-01-01/2019-02-01 would have an overshadowedSegments of (0, 1). overshadowedSegments is filled only when the task overwrites some existing segments with the same segment granularity. Otherwise, it remains empty.
atomicUpdateGroup is a set of partitionIds of segments which must become visible at the same time in the timeline. This is the same as the set of partitionIds of segments which are published together per timeChunk. For example, given a task which generates the new segments foo_2019-01-01T00:00:00.000Z_2019-02-01T00:00:00.000Z_version_2, foo_2019-02-01T00:00:00.000Z_2019-03-01T00:00:00.000Z_version_2, and foo_2019-02-01T00:00:00.000Z_2019-03-01T00:00:00.000Z_version_3, the atomicUpdateGroup of the first segment would be (2) while the second and the third segments would have (2, 3). atomicUpdateGroup is filled only when overshadowedSegments is not empty.
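To illustrate how a task could fill these two fields when publishing, the sketch below (with a simplified stand-in for DataSegment) groups output segments by timeChunk: every output in a timeChunk shares the same atomicUpdateGroup, and overshadowedSegments holds the partitionIds of the inputs read from that timeChunk, staying empty for pure appends.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for a new segment before its metadata is finalized.
class NewSegment
{
  final String timeChunk; // e.g., "2019-01-01/2019-02-01"
  final int partitionId;
  final Set<Integer> overshadowedSegments = new HashSet<>();
  final Set<Integer> atomicUpdateGroup = new HashSet<>();

  NewSegment(String timeChunk, int partitionId)
  {
    this.timeChunk = timeChunk;
    this.partitionId = partitionId;
  }
}

class SegmentMetadataFiller
{
  // inputsByChunk: partitionIds of the input segments the task read, per timeChunk
  // (an empty map for appending tasks).
  static void fill(List<NewSegment> outputs, Map<String, Set<Integer>> inputsByChunk)
  {
    // Group output partitionIds by timeChunk.
    Map<String, Set<Integer>> outputsByChunk = new HashMap<>();
    for (NewSegment s : outputs) {
      outputsByChunk.computeIfAbsent(s.timeChunk, k -> new HashSet<>()).add(s.partitionId);
    }
    for (NewSegment s : outputs) {
      Set<Integer> inputs = inputsByChunk.getOrDefault(s.timeChunk, Set.of());
      if (!inputs.isEmpty()) {
        // Overwriting with the same segment granularity: record the one-hop
        // overshadowing relation and the publish-together group.
        s.overshadowedSegments.addAll(inputs);
        s.atomicUpdateGroup.addAll(outputsByChunk.get(s.timeChunk));
      }
      // For pure appends, both sets stay empty.
    }
  }
}
```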
Changes in TaskLockBox
TaskLockBox should support both timeChunk locks and segment locks. It checks the following to detect lock collisions.
- Only a single exclusive lock (either timeChunk lock or segment lock) can exist for a timeChunk.
- Two or more shared locks can exist for a timeChunk regardless of the lock granularity.
- Lock requests are idempotent only when they have the same groupId, interval, and lockGranularity (timeChunk or segment).
Since segment allocation and lock creation should be performed atomically, the taskLockbox is also responsible for allocating new segmentIds. When a task sends a lock request, it explicitly specifies the partitionId for the new segment. The taskLockbox checks whether the request is for a new segment and, if so, allocates it.
Note that the segment allocation in taskLockbox can happen only for segment locks.
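A hedged sketch of this critical section follows; the real taskLockbox is more involved (revocation, shared locks, metadata-store transactions), but the key point is that the conflict check, the partitionId allocation, and the lock creation happen under a single lock:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative lockbox: allocation and lock creation in one atomic step.
class ToyLockbox
{
  // (dataSource|interval|version) -> partitionIds already locked/allocated
  private final Map<String, Set<Integer>> locked = new HashMap<>();

  public synchronized int lockNewSegment(String dataSource, String interval, String version)
  {
    String key = dataSource + "|" + interval + "|" + version;
    Set<Integer> ids = locked.computeIfAbsent(key, k -> new HashSet<>());
    int partitionId = 0;
    while (ids.contains(partitionId)) { // find the next free partitionId
      partitionId++;
    }
    ids.add(partitionId); // allocation and lock creation happen together
    return partitionId;
  }
}
```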
Timeline management
The new timeline extends the existing versioned-interval timeline algorithm; it first compares segment versions and then their overshadowedSegments to find the overshadowing relation between segments.
Since overshadowedSegments stores only the one-hop overshadowing relation, we need to follow overshadowedSegments across multiple hops to figure out all overshadowing relations. This amounts to traversing the overshadowedSegments graph, where each segment is a node and the overshadowing relation between two segments is an edge.

The above figure shows an example of the overshadowedSegments graph. Let's imagine the scenario below.
1. A batch task created segments 1, 2, and 3.
2. A compaction task merged segments 2 and 3 into segment 4. At the same time, a kafkaIndexTask created segment 5.
3. A compaction task created segments 6 and 7 by mixing segments 4 and 5. At the same time, a kafkaIndexTask created segment 8.
The created segments form the overshadowing graph. After step 3, only segments 1, 6, 7, and 8 are visible.
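Applying the traversal to this scenario: any segment reachable through an overshadowedSegments edge is invisible, and the rest are visible. A minimal sketch, with plain integers standing in for segments:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class OvershadowGraph
{
  // Returns the transitively overshadowed segments; all other segments are visible.
  static Set<Integer> findOvershadowed(Map<Integer, Set<Integer>> overshadowedSegments)
  {
    Set<Integer> invisible = new HashSet<>();
    Deque<Integer> stack = new ArrayDeque<>();
    overshadowedSegments.values().forEach(stack::addAll); // one-hop edges
    while (!stack.isEmpty()) {                            // follow multi-hop edges
      int id = stack.pop();
      if (invisible.add(id)) {
        stack.addAll(overshadowedSegments.getOrDefault(id, Set.of()));
      }
    }
    return invisible;
  }

  public static void main(String[] args)
  {
    // The scenario above: 4 overshadows {2, 3}; 6 and 7 overshadow {4, 5}.
    Map<Integer, Set<Integer>> edges = Map.of(
        4, Set.of(2, 3),
        6, Set.of(4, 5),
        7, Set.of(4, 5)
    );
    // Segments 1..8 minus the overshadowed {2, 3, 4, 5} -> 1, 6, 7, 8 are visible.
    System.out.println("overshadowed: " + findOvershadowed(edges));
  }
}
```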
Coordinators and brokers should be able to update their timeline incrementally whenever a historical announces/unannounces a segment. Segments can have 3 different states in the timeline.
- standby: a segment is online but some segments in its atomicUpdateGroup are not online yet. Segments in this state are not queryable.
- visible: a segment is queryable.
- overshadowed: a segment is online but overshadowed by other segments. Segments in this state are not queryable.
When a new segment is added, its state is either standby or overshadowed. Afterwards, its state can change as standby -> visible <-> overshadowed.
Bulk construction from a set of segments
- Find all overshadowed segments by merging the overshadowedSegments of all input segments.
- For each remaining segment, check that every segment in its atomicUpdateGroup exists in the input segment set and is not in the merged overshadowed set. If so, those segments are visible. Otherwise, they are standby (see the sketch below).
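A minimal sketch of this two-step construction, restricted to a single timeChunk and one version, with partitionIds standing in for segments (an absent atomicUpdateGroup entry means the segment updates alone; the State enum is reused by the later sketches):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

enum State { STANDBY, VISIBLE, OVERSHADOWED }

class BulkTimelineBuilder
{
  // overshadowedSegments/atomicUpdateGroup are keyed by the partitionIds of all
  // input segments (an input with no overshadowed segments maps to an empty set).
  static Map<Integer, State> build(
      Map<Integer, Set<Integer>> overshadowedSegments,
      Map<Integer, Set<Integer>> atomicUpdateGroup
  )
  {
    // Step 1: merge the overshadowedSegments of all input segments.
    Set<Integer> allOvershadowed = new HashSet<>();
    overshadowedSegments.values().forEach(allOvershadowed::addAll);

    Map<Integer, State> states = new HashMap<>();
    for (int id : overshadowedSegments.keySet()) {
      if (allOvershadowed.contains(id)) {
        states.put(id, State.OVERSHADOWED);
        continue;
      }
      // Step 2: visible only if the whole atomicUpdateGroup is present in the
      // input set and none of its members is overshadowed.
      boolean groupComplete = atomicUpdateGroup.getOrDefault(id, Set.of(id)).stream()
          .allMatch(m -> overshadowedSegments.containsKey(m) && !allOvershadowed.contains(m));
      states.put(id, groupComplete ? State.VISIBLE : State.STANDBY);
    }
    return states;
  }
}
```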
Incremental update for new segments
This algorithm is called whenever a new segment is added to a timeline. For example, when a new segment is loaded by a historical, the segment is newly added to the brokers' timeline.
- Compare the version of the new segment with other segments of overlapping intervals.
- If the new segment has a higher version than the others, only the new segment is visible.
- If the new segment has the same version and the same interval as existing segments, check its overshadowedSegments and atomicUpdateGroup. If it has no overshadowedSegments, it can become visible immediately. Otherwise, all segments in its overshadowedSegments are atomically swapped with the segments in its atomicUpdateGroup (sketched below).
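A sketch of the same-version branch (reusing the State enum from the bulk-construction sketch): the new segment stays standby until its whole atomicUpdateGroup is online, and only then is the group swapped in atomically, so queries never see a half-replaced group.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class IncrementalAdd
{
  private final Map<Integer, State> states = new HashMap<>();

  // Called when a segment with the same version and interval as existing ones arrives.
  void addSegment(int id, Set<Integer> overshadowedSegments, Set<Integer> atomicUpdateGroup)
  {
    if (overshadowedSegments.isEmpty()) {
      states.put(id, State.VISIBLE); // plain append: visible immediately
      return;
    }
    states.put(id, State.STANDBY);
    // Swap atomically once every member of the group is online.
    if (atomicUpdateGroup.stream().allMatch(states::containsKey)) {
      atomicUpdateGroup.forEach(m -> states.put(m, State.VISIBLE));
      // Demote the replaced segments (only those that are still online).
      overshadowedSegments.forEach(m -> states.computeIfPresent(m, (k, v) -> State.OVERSHADOWED));
    }
  }
}
```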
Incremental update for removed segments
This algorithm is called whenever an existing segment is removed from a timeline. For example, when a historical goes offline, all its segments are removed from the brokers' timeline.
Unlike adding new segments, removing segments should consider falling back to existing segments. Suppose there are two segments, A and B, where segment A overshadows segment B. When segment A is removed but segment B is still available, the broker should be able to route queries to the historical serving segment B.
- Remove the segment from the timeline.
- If the removed segment was visible, check its overshadowed segments (see the sketch below).
  - If all of them are in either the overshadowed or the standby state, all segments in the atomicUpdateGroup of the removed segment are atomically swapped with its overshadowed segments.
  - Otherwise, for each segment in the overshadowed segments of the removed segment, repeat the above step until a swappable atomicUpdateGroup is found.
  - If no swappable atomicUpdateGroup is found, nothing changes.
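A simplified sketch of this fallback search (again reusing the State enum): it walks down the overshadowing graph from the removed segment until it finds a group whose members are all online and currently hidden, then swaps it in. The atomicUpdateGroup bookkeeping on the removed side is trimmed for brevity.

```java
import java.util.Map;
import java.util.Set;

class IncrementalRemove
{
  private final Map<Integer, State> states;                      // online segments only
  private final Map<Integer, Set<Integer>> overshadowedSegments; // one-hop edges

  IncrementalRemove(Map<Integer, State> states, Map<Integer, Set<Integer>> overshadowedSegments)
  {
    this.states = states;
    this.overshadowedSegments = overshadowedSegments;
  }

  void removeSegment(int id)
  {
    State removed = states.remove(id);
    if (removed == State.VISIBLE) {
      fallBackTo(overshadowedSegments.getOrDefault(id, Set.of()));
    }
  }

  // Make 'candidates' visible if they are all online and currently hidden;
  // otherwise recurse one hop further down the overshadowing graph.
  private void fallBackTo(Set<Integer> candidates)
  {
    if (candidates.isEmpty()) {
      return; // no swappable group found: change nothing
    }
    boolean swappable = candidates.stream().allMatch(
        c -> states.get(c) == State.OVERSHADOWED || states.get(c) == State.STANDBY
    );
    if (swappable) {
      candidates.forEach(c -> states.put(c, State.VISIBLE)); // atomic swap
    } else {
      for (int c : candidates) {
        fallBackTo(overshadowedSegments.getOrDefault(c, Set.of()));
      }
    }
  }
}
```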
This only works if there is no missing node in the timeline. To guarantee this, we need to add a new assumption: timeline users (brokers, coordinators) are aware of all active segments, i.e., segments whose used flag is set to true in the metadata store.
The coordinator is already syncing active segments from the metastore periodically (SQLMetadataSegmentManager).
Brokers can do a similar sync as the coordinator. See #6834.
Compatibility, Deprecation, and Migration Plan
Since both timeChunk and segment locks are supported, there's no compatibility issue for timeChunk lock requests from tasks of an old version.
Rejected Alternatives
The following alternatives were rejected.
The first proposal for background compaction
#4434
Storing all overshadowed segments in overshadowedSegments of DataSegment
Unlike in this proposal, we could store the partitionIds of all overshadowed segments, direct and transitive, in the overshadowedSegments set. Since overshadowedSegments would then contain every overshadowed segment, we could check whether one segment overshadows another by simply checking whether its overshadowedSegments contains the other. This would be simpler than traversing the graph of overshadowing relations.
However, since overshadowedSegments would keep growing as new segments overshadowing existing ones are added, this wouldn't be a good design.
Representing the overshadowing relation by comparing numbers
In this alternative, a partition of a segment is represented by a range of fractions, i.e., startPartitionId and endPartitionId. For appending tasks, the partitions of new segments increase by 1, like [0, 1), [1, 2), .... For overwriting tasks, the i-th output segment starts at startPartitionId of the first input partition + (endPartitionId of the last input partition - startPartitionId of the first input partition) * i / (number of output partitions).
For example, suppose there are 3 segments of partitionIds [0, 1), [1, 2), and [2, 3). If an overwriting task reads these segments and generates 2 segments, the new segments would have partitionIds of [0, 3/2) and [3/2, 3).
Since a fraction can be subdivided indefinitely, any partition can be represented by a pair of fractions no matter how many times the partition has been split.
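For completeness, a tiny sketch of this rejected scheme: computing the i-th boundary with exact rational arithmetic, so repeated splits never lose precision (the Fraction type is ad hoc, purely for illustration):

```java
import java.math.BigInteger;

// Exact rational endpoint of a partition range [start, end).
record Fraction(BigInteger num, BigInteger den)
{
  static Fraction of(long num, long den)
  {
    return new Fraction(BigInteger.valueOf(num), BigInteger.valueOf(den));
  }
}

class FractionalSplit
{
  // i-th boundary when splitting [start, end) into n equal output partitions:
  // start + (end - start) * i / n, computed over a common denominator.
  static Fraction boundary(Fraction start, Fraction end, int i, int n)
  {
    BigInteger crossDiff = end.num().multiply(start.den())
                              .subtract(start.num().multiply(end.den())); // end - start
    BigInteger num = start.num().multiply(end.den()).multiply(BigInteger.valueOf(n))
                          .add(crossDiff.multiply(BigInteger.valueOf(i)));
    BigInteger den = start.den().multiply(end.den()).multiply(BigInteger.valueOf(n));
    return new Fraction(num, den);
  }

  public static void main(String[] args)
  {
    // Inputs [0, 3) split into 2 outputs: the middle boundary is 3/2.
    System.out.println(boundary(Fraction.of(0, 1), Fraction.of(3, 1), 1, 2)); // Fraction[num=3, den=2]
  }
}
```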
This idea was dropped because it's much less intuitive than this proposal. Also, it assumes the partition ranges are always consecutive, which may not be true.
Future work
The overshadowedSegments set might become large if too many segments are compacted together, which might cause high memory consumption on brokers or the coordinator. To avoid this issue, we can employ a new task type, e.g., a clean task, which clears overshadowedSegments by getting timeChunk locks. Note that this task won't perform reindexing. Instead, it will get timeChunk locks, download segments, and push & publish those segments with updated metadata (mostly version, overshadowedSegments, and loadSpec). These clean tasks would be spawned by the coordinator periodically to clean up segment metadata, similar to automatic compaction.