Motivation
Currently, you can "append" data to an empty time chunk of a datasource regardless of the partitioning scheme. However, if you want to append to a non-empty time chunk, both the original datasource and the appended segments must use the dynamic partitioning scheme. This is not very useful since datasources are usually partitioned with a non-dynamic partitioning scheme in batch ingestion.
Proposed changes
The idea is to simply allow a hash/range partitionsSpec when appendToExisting is true in tuningConfig (this is currently not allowed). A batch task will create properly partitioned segments based on the given partitionsSpec, which will be appended to an existing datasource.
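For concreteness, a trimmed native batch spec exercising the proposed combination might look like the sketch below (datasource details omitted; dimension and size values are hypothetical; appendToExisting is shown in ioConfig, where released Druid places it; today this combination is rejected):

```json
{
  "type": "index_parallel",
  "spec": {
    "ioConfig": {
      "type": "index_parallel",
      "appendToExisting": true
    },
    "tuningConfig": {
      "type": "index_parallel",
      "partitionsSpec": {
        "type": "single_dim",
        "partitionDimension": "country",
        "targetRowsPerSegment": 5000000
      }
    }
  }
}
```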
At this point, a question might come to mind: what if the original datasource and the appended segments are partitioned with different partitioning schemes? To answer this, we should think about whether it would be useful to have mixed partitioning schemes in a time chunk. The most promising use case for mixed partitioning schemes is streaming ingestion + minor compaction: streaming ingestion continuously creates segments, and minor compaction repartitions those segments to be better optimized for faster query processing. Since we currently support only the dynamic partitioning scheme for streaming ingestion (because it creates the fewest segments), while minor compaction should support any kind of partitioning scheme, mixed partitioning schemes should be allowed in a time chunk.
Changes in segment allocation and building a timeline
To do this, we need two changes in segment allocation and building a timeline. For append, the batch task always uses the overlord-coordinated segment allocation to determine a unique partitionId. The overlord currently fails to allocate a segment ID if the partitioning scheme of the new segment differs from that of the original segments. This check should be removed to allow mixed partitioning schemes.
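The check to be removed can be pictured with a toy allocator (function and field names here are hypothetical; the real logic lives in the overlord's segment allocation code):

```python
# Toy model of overlord segment allocation for appends (hypothetical names).
# Today, allocation fails when the requested shardSpec type differs from the
# existing segments' type; this proposal removes that check so a time chunk
# can hold mixed partitioning schemes.

def allocate_partition_id(existing_shard_types, requested_shard_type,
                          allow_mixed_schemes):
    """Return the next unique partitionId, or None if allocation is rejected."""
    if existing_shard_types and not allow_mixed_schemes:
        if requested_shard_type not in existing_shard_types:
            return None  # current behavior: incompatible scheme -> fail
    # partitionIds stay unique per time chunk regardless of scheme
    return len(existing_shard_types)

# Current behavior: appending dynamic ("numbered") segments to a hashed
# time chunk fails; proposed behavior: it gets the next unique id.
assert allocate_partition_id(["hashed", "hashed"], "numbered", False) is None
assert allocate_partition_id(["hashed", "hashed"], "numbered", True) == 2
```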
When a broker creates and updates a timeline, it checks whether each time chunk is complete. The completeness check mechanism currently differs depending on the shardSpec type, and different shardSpecs are not compatible. This mechanism is implemented in PartitionHolder.isComplete() and PartitionChunk. For example, SingleDimensionShardSpec creates StringPartitionChunk while HashBasedNumberedShardSpec creates NumberedPartitionChunk, which are not compatible with each other. To make them compatible, all shardSpecs will create NumberedPartitionChunk. This change is also required for #10025.
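A unified, numbered-chunk completeness check can be sketched as follows (a simplification with hypothetical field names, assuming every shardSpec reports a partitionId and a core-partition count, as NumberedPartitionChunk does):

```python
# Simplified sketch of a PartitionHolder-style completeness check once every
# shardSpec produces a numbered chunk. A time chunk is complete when all
# "core" partitions [0, num_core) are present; appended segments receive
# partitionIds >= num_core and never block completeness.

def is_complete(chunks):
    """chunks: list of (partition_id, num_core_partitions) tuples."""
    if not chunks:
        return False
    num_core = chunks[0][1]  # assume a single core-partition count per chunk
    present = {pid for pid, _ in chunks}
    return all(core_id in present for core_id in range(num_core))

# Three core range-partitioned segments plus one appended segment: complete.
assert is_complete([(0, 3), (1, 3), (2, 3), (3, 3)]) is True
# A missing core partition keeps the time chunk incomplete.
assert is_complete([(0, 3), (2, 3)]) is False
```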
Remove forceGuaranteedRollup
forceGuaranteedRollup is used to make sure the created segments are perfectly rolled up. As a result, it is currently coupled with the partitionsSpec type; a hash or range partitionsSpec must be used if it is set (I don't remember exactly why we have it now; I think it used to be useful before). However, once we allow append with a hash/range partitionsSpec, its definition becomes vague, since we cannot guarantee perfect rollup across original and appended segments in a time chunk. I think we can remove it after this change and instead provide different levels of rollup based on the type of partitionsSpec.
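One way to picture "different levels of rollup based on the type of partitionsSpec" is a simple mapping (the level names here are illustrative, not an API):

```python
# Hypothetical rollup level a task could report per partitionsSpec type
# once forceGuaranteedRollup is removed. "perfect-per-task" means rows are
# perfectly rolled up among the segments this task created, though not
# necessarily across previously appended segments in the same time chunk.
ROLLUP_LEVEL = {
    "dynamic": "best-effort",
    "hashed": "perfect-per-task",
    "single_dim": "perfect-per-task",
}

assert ROLLUP_LEVEL["dynamic"] == "best-effort"
```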
Segment availability
Appended segments are not guaranteed to become queryable atomically even if they are created by the same task.
Rationale
#9241 was dropped since the streaming ingestion + minor compaction use case described above is expected to be popular and useful.
Operational impact
#10033 is the first PR which allows appending dynamically partitioned segments to hash/range-partitioned datasources. After #10033, operators should be aware that segments appended to a range-partitioned datasource will not be recognized by Druid after a downgrade. To avoid this, they should compact all time chunks that have appended segments before downgrading.
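For reference, compacting an affected time chunk before downgrading could look like the sketch below (datasource name and interval are hypothetical; see the Druid compaction task documentation for the full spec):

```json
{
  "type": "compact",
  "dataSource": "my_datasource",
  "interval": "2020-01-01/2020-02-01"
}
```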
Test plan
- Unit tests and integration tests will be added
- Upgrade/downgrade tests
  - Can Druid read hash/range-partitioned segments (without append) after upgrade?
  - Can Druid read hash/range-partitioned segments (without append) after rollback?
  - Can Druid read hash/range-partitioned segments (including appended ones) after downgrading and upgrading back?
  - Can you append to a hash-partitioned datasource created in older versions of Druid?
Future work
- Streaming ingestion is the most popular use case of append. We could support hash/range partitioning schemes for streaming ingestion.