Motivation
Currently, native parallel batch indexing (#5543) only supports hashed partitions. Add support for range partitioning so that native parallel batch indexing has feature parity with hadoop batch indexing.
Proposed changes
For native parallel batch indexing hash partitioning, ParallelIndexSupervisorTask.runMultiPhaseParallel() runs two phases:
PartialSegmentGenerateTask: Each worker takes a firehose split and creates partial segments by hashing rows from the split
PartialSegmentMergeTask: Each worker merges the set of partial segments for each partition
For range partitions, the PartialSegmentMergeTask can be reused, but partial segment generation needs a different algorithm:
Each worker takes a firehose split and generates an approximate histogram in memory that records the frequency of each key (the tuple of timestamp adjusted by query granularity and the partition dimension value) in the split. In particular, quantiles.ItemsSketch from DataSketches can be used as the implementation of the approximate histogram.
The most accurate ItemsSketch of k=32768 (which also uses more space) has a normalized rank error of 0.009% and, for extremely large streams (>1B items), retains no more than 600k items. Assuming each item is about 1KB (~500 characters in the dimension value), the histogram will use at most about 600MB of memory, so it would not need to be periodically flushed to disk. I’m not sure what serialized size this translates to, but it’s possible that it’s too large to add as part of the task report payload, in which case it’d need to be written to disk and then copied to the supervisor. (A minimal code sketch of this per-split step follows the sub-bullets below.)
- k can be exposed as a configurable value in the ingestion spec, in case users wish to reduce the memory consumption
- Using ItemsSketch will require moving DataSketches from extensions to core, which may block “use the latest release of datasketches” #8647
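To make the per-split step concrete, here is a minimal sketch of what a worker-side histogram builder could look like. It assumes the quantiles.ItemsSketch API and the ArrayOfStringsSerDe serializer from DataSketches (package locations and exact signatures vary across releases); the class and method names below are hypothetical, not existing Druid code.

```java
import java.util.Comparator;
import org.apache.datasketches.ArrayOfStringsSerDe;
import org.apache.datasketches.quantiles.ItemsSketch;

// Hypothetical per-worker builder: one ItemsSketch per firehose split, updated with the
// (granularity-adjusted timestamp, partition dimension value) tuple encoded as a String key.
class SplitKeySketchBuilder
{
  private final ItemsSketch<String> sketch;

  SplitKeySketchBuilder(int k)
  {
    // k would come from the ingestion spec; 32768 is the most accurate (and largest) setting.
    this.sketch = ItemsSketch.getInstance(k, Comparator.<String>naturalOrder());
  }

  void add(String partitionKey)
  {
    sketch.update(partitionKey);
  }

  // Serialized form, either embedded in the task report or written to disk for the supervisor.
  byte[] serialize()
  {
    return sketch.toByteArray(new ArrayOfStringsSerDe());
  }
}
```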
The supervisor gets all the partial ItemsSketches generated earlier (either via the TaskReport payload or by copying the serialized ItemsSketches from disk) and then merges them by using quantiles.ItemsUnion. From the merged ItemsSketch, it uses ItemsSketch.getQuantiles() to create partition ranges that satisfy the criteria specified in SingleDimensionPartitionsSpec.
Because ItemsSketch yields estimated quantile values, enforcing maxRowsPerSegment in the partitionSpec can only be done during PartialSegmentMergeTask when the final merged segments are created.
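A corresponding sketch of the supervisor-side merge, again assuming the quantiles.ItemsUnion and ItemsSketch.getQuantiles() APIs from DataSketches; the helper class itself is hypothetical:

```java
import java.util.Comparator;
import java.util.List;
import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.datasketches.quantiles.ItemsUnion;

// Hypothetical supervisor-side helper: merge the partial sketches and derive range boundaries.
class PartitionBoundaryComputer
{
  static String[] computeBoundaries(List<ItemsSketch<String>> partialSketches, int k, int numPartitions)
  {
    ItemsUnion<String> union = ItemsUnion.getInstance(k, Comparator.<String>naturalOrder());
    for (ItemsSketch<String> partial : partialSketches) {
      union.update(partial);
    }
    ItemsSketch<String> merged = union.getResult();

    // getQuantiles(n) returns n evenly spaced quantile estimates including the min and max keys,
    // so numPartitions + 1 boundaries delimit numPartitions contiguous ranges. Because these are
    // estimates, maxRowsPerSegment can only be enforced later, in PartialSegmentMergeTask.
    return merged.getQuantiles(numPartitions + 1);
  }
}
```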
Each worker receives the list of partition ranges and, for each firehose split, creates partial segments by using the partition ranges.
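For illustration, a worker could map each row's partition key to one of the received ranges with a simple binary search over the boundary array produced above (hypothetical helper, not existing code):

```java
import java.util.Arrays;

// Hypothetical lookup used while generating partial segments: range i covers
// [boundaries[i], boundaries[i + 1]), with keys outside the estimated min/max
// clamped into the first or last range.
class RangePartitionLookup
{
  private final String[] boundaries; // sorted, numPartitions + 1 entries

  RangePartitionLookup(String[] boundaries)
  {
    this.boundaries = boundaries;
  }

  int partitionFor(String partitionKey)
  {
    int idx = Arrays.binarySearch(boundaries, partitionKey);
    // binarySearch returns (-(insertionPoint) - 1) when the key is not an exact boundary.
    int bucket = idx >= 0 ? idx : -idx - 2;
    return Math.max(0, Math.min(bucket, boundaries.length - 2));
  }
}
```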
NOTE: In this initial version of implementing native parallel batch range partitioning, specifying the partition dimension is required (unlike hadoop batch indexing).
Rationale
Some other alternatives considered were:
Exact Histogram
Because an exact histogram would use much more memory than an approximate histogram (e.g., one from DataSketches), it would need to be periodically flushed to disk. To facilitate subsequent merging of histograms without consuming too much memory, each histogram would need to be serialized in sorted order so that a heap-merge can later be used. The main disadvantage of the exact histogram approach is reduced performance due to the additional disk I/O.
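For comparison, the heap-merge that the exact-histogram alternative would rely on might look roughly like the following, where each spilled histogram file is read back as an iterator of keys in sorted order (all names here are illustrative):

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Illustrative k-way merge over sorted spill files: the priority queue keeps only one
// pending key per spill in memory at a time, emitting keys in globally sorted order.
class SortedSpillMerger
{
  private static class Entry
  {
    final String head;
    final Iterator<String> rest;

    Entry(String head, Iterator<String> rest)
    {
      this.head = head;
      this.rest = rest;
    }
  }

  static void merge(List<Iterator<String>> sortedSpills, Consumer<String> emit)
  {
    PriorityQueue<Entry> heap = new PriorityQueue<>(Comparator.comparing((Entry e) -> e.head));
    for (Iterator<String> spill : sortedSpills) {
      if (spill.hasNext()) {
        heap.add(new Entry(spill.next(), spill));
      }
    }
    while (!heap.isEmpty()) {
      Entry entry = heap.poll();
      emit.accept(entry.head);
      if (entry.rest.hasNext()) {
        heap.add(new Entry(entry.rest.next(), entry.rest));
      }
    }
  }
}
```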
Operational impact
None expected.
Test plan
Unit tests and integration tests, similar to the ones for native parallel batch indexing hashed partitioning.
Future work
Automatic Partition Dimension Selection
This is a niche feature of hadoop batch indexing, and can be added later. Determining the best partition dimension can be complicated as many factors need to be considered (e.g., impact on query speed, impact on ingestion speed, etc.).