
generic block compressed complex columns#16863

Merged
clintropolis merged 24 commits intoapache:masterfrom
clintropolis:compress-complex-metrics
Aug 27, 2024

Conversation

@clintropolis
Member

@clintropolis clintropolis commented Aug 8, 2024

Description

Example wikipedia with users as DS HLL, added, deleted, delta, deltaBucket, commentLength as DS quantiles:
Screenshot 2024-08-07 at 10 26 12 PM

Example wikipedia with users and comment as DS HLL, page as DS Theta:
Screenshot 2024-08-07 at 10 34 49 PM

The performance measurements were not as much slower than uncompressed as I was expecting. For the following queries:

SELECT APPROX_COUNT_DISTINCT_DS_HLL("hll_string5") FROM "druid"."datasketches"
SELECT APPROX_COUNT_DISTINCT_DS_THETA("theta_string5") FROM "druid"."datasketches"
SELECT DS_GET_QUANTILE(DS_QUANTILES_SKETCH("quantiles_float4"), 0.5) FROM "druid"."datasketches"
SELECT DS_GET_QUANTILE(DS_QUANTILES_SKETCH("quantiles_long3"), 0.9) FROM "druid"."datasketches"
SELECT string2, APPROX_COUNT_DISTINCT_DS_HLL("hll_string5") FROM "druid"."datasketches" GROUP BY 1 ORDER BY 2 DESC
SELECT string2, APPROX_COUNT_DISTINCT_DS_THETA("theta_string5", 4096) FROM "druid"."datasketches" GROUP BY 1 ORDER BY 2 DESC
SELECT string2, DS_GET_QUANTILE(DS_QUANTILES_SKETCH("quantiles_float4"), 0.5) FROM "druid"."datasketches" GROUP BY 1 ORDER BY 2 DESC
SELECT string2, DS_GET_QUANTILE(DS_QUANTILES_SKETCH("quantiles_long3"), 0.9) FROM "druid"."datasketches" GROUP BY 1 ORDER BY 2 DESC

non-vectorized:

Benchmark                                   (complexCompression)  (query)  (rowsPerSegment)  (schema)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt    Score   Error  Units
SqlComplexMetricsColumnsBenchmark.querySql                  none        0           1500000  explicit           MMAP              UTF8        false  avgt    5   57.890 ± 8.851  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        1           1500000  explicit           MMAP              UTF8        false  avgt    5   54.174 ± 1.228  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        2           1500000  explicit           MMAP              UTF8        false  avgt    5  112.113 ± 3.365  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        3           1500000  explicit           MMAP              UTF8        false  avgt    5   85.009 ± 1.475  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        4           1500000  explicit           MMAP              UTF8        false  avgt    5  104.002 ± 2.100  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        5           1500000  explicit           MMAP              UTF8        false  avgt    5  117.752 ± 1.293  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        6           1500000  explicit           MMAP              UTF8        false  avgt    5  147.877 ± 1.524  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        7           1500000  explicit           MMAP              UTF8        false  avgt    5  124.101 ± 1.810  ms/op
Benchmark                                   (complexCompression)  (query)  (rowsPerSegment)  (schema)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt    Score   Error  Units
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        0           1500000  explicit           MMAP              UTF8        false  avgt    5   86.227 ± 1.369  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        1           1500000  explicit           MMAP              UTF8        false  avgt    5   71.209 ± 1.026  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        2           1500000  explicit           MMAP              UTF8        false  avgt    5  153.782 ± 1.160  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        3           1500000  explicit           MMAP              UTF8        false  avgt    5  120.064 ± 4.754  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        4           1500000  explicit           MMAP              UTF8        false  avgt    5  186.183 ± 4.311  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        5           1500000  explicit           MMAP              UTF8        false  avgt    5  165.194 ± 3.460  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        6           1500000  explicit           MMAP              UTF8        false  avgt    5  203.164 ± 2.092  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        7           1500000  explicit           MMAP              UTF8        false  avgt    5  170.323 ± 3.046  ms/op

vectorized:

Benchmark                                   (complexCompression)  (query)  (rowsPerSegment)  (schema)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt    Score   Error  Units
SqlComplexMetricsColumnsBenchmark.querySql                  none        0           1500000  explicit           MMAP              UTF8         true  avgt    5   53.167 ± 1.413  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        1           1500000  explicit           MMAP              UTF8         true  avgt    5   59.719 ± 2.454  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        2           1500000  explicit           MMAP              UTF8         true  avgt    5  123.077 ± 3.370  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        3           1500000  explicit           MMAP              UTF8         true  avgt    5   99.116 ± 3.954  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        4           1500000  explicit           MMAP              UTF8         true  avgt    5   92.114 ± 2.093  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        5           1500000  explicit           MMAP              UTF8         true  avgt    5  106.755 ± 7.177  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        6           1500000  explicit           MMAP              UTF8         true  avgt    5  139.057 ± 4.730  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                  none        7           1500000  explicit           MMAP              UTF8         true  avgt    5  112.861 ± 3.193  ms/op
Benchmark                                   (complexCompression)  (query)  (rowsPerSegment)  (schema)  (storageType)  (stringEncoding)  (vectorize)  Mode  Cnt    Score   Error  Units
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        0           1500000  explicit           MMAP              UTF8         true  avgt    5   84.603 ± 2.109  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        1           1500000  explicit           MMAP              UTF8         true  avgt    5   74.182 ± 1.628  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        2           1500000  explicit           MMAP              UTF8         true  avgt    5  159.461 ± 2.963  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        3           1500000  explicit           MMAP              UTF8         true  avgt    5  128.482 ± 3.908  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        4           1500000  explicit           MMAP              UTF8         true  avgt    5  125.426 ± 1.908  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        5           1500000  explicit           MMAP              UTF8         true  avgt    5  115.135 ± 0.912  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        6           1500000  explicit           MMAP              UTF8         true  avgt    5  185.147 ± 5.391  ms/op
SqlComplexMetricsColumnsBenchmark.querySql                   lz4        7           1500000  explicit           MMAP              UTF8         true  avgt    5  147.139 ± 3.170  ms/op

with segment sizes:

369M	tmp/datasketches_2000-01-01T00:00:00.000Z_2000-01-02T00:00:00.000Z_1_3bc4e5d42bfcc3ffa270840ed672f90933bb97947d768be721be04cebe5f794f/merged/00000.smoosh
511M	tmp/datasketches_2000-01-01T00:00:00.000Z_2000-01-02T00:00:00.000Z_1_4b2bc102accdf4d97c706eb145f199b0efe7db9365f89c1931d67a5066be13a8/merged/00000.smoosh

The benchmarks are not present in this PR; I've done some pretty heavy refactoring of the SQL benchmarks, so I will add them in a follow-up PR.

changes:

  • Adds new CompressedComplexColumn, CompressedComplexColumnSerializer, CompressedComplexColumnSupplier based on CompressedVariableSizedBlobColumn used by JSON columns
  • Adds IndexSpec.complexMetricCompression which can be used to specify compression for the generic compressed complex column. Defaults to uncompressed because compressed columns are not backwards compatible.
  • Adds a new definition of ComplexMetricSerde.getSerializer that accepts an IndexSpec argument when creating a serializer. The old signature is marked @Deprecated and given a default implementation that returns null; if an implementation overrides it to return a non-null serializer, the default implementation of the new method uses that value. Otherwise, the new method's default implementation uses a CompressedComplexColumnSerializer if IndexSpec.complexMetricCompression is set to something other than null/none/uncompressed, and a LargeColumnSupportedComplexColumnSerializer otherwise.
  • Consolidates the duplicated generic implementations of ComplexMetricSerde.getSerializer and ComplexMetricSerde.deserializeColumn into default implementations on ComplexMetricSerde itself, instead of being copied all over the place. The default implementation of deserializeColumn checks whether the first byte indicates the new compressed format; otherwise it uses the GenericIndexed-based supplier.
  • Complex columns with custom serializers/deserializers are unaffected and may continue doing whatever it is they do, whether with specialized compression or anything else; this change just provides generic implementations built around ObjectStrategy. It should not preclude further specializing specific complex types in the future; it simply provides a generic way to add compression and save some space.
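To illustrate the new option, a task's tuningConfig might set it like this. The complexMetricCompression key and its accepted values come from this PR's description; the surrounding fields are just typical context, not requirements:

```json
{
  "tuningConfig": {
    "indexSpec": {
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "complexMetricCompression": "lz4"
    }
  }
}
```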

Release note

Compression is now available for all "complex" metric columns that do not have specialized implementations, via a new IndexSpec option, complexMetricCompression. It defaults to uncompressed for backwards compatibility but can be configured to any compression strategy (lz4, zstd, etc.). This works for most complex columns, except for compressed-big-decimal and the columns stored by first/last aggregators.

Note that enabling compression is not backwards compatible with Druid versions older than 31, so only enable this functionality once you are certain there is no need to roll back to an older Druid version.
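The first-byte dispatch described in the changes above (a leading version byte distinguishes the new compressed layout from the legacy GenericIndexed layout, which is also why older Druid versions cannot read it) can be sketched generically. This is an illustration of the technique, not Druid's actual reader; the marker value V0_COMPRESSED is hypothetical:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class VersionByteDispatch
{
  // Hypothetical marker byte; the actual value used by Druid is not specified here.
  static final byte V0_COMPRESSED = 0x01;

  // Serialize a payload with a leading version byte, as the new
  // compressed serializer does conceptually.
  static ByteBuffer writeCompressed(byte[] payload)
  {
    ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
    buf.put(V0_COMPRESSED);
    buf.put(payload);
    buf.flip();
    return buf;
  }

  // Peek at the first byte without consuming it, then dispatch to the
  // appropriate reader implementation.
  static String describe(ByteBuffer column)
  {
    byte version = column.get(column.position());
    return version == V0_COMPRESSED ? "compressed" : "generic-indexed";
  }

  public static void main(String[] args)
  {
    ByteBuffer newFormat = writeCompressed("sketch".getBytes(StandardCharsets.UTF_8));
    System.out.println(describe(newFormat)); // prints "compressed"
  }
}
```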


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster.

@github-actions github-actions Bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Aug 8, 2024
  ColumnConfig columnConfig
)
{
  deserializeColumn(buffer, builder);

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
Invoking ComplexMetricSerde.deserializeColumn should be avoided because it has been deprecated.

*/
@Nullable
@Deprecated
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)

Check notice (Code scanning / CodeQL): Useless parameter
The parameter 'segmentWriteOutMedium' is never used.

Check notice (Code scanning / CodeQL): Useless parameter
The parameter 'column' is never used.

{
  return ComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());

// backwards compatibility
final GenericColumnSerializer serializer = getSerializer(segmentWriteOutMedium, column);

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
Invoking ComplexMetricSerde.getSerializer should be avoided because it has been deprecated.

return LargeColumnSupportedComplexColumnSerializer.create(
  segmentWriteOutMedium,
  column,
  getObjectStrategy()

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
Invoking ComplexMetricSerde.getObjectStrategy should be avoided because it has been deprecated.

  segmentWriteOutMedium,
  column,
  indexSpec,
  getObjectStrategy()

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
Invoking ComplexMetricSerde.getObjectStrategy should be avoided because it has been deprecated.
Comment thread on docs/ingestion/ingestion-spec.md (outdated)
/**
* Whether the {@link #fromByteBuffer(ByteBuffer, int)}, {@link #fromByteBufferWithSize(ByteBuffer)}, and
* {@link #fromByteBufferSafe(ByteBuffer, int)} methods return an object that may retain a reference to the provided
* {@link ByteBuffer}. If a reference is sometimes retained, this method returns true. It returns false if, and only
Contributor

Should clarify: does this mean "retains a reference to the specific ByteBuffer object" or "retains a reference to the same underlying memory"?

i.e., if an ObjectStrategy calls duplicate() on the buf and retains the duplicate, should it return true or false from this method?

Member Author

@clintropolis clintropolis Aug 25, 2024

ah, yeah i suppose it does need to clarify that it means retains the same memory, will update

final ByteBuffer dupe = decompressedDataBuffer.duplicate().order(byteOrder);
dupe.position(startBlockOffset).limit(startBlockOffset + size);
return dupe.slice().order(byteOrder);
// sweet, same buffer, we can return the buffer directly with position and limit set
Contributor
this comment appears to no longer be 100% accurate
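The distinction raised in this thread (retaining the same ByteBuffer object vs. the same underlying memory) can be demonstrated with plain java.nio: duplicate() and slice() produce distinct buffer objects that still view the same backing memory, so retaining either one retains the memory.

```java
import java.nio.ByteBuffer;

public class BufferSharingDemo
{
  public static void main(String[] args)
  {
    ByteBuffer original = ByteBuffer.allocate(8);

    // duplicate() returns a *different* ByteBuffer object...
    ByteBuffer dupe = original.duplicate();
    System.out.println(dupe != original);       // true

    // ...but it shares the same backing memory: a write through the
    // duplicate is visible through the original.
    dupe.put(0, (byte) 42);
    System.out.println(original.get(0) == 42);  // true

    // slice() behaves the same way: independent position/limit, shared memory.
    dupe.position(2).limit(6);
    ByteBuffer slice = dupe.slice();
    slice.put(0, (byte) 7);
    System.out.println(original.get(2) == 7);   // true
  }
}
```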

// otherwise, use compressed or generic indexed based serializer
CompressionStrategy strategy = indexSpec.getComplexMetricCompression();
if (strategy == null || CompressionStrategy.NONE == strategy || CompressionStrategy.UNCOMPRESSED == strategy) {
return LargeColumnSupportedComplexColumnSerializer.create(
Contributor

The old code had a default of ComplexColumnSerializer. Why change that? (Is it strictly better? Are there compatibility concerns?)

Member Author

LargeColumnSupportedComplexColumnSerializer is basically identical to ComplexColumnSerializer: both use GenericIndexedWriter with a filename of StringUtils.format("%s.complex_column", filenameBase). The main difference is that LargeColumnSupportedComplexColumnSerializer passes the FileSmoosher through to the writer, allowing it to write v2 GenericIndexed, which has multi-file support. So I think there shouldn't be any compatibility concerns on the read side, since both of these just use GenericIndexed.read.

{
if (!closedForWrite) {
closedForWrite = true;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Contributor

was this dead code?

Member Author

yea, noticed while poking around other stuff

channel.write(ByteBuffer.wrap(new byte[]{V0}));
channel.write(ByteBuffer.wrap(metadataBytes));

NestedCommonFormatColumnSerializer.writeInternal(smoosher, writer, name, FILE_NAME);
Contributor

perhaps move that writeInternal to a more neutral helper class? It seems odd for the complex column serializer to call into an internal method of nested column serializer. I mean, it's fine, just a little odd.

Member Author

@clintropolis clintropolis Aug 25, 2024

yea, fair.

I think I would like NestedCommonFormatColumnSerializer itself to be a more neutral thing, since the way it splits parts into separate smoosh files is advantageous if we implement stuff like partial segment download. I'd also like to make some adjustments to the smoosh metadata format to eliminate the chance of name collisions, though definitely not going to do any of that in this PR.

Contributor

@gianm gianm left a comment

LGTM after the latest changes.

@clintropolis clintropolis merged commit f8301a3 into apache:master Aug 27, 2024
@clintropolis clintropolis deleted the compress-complex-metrics branch August 27, 2024 07:34
hevansDev pushed a commit to hevansDev/druid that referenced this pull request Aug 29, 2024
changes:
* Adds new `CompressedComplexColumn`, `CompressedComplexColumnSerializer`, `CompressedComplexColumnSupplier` based on `CompressedVariableSizedBlobColumn` used by JSON columns
* Adds `IndexSpec.complexMetricCompression` which can be used to specify compression for the generic compressed complex column. Defaults to uncompressed because compressed columns are not backwards compatible.
* Adds new definition of `ComplexMetricSerde.getSerializer` which accepts an `IndexSpec` argument when creating a serializer. The old signature has been marked `@Deprecated` and has a default implementation that returns `null`, but it will be used by the default implementation of the new version if it is implemented to return a non-null value. The default implementation of the new method will use a `CompressedComplexColumnSerializer` if `IndexSpec.complexMetricCompression` is not null/none/uncompressed, or will use `LargeColumnSupportedComplexColumnSerializer` otherwise.
* Removed all duplicate generic implementations of `ComplexMetricSerde.getSerializer` and `ComplexMetricSerde.deserializeColumn` into default implementations `ComplexMetricSerde` instead of being copied all over the place. The default implementation of `deserializeColumn` will check if the first byte indicates that the new compression was used, otherwise will use the `GenericIndexed` based supplier.
* Complex columns with custom serializers/deserializers are unaffected and may continue doing whatever it is they do, either with specialized compression or whatever else, this new stuff is just to provide generic implementations built around `ObjectStrategy`.
* add ObjectStrategy.readRetainsBufferReference so CompressedComplexColumn only copies on read if required
* add copyValueOnRead flag down to CompressedBlockReader to avoid buffer duplicate if the value needs copied anyway
edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Sep 5, 2024
@clintropolis clintropolis mentioned this pull request Feb 5, 2026

Labels

Area - Batch Ingestion Area - Documentation Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Segment Format and Ser/De
