Skip to content

Add octet streaming of sketchs in MSQ#16269

Merged
adarshsanjeev merged 14 commits intoapache:masterfrom
adarshsanjeev:octet-stream-sketch
May 28, 2024
Merged

Add octet streaming of sketchs in MSQ#16269
adarshsanjeev merged 14 commits intoapache:masterfrom
adarshsanjeev:octet-stream-sketch

Conversation

@adarshsanjeev
Copy link
Copy Markdown
Contributor

@adarshsanjeev adarshsanjeev commented Apr 12, 2024

There are a few issues with using Jackson serialization in sending datasketches between controller and worker in MSQ. This caused a blowup due to holding multiple copies of the sketch being stored.

This PR aims to resolve this by switching to deserializing the sketch payload without Jackson.

The PR adds a new query parameter used during communication between controller and worker while fetching sketches, "sketchEncoding".

  • If the value of this parameter is OCTET, the sketch is returned as a binary encoding, done by ClusterByStatisticsSnapshotSerde.
  • If the value is not the above, the sketch is encoded by Jackson as before.

Backward compatibility

Query param

The worker client now sends a query parameter which specifies the desired format with the request to fetch the sketch. If the worker receives this parameter, it returns the response encoded as a byte array, with the response header "Content-Type":"octet-stream". If the flag is missing, it defaults to the older behavior of Jackson serialized array.

The newer worker client checks the header and deserializes the response accordingly.

If a newer controller is running with an older worker, the worker will ignore the query parameter, and the controller will not see the response header, and use Jackson deserialization.

If a newer worker is running with an older controller, the query param will not be sent to the worker, which will again default to using Jackson, which the controller will automatically handle.

The above has been tested with an older WorkerClient and newer WorkerChatHandler, and with an older WorkerChatHandler and WorkerClient.

Byte format

The byte format contains a currently empty header byte, which can be used to store version information. This should allow updating the format in the future, if future versions check the header and deserialize accordingly.

Release note

  • Reduce memory usage while transferring sketches between MSQ controller and worker.

Benchmarks

Benchmarks for jackson serde vs octet serde. The octet serde seems to be around 40% faster in most cases.

Benchmark                                   (aggregate)  (numBuckets)  (numRows)  Mode  Cnt    Score    Error  Units
BenchmarkSketchTest.benchmarkJacksonSketch         true             1     100000  avgt    5    0.003 ±  0.001  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch         true             1    1000000  avgt    5    0.003 ±  0.001  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch         true          1000     100000  avgt    5    0.990 ±  0.041  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch         true          1000    1000000  avgt    5    0.932 ±  0.046  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch        false             1     100000  avgt    5   18.739 ±  1.410  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch        false             1    1000000  avgt    5   40.421 ±  0.269  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch        false          1000     100000  avgt    5   24.218 ±  0.516  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch        false          1000    1000000  avgt    5  296.199 ± 18.024  ms/op
BenchmarkSketchTest.benchmarkOctetSketch           true             1     100000  avgt    5   ≈ 10⁻³           ms/op
BenchmarkSketchTest.benchmarkOctetSketch           true             1    1000000  avgt    5   ≈ 10⁻³           ms/op
BenchmarkSketchTest.benchmarkOctetSketch           true          1000     100000  avgt    5    0.175 ±  0.002  ms/op
BenchmarkSketchTest.benchmarkOctetSketch           true          1000    1000000  avgt    5    0.193 ±  0.027  ms/op
BenchmarkSketchTest.benchmarkOctetSketch          false             1     100000  avgt    5   10.475 ±  6.384  ms/op
BenchmarkSketchTest.benchmarkOctetSketch          false             1    1000000  avgt    5   24.297 ±  1.731  ms/op
BenchmarkSketchTest.benchmarkOctetSketch          false          1000     100000  avgt    5   14.834 ±  3.089  ms/op
BenchmarkSketchTest.benchmarkOctetSketch          false          1000    1000000  avgt    5  126.198 ± 13.028  ms/op

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions Bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Apr 12, 2024
@cryptoe cryptoe marked this pull request as ready for review April 15, 2024 06:08
@cryptoe cryptoe added this to the 30.0.0 milestone Apr 29, 2024
@adarshsanjeev adarshsanjeev removed this from the 30.0.0 milestone May 10, 2024
Copy link
Copy Markdown
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review. Will finish the review soon.


@Override
public void exceptionCaught(ClientResponse<BytesFullResponseHolder> clientResponse, Throwable e)
{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this be empty ?
Shouldn't we do something here ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to org.apache.druid.java.util.http.client.response.BytesFullResponseHandler#exceptionCaught, the response would not be ClientResponse#finished(T). My understanding is that nothing else needs to be done in this case.

@adarshsanjeev
Copy link
Copy Markdown
Contributor Author

adarshsanjeev commented May 17, 2024

Did some quick benchmarks.

Benchmark                                   (aggregate)  (numBuckets)  (numRows)  Mode  Cnt    Score    Error  Units
BenchmarkSketchTest.benchmarkJacksonSketch         true             1     100000  avgt    5    0.003 ±  0.001  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch         true             1    1000000  avgt    5    0.003 ±  0.001  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch         true          1000     100000  avgt    5    0.990 ±  0.041  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch         true          1000    1000000  avgt    5    0.932 ±  0.046  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch        false             1     100000  avgt    5   18.739 ±  1.410  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch        false             1    1000000  avgt    5   40.421 ±  0.269  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch        false          1000     100000  avgt    5   24.218 ±  0.516  ms/op
BenchmarkSketchTest.benchmarkJacksonSketch        false          1000    1000000  avgt    5  296.199 ± 18.024  ms/op
BenchmarkSketchTest.benchmarkOctetSketch           true             1     100000  avgt    5   ≈ 10⁻³           ms/op
BenchmarkSketchTest.benchmarkOctetSketch           true             1    1000000  avgt    5   ≈ 10⁻³           ms/op
BenchmarkSketchTest.benchmarkOctetSketch           true          1000     100000  avgt    5    0.175 ±  0.002  ms/op
BenchmarkSketchTest.benchmarkOctetSketch           true          1000    1000000  avgt    5    0.193 ±  0.027  ms/op
BenchmarkSketchTest.benchmarkOctetSketch          false             1     100000  avgt    5   10.475 ±  6.384  ms/op
BenchmarkSketchTest.benchmarkOctetSketch          false             1    1000000  avgt    5   24.297 ±  1.731  ms/op
BenchmarkSketchTest.benchmarkOctetSketch          false          1000     100000  avgt    5   14.834 ±  3.089  ms/op
BenchmarkSketchTest.benchmarkOctetSketch          false          1000    1000000  avgt    5  126.198 ± 13.028  ms/op

Copy link
Copy Markdown
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left comments. Overall lgtm once the review comments are addressed.

final int length = Double.BYTES + 2 * Integer.BYTES + bucketKeyArray.length + serializedSnapshot.length;

outputStream.write(
ByteBuffer.allocate(Integer.BYTES + length)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mention that 4 bytes are for the length.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment


protected abstract byte[] serializeKeyCollector(KeyCollectorSnapshot collectorSnapshot);

public byte[] serialize(KeyCollectorSnapshot collectorSnapshot)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add test cases for null and empty collectors.
for all the serde methods.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test cases for empty collectors. We don't allow null collectors, we throw an error if the request tries to get a non existent collector before this point. Added a notnull annotation instead.

@adarshsanjeev adarshsanjeev merged commit 21f725f into apache:master May 28, 2024
ektravel pushed a commit to ektravel/druid that referenced this pull request May 29, 2024
There are a few issues with using Jackson serialization in sending datasketches between controller and worker in MSQ. This caused a blowup due to holding multiple copies of the sketch being stored.

This PR aims to resolve this by switching to deserializing the sketch payload without Jackson.

The PR adds a new query parameter used during communication between controller and worker while fetching sketches, "sketchEncoding".

    If the value of this parameter is OCTET, the sketch is returned as a binary encoding, done by ClusterByStatisticsSnapshotSerde.
    If the value is not the above, the sketch is encoded by Jackson as before.
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Area - Batch Ingestion Area - Dependencies Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants