From 7f42f84dc11a81d125358b342348460a2e277f0e Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Wed, 17 Jul 2019 16:50:14 -0700 Subject: [PATCH 1/6] SQL support for t-digest based sketch aggregators --- .../tdigestsketch-quantiles.md | 81 ++-- docs/content/querying/sql.md | 2 + extensions-contrib/tdigestsketch/pom.xml | 26 ++ .../TDigestMergeSketchAggregator.java | 83 ---- .../TDigestMergeSketchAggregatorFactory.java | 64 --- .../TDigestMergeSketchBufferAggregator.java | 110 ----- ...ator.java => TDigestSketchAggregator.java} | 10 +- ...va => TDigestSketchAggregatorFactory.java} | 35 +- ...ava => TDigestSketchBufferAggregator.java} | 8 +- .../TDigestSketchComplexMetricSerde.java | 2 +- .../tdigestsketch/TDigestSketchModule.java | 22 +- .../TDigestSketchObjectStrategy.java | 2 +- ...TDigestSketchToQuantilePostAggregator.java | 147 +++++++ ...DigestSketchToQuantilesPostAggregator.java | 2 +- .../TDigestGenerateSketchSqlAggregator.java | 194 +++++++++ .../TDigestSketchQuantileSqlAggregator.java | 229 +++++++++++ .../TDigestSketchAggregatorTest.java | 24 +- .../sql/TDigestSketchSqlAggregatorTest.java | 383 ++++++++++++++++++ 18 files changed, 1059 insertions(+), 365 deletions(-) delete mode 100644 extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregator.java delete mode 100644 extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregatorFactory.java delete mode 100644 extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchBufferAggregator.java rename extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/{TDigestBuildSketchAggregator.java => TDigestSketchAggregator.java} (85%) rename extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/{TDigestBuildSketchAggregatorFactory.java => TDigestSketchAggregatorFactory.java} (80%) rename extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/{TDigestBuildSketchBufferAggregator.java => TDigestSketchBufferAggregator.java} (93%) create mode 100644 extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java create mode 100644 extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java create mode 100644 extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java create mode 100644 extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java diff --git a/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md b/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md index 3fc07cf29820..94b07e90c4b6 100644 --- a/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md +++ b/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md @@ -29,16 +29,13 @@ T-Digest (https://github.com/tdunning/t-digest) is a popular datastructure for a rank-based statistics such as quantiles and trimmed means. The datastructure is also designed for parallel programming use cases like distributed aggregations or map reduce jobs by making combining two intermediate t-digests easy and efficient. -There are three flavors of T-Digest sketch aggregator available in Apache Druid (incubating): +There are two flavors of T-Digest sketch aggregator available in Apache Druid (incubating): -1. buildTDigestSketch - used for building T-Digest sketches from raw numeric values. It generally makes sense to -use this aggregator when ingesting raw data into Druid. One can also use this aggregator during query time too to -generate sketches, just that one would be building these sketches on every query execution instead of building them -once during ingestion. -2. mergeTDigestSketch - used for merging pre-built T-Digest sketches. This aggregator is generally used during -query time to combine sketches generated by buildTDigestSketch aggregator. -3. quantilesFromTDigestSketch - used for generating quantiles from T-Digest sketches. This aggregator is generally used -during query time to generate quantiles from sketches built using the above two sketch generating aggregators. +1. buildTDigestSketch - It generally makes sense to use this aggregator when ingesting raw data into Druid to +generate pre-agrgegated sketches. One can also use this aggregator during query time too to combine ingested T-Digest +sketches. +2. quantilesFromTDigestSketch - used for generating quantiles from T-Digest sketches. This aggregator is generally used +during query time to generate quantiles from sketches built using buildTDigestSketch. To use this aggregator, make sure you [include](../../operations/including-extensions.html) the extension in your config file: @@ -48,7 +45,8 @@ druid.extensions.loadList=["druid-tdigestsketch"] ### Aggregator -The result of the aggregation is a T-Digest sketch that is built ingesting numeric values from the raw data. +The result of the aggregation is a T-Digest sketch that is built ingesting numeric values from the raw data or from +combining pre-generated T-Digest sketches. ```json { @@ -70,49 +68,22 @@ Example: } ``` -|property|description|required?| -|--------|-----------|---------| -|type|This String should always be "buildTDigestSketch"|yes| -|name|A String for the output (result) name of the calculation.|yes| -|fieldName|A String for the name of the input field containing raw numeric values.|yes| -|compression|Parameter that determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches.|no, defaults to 100| - - -The result of the aggregation is a T-Digest sketch that is built by merging pre-built T-Digest sketches. - ```json { - "type" : "mergeTDigestSketch", - "name" : , - "fieldName" : , - "compression": - } + "type": "buildTDigestSketch", + "name": "combined_sketch", + "fieldName": "ingested_sketch", + "compression": 200 +} ``` |property|description|required?| |--------|-----------|---------| -|type|This String should always be "mergeTDigestSketch"|yes| +|type|This String should always be "buildTDigestSketch"|yes| |name|A String for the output (result) name of the calculation.|yes| -|fieldName|A String for the name of the input field containing raw numeric values.|yes| +|fieldName|A String for the name of the input field containing raw numeric values or pre-generated T-Digest sketches.|yes| |compression|Parameter that determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches.|no, defaults to 100| -Example: - -```json -{ - "queryType": "groupBy", - "dataSource": "test_datasource", - "granularity": "ALL", - "dimensions": [], - "aggregations": [{ - "type": "mergeTDigestSketch", - "name": "merged_sketch", - "fieldName": "ingested_sketch", - "compression": 200 - }], - "intervals": ["2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z"] -} -``` ### Post Aggregators @@ -133,7 +104,7 @@ This returns an array of quantiles corresponding to a given array of fractions. |--------|-----------|---------| |type|This String should always be "quantilesFromTDigestSketch"|yes| |name|A String for the output (result) name of the calculation.|yes| -|fieldName|A String for the name of the input field containing raw numeric values.|yes| +|field|A field reference pointing to the field aggregated/combined T-Digest sketch.|yes| |fractions|Non-empty array of fractions between 0 and 1|yes| Example: @@ -145,7 +116,7 @@ Example: "granularity": "ALL", "dimensions": [], "aggregations": [{ - "type": "mergeTDigestSketch", + "type": "buildTDigestSketch", "name": "merged_sketch", "fieldName": "ingested_sketch", "compression": 200 @@ -162,3 +133,21 @@ Example: "intervals": ["2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z"] } ``` + +Similar to quantilesFromTDigestSketch except it takes in a single fraction for computing quantile. + +```json +{ + "type" : "quantileFromTDigestSketch", + "name": , + "field" : , + "fraction" : +} +``` + +|property|description|required?| +|--------|-----------|---------| +|type|This String should always be "quantilesFromTDigestSketch"|yes| +|name|A String for the output (result) name of the calculation.|yes| +|field|A field reference pointing to the field aggregated/combined T-Digest sketch.|yes| +|fraction|Decimal value between 0 and 1|yes| diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 73c5e2d554dc..ef43f1af9593 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -184,6 +184,8 @@ Only the COUNT aggregation can accept DISTINCT. |`APPROX_QUANTILE_DS(expr, probability, [k])`|Computes approximate quantiles on numeric or [Quantiles sketch](../development/extensions-core/datasketches-quantiles.html) exprs. The "probability" should be between 0 and 1 (exclusive). The `k` parameter is described in the Quantiles sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.| |`APPROX_QUANTILE_FIXED_BUCKETS(expr, probability, numBuckets, lowerLimit, upperLimit, [outlierHandlingMode])`|Computes approximate quantiles on numeric or [fixed buckets histogram](../development/extensions-core/approximate-histograms.html#fixed-buckets-histogram) exprs. The "probability" should be between 0 and 1 (exclusive). The `numBuckets`, `lowerLimit`, `upperLimit`, and `outlierHandlingMode` parameters are described in the fixed buckets histogram documentation. The [approximate histogram extension](../development/extensions-core/approximate-histograms.html) must be loaded to use this function.| |`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced by `expr`, with `numEntries` maximum number of distinct values before false positive rate increases. See [bloom filter extension](../development/extensions-core/bloom-filter.html) documentation for additional details.| +|`TDIGEST_QUANTILE(expr, quantileFraction)`|Builds a T-Digest sketch on values produced by `expr` and returns the value for the quantile .See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.| +|`TDIGEST_GENERATE_SKETCH(expr, [compression])`|Builds a T-Digest sketch on values produced by `expr`. Compression parameter (default value 100) determines the accuracy and size of the sketch Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.| |`VAR_POP(expr)`|Computes variance population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`VAR_SAMP(expr)`|Computes variance sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`VARIANCE(expr)`|Computes variance sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| diff --git a/extensions-contrib/tdigestsketch/pom.xml b/extensions-contrib/tdigestsketch/pom.xml index e6f25a70a4f7..32928290621a 100644 --- a/extensions-contrib/tdigestsketch/pom.xml +++ b/extensions-contrib/tdigestsketch/pom.xml @@ -58,6 +58,24 @@ ${project.parent.version} provided + + com.google.code.findbugs + jsr305 + provided + + + com.google.inject + guice + provided + + + org.apache.druid + druid-sql + ${project.parent.version} + provided + + + junit junit @@ -86,6 +104,14 @@ org.apache.druid druid-server ${project.parent.version} + test-jar + test + + + org.apache.druid + druid-sql + ${project.parent.version} + test-jar test diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregator.java deleted file mode 100644 index 0f6002344db9..000000000000 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregator.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.tdigestsketch; - -import com.google.errorprone.annotations.concurrent.GuardedBy; -import com.tdunning.math.stats.MergingDigest; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.segment.ColumnValueSelector; - - -/** - * Aggregator that merges T-Digest based sketches generated from {@link TDigestBuildSketchAggregator} - */ -public class TDigestMergeSketchAggregator implements Aggregator -{ - private final ColumnValueSelector selector; - - @GuardedBy("this") - private MergingDigest tdigestSketch; - - public TDigestMergeSketchAggregator( - ColumnValueSelector selector, - final Integer compression - ) - { - this.selector = selector; - this.tdigestSketch = new MergingDigest(compression); - } - - @Override - public void aggregate() - { - final MergingDigest sketch = selector.getObject(); - if (sketch == null) { - return; - } - synchronized (this) { - this.tdigestSketch.add(sketch); - } - } - - @Override - public synchronized Object get() - { - return tdigestSketch; - } - - @Override - public float getFloat() - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public long getLong() - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public synchronized void close() - { - tdigestSketch = null; - } - -} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregatorFactory.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregatorFactory.java deleted file mode 100644 index 9a0e9a98fb0f..000000000000 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregatorFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.tdigestsketch; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.tdunning.math.stats.MergingDigest; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.aggregation.AggregatorUtil; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; - -/** - * Factory to create {@link TDigestMergeSketchAggregator} - */ -public class TDigestMergeSketchAggregatorFactory extends TDigestBuildSketchAggregatorFactory -{ - public static final String TYPE_NAME = "mergeTDigestSketch"; - - @JsonCreator - public TDigestMergeSketchAggregatorFactory( - @JsonProperty("name") final String name, - @JsonProperty("fieldName") final String fieldName, - @JsonProperty("compression") final Integer compression - ) - { - super(name, fieldName, compression, AggregatorUtil.TDIGEST_MERGE_SKETCH_CACHE_TYPE_ID); - } - - @Override - public Aggregator factorize(final ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector( - getFieldName()); - return new TDigestMergeSketchAggregator(selector, getCompression()); - } - - @Override - public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector( - getFieldName() - ); - return new TDigestMergeSketchBufferAggregator(selector, getCompression()); - } -} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchBufferAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchBufferAggregator.java deleted file mode 100644 index da4423ef8590..000000000000 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchBufferAggregator.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.tdigestsketch; - -import com.tdunning.math.stats.MergingDigest; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.segment.ColumnValueSelector; - -import java.nio.ByteBuffer; -import java.util.IdentityHashMap; - -/** - * Aggregator that is capable of combining T-Digest sketch serialized as {@link ByteBuffer} - */ -public class TDigestMergeSketchBufferAggregator implements BufferAggregator -{ - private final ColumnValueSelector selector; - private final int compression; - private final IdentityHashMap> sketchCache = new IdentityHashMap(); - - public TDigestMergeSketchBufferAggregator( - ColumnValueSelector selector, - int compression - ) - { - this.selector = selector; - this.compression = compression; - } - - @Override - public void init(ByteBuffer buffer, int position) - { - MergingDigest emptyDigest = new MergingDigest(compression); - addToCache(buffer, position, emptyDigest); - } - - @Override - public void aggregate(ByteBuffer buffer, int position) - { - final MergingDigest sketch = selector.getObject(); - if (sketch == null) { - return; - } - final MergingDigest union = sketchCache.get(buffer).get(position); - union.add(sketch); - } - - @Override - public Object get(ByteBuffer buffer, int position) - { - // sketchCache is an IdentityHashMap where the reference of buffer is used for equality checks. - // So the returned object isn't impacted by the changes in the buffer object made by concurrent threads. - return sketchCache.get(buffer).get(position); - } - - @Override - public float getFloat(final ByteBuffer buffer, final int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public long getLong(final ByteBuffer buffer, final int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void close() - { - sketchCache.clear(); - } - - @Override - public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) - { - MergingDigest sketch = sketchCache.get(oldBuffer).get(oldPosition); - addToCache(newBuffer, newPosition, sketch); - final Int2ObjectMap map = sketchCache.get(oldBuffer); - map.remove(oldPosition); - if (map.isEmpty()) { - sketchCache.remove(oldBuffer); - } - } - - private void addToCache(final ByteBuffer buffer, final int position, final MergingDigest union) - { - Int2ObjectMap map = sketchCache.computeIfAbsent(buffer, b -> new Int2ObjectOpenHashMap<>()); - map.put(position, union); - } -} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java similarity index 85% rename from extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregator.java rename to extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java index 390c0f92b8b7..730fdef90ac9 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java @@ -36,7 +36,7 @@ * One can use this aggregator to build these sketches during query time too, just * that it will be slower and more resource intensive. */ -public class TDigestBuildSketchAggregator implements Aggregator +public class TDigestSketchAggregator implements Aggregator { private final ColumnValueSelector selector; @@ -45,13 +45,13 @@ public class TDigestBuildSketchAggregator implements Aggregator private MergingDigest histogram; - public TDigestBuildSketchAggregator(ColumnValueSelector selector, @Nullable Integer compression) + public TDigestSketchAggregator(ColumnValueSelector selector, @Nullable Integer compression) { this.selector = selector; if (compression != null) { this.histogram = new MergingDigest(compression); } else { - this.histogram = new MergingDigest(TDigestBuildSketchAggregatorFactory.DEFAULT_COMPRESSION); + this.histogram = new MergingDigest(TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION); } } @@ -62,6 +62,10 @@ public void aggregate() synchronized (this) { histogram.add(((Number) selector.getObject()).doubleValue()); } + } else if (selector.getObject() instanceof MergingDigest) { + synchronized (this) { + histogram.add((MergingDigest) selector.getObject()); + } } else { TDigestSketchUtils.throwExceptionForWrongType(selector); } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregatorFactory.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java similarity index 80% rename from extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregatorFactory.java rename to extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java index 08f88b66ae50..24cf7283fa6a 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregatorFactory.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.tdunning.math.stats.MergingDigest; import com.tdunning.math.stats.TDigest; import org.apache.druid.query.aggregation.Aggregator; @@ -30,9 +31,6 @@ import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; -import org.apache.druid.segment.column.ValueType; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -55,7 +53,8 @@ * when we have to merge intermediate aggregations which Druid needs to do as * part of query processing. */ -public class TDigestBuildSketchAggregatorFactory extends AggregatorFactory +@JsonTypeName(TDigestSketchAggregatorFactory.TYPE_NAME) +public class TDigestSketchAggregatorFactory extends AggregatorFactory { // Default compression @@ -74,7 +73,7 @@ public class TDigestBuildSketchAggregatorFactory extends AggregatorFactory public static final String TYPE_NAME = "buildTDigestSketch"; @JsonCreator - public TDigestBuildSketchAggregatorFactory( + public TDigestSketchAggregatorFactory( @JsonProperty("name") final String name, @JsonProperty("fieldName") final String fieldName, @Nullable @JsonProperty("compression") final Integer compression @@ -83,7 +82,7 @@ public TDigestBuildSketchAggregatorFactory( this(name, fieldName, compression, AggregatorUtil.TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID); } - TDigestBuildSketchAggregatorFactory( + TDigestSketchAggregatorFactory( final String name, final String fieldName, @Nullable final Integer compression, @@ -109,27 +108,13 @@ public byte[] getCacheKey() @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - ColumnCapabilities cap = metricFactory.getColumnCapabilities(fieldName); - if (cap == null || ValueType.isNumeric(cap.getType())) { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new TDigestBuildSketchAggregator(selector, compression); - } else { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new TDigestMergeSketchAggregator(selector, compression); - } + return new TDigestSketchAggregator(metricFactory.makeColumnValueSelector(fieldName), compression); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - ColumnCapabilities cap = metricFactory.getColumnCapabilities(fieldName); - if (cap == null || ValueType.isNumeric(cap.getType())) { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new TDigestBuildSketchBufferAggregator(selector, compression); - } else { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new TDigestMergeSketchBufferAggregator(selector, compression); - } + return new TDigestSketchBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), compression); } public static final Comparator COMPARATOR = Comparator.nullsFirst( @@ -159,7 +144,7 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregatorFactory getCombiningFactory() { - return new TDigestMergeSketchAggregatorFactory(name, name, compression); + return new TDigestSketchAggregatorFactory(name, name, compression); } @Override @@ -176,7 +161,7 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre public List getRequiredColumns() { return Collections.singletonList( - new TDigestBuildSketchAggregatorFactory( + new TDigestSketchAggregatorFactory( fieldName, fieldName, compression @@ -243,7 +228,7 @@ public boolean equals(final Object o) if (o == null || !getClass().equals(o.getClass())) { return false; } - final TDigestBuildSketchAggregatorFactory that = (TDigestBuildSketchAggregatorFactory) o; + final TDigestSketchAggregatorFactory that = (TDigestSketchAggregatorFactory) o; return Objects.equals(name, that.name) && Objects.equals(fieldName, that.fieldName) && diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchBufferAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchBufferAggregator.java similarity index 93% rename from extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchBufferAggregator.java rename to extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchBufferAggregator.java index a9f1dd1ea902..3a9eb140a07b 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchBufferAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchBufferAggregator.java @@ -34,7 +34,7 @@ /** * Aggregator that builds T-Digest backed sketch using numeric values read from {@link ByteBuffer} */ -public class TDigestBuildSketchBufferAggregator implements BufferAggregator +public class TDigestSketchBufferAggregator implements BufferAggregator { @Nonnull @@ -42,7 +42,7 @@ public class TDigestBuildSketchBufferAggregator implements BufferAggregator private final int compression; private final IdentityHashMap> sketchCache = new IdentityHashMap(); - public TDigestBuildSketchBufferAggregator( + public TDigestSketchBufferAggregator( final ColumnValueSelector valueSelector, @Nullable final Integer compression ) @@ -52,7 +52,7 @@ public TDigestBuildSketchBufferAggregator( if (compression != null) { this.compression = compression; } else { - this.compression = TDigestBuildSketchAggregatorFactory.DEFAULT_COMPRESSION; + this.compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION; } } @@ -70,6 +70,8 @@ public void aggregate(ByteBuffer buffer, int position) Object x = selector.getObject(); if (x instanceof Number) { sketch.add(((Number) x).doubleValue()); + } else if (x instanceof MergingDigest) { + sketch.add((MergingDigest) x); } else { TDigestSketchUtils.throwExceptionForWrongType(selector); } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchComplexMetricSerde.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchComplexMetricSerde.java index 52522b88ee00..d44ce80fc7e3 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchComplexMetricSerde.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchComplexMetricSerde.java @@ -41,7 +41,7 @@ public class TDigestSketchComplexMetricSerde extends ComplexMetricSerde @Override public String getTypeName() { - return TDigestBuildSketchAggregatorFactory.TYPE_NAME; + return TDigestSketchAggregatorFactory.TYPE_NAME; } @Override diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchModule.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchModule.java index a84785a76ba0..1a5150fdf746 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchModule.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchModule.java @@ -27,7 +27,10 @@ import com.google.inject.Binder; import com.tdunning.math.stats.MergingDigest; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.query.aggregation.tdigestsketch.sql.TDigestGenerateSketchSqlAggregator; +import org.apache.druid.query.aggregation.tdigestsketch.sql.TDigestSketchQuantileSqlAggregator; import org.apache.druid.segment.serde.ComplexMetrics; +import org.apache.druid.sql.guice.SqlBindings; import java.util.List; @@ -44,16 +47,16 @@ public List getJacksonModules() getClass().getSimpleName() ).registerSubtypes( new NamedType( - TDigestBuildSketchAggregatorFactory.class, - TDigestBuildSketchAggregatorFactory.TYPE_NAME - ), - new NamedType( - TDigestMergeSketchAggregatorFactory.class, - TDigestMergeSketchAggregatorFactory.TYPE_NAME + TDigestSketchAggregatorFactory.class, + TDigestSketchAggregatorFactory.TYPE_NAME ), new NamedType( TDigestSketchToQuantilesPostAggregator.class, TDigestSketchToQuantilesPostAggregator.TYPE_NAME + ), + new NamedType( + TDigestSketchToQuantilePostAggregator.class, + TDigestSketchToQuantilePostAggregator.TYPE_NAME ) ).addSerializer(MergingDigest.class, new TDigestSketchJsonSerializer()) ); @@ -63,11 +66,14 @@ public List getJacksonModules() public void configure(Binder binder) { registerSerde(); + SqlBindings.addAggregator(binder, TDigestSketchQuantileSqlAggregator.class); + SqlBindings.addAggregator(binder, TDigestGenerateSketchSqlAggregator.class); } @VisibleForTesting - static void registerSerde() + public static void registerSerde() { - ComplexMetrics.registerSerde(TDigestBuildSketchAggregatorFactory.TYPE_NAME, new TDigestSketchComplexMetricSerde()); + ComplexMetrics.registerSerde(TDigestSketchAggregatorFactory.TYPE_NAME, new TDigestSketchComplexMetricSerde()); + ComplexMetrics.registerSerde("TDIGEST_GENERATE_SKETCH", new TDigestSketchComplexMetricSerde()); } } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchObjectStrategy.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchObjectStrategy.java index 765e707ea387..706e3e1ac866 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchObjectStrategy.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchObjectStrategy.java @@ -58,6 +58,6 @@ public byte[] toBytes(@Nullable MergingDigest val) @Override public int compare(MergingDigest o1, MergingDigest o2) { - return TDigestBuildSketchAggregatorFactory.COMPARATOR.compare(o1, o2); + return TDigestSketchAggregatorFactory.COMPARATOR.compare(o1, o2); } } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java new file mode 100644 index 000000000000..788369c09515 --- /dev/null +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.tdigestsketch; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.tdunning.math.stats.MergingDigest; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.cache.CacheKeyBuilder; + +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Post aggregation operator that can take in aggregated T-Digest sketches and + * generate quantiles from it. + */ +public class TDigestSketchToQuantilePostAggregator implements PostAggregator +{ + + private final String name; + private final PostAggregator field; + + private final double fraction; + + public static final String TYPE_NAME = "quantileFromTDigestSketch"; + + @JsonCreator + public TDigestSketchToQuantilePostAggregator( + @JsonProperty("name") final String name, + @JsonProperty("field") final PostAggregator field, + @JsonProperty("fractions") final double fraction + ) + { + this.name = Preconditions.checkNotNull(name, "name is null"); + this.field = Preconditions.checkNotNull(field, "field is null"); + this.fraction = fraction; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public PostAggregator getField() + { + return field; + } + + @JsonProperty + public double getFraction() + { + return fraction; + } + + @Override + public Object compute(final Map combinedAggregators) + { + final MergingDigest sketch = (MergingDigest) field.compute(combinedAggregators); + return sketch.quantile(fraction); + } + + @Override + public Comparator getComparator() + { + throw new IAE("Comparing arrays of quantiles is not supported"); + } + + @Override + public Set getDependentFields() + { + return field.getDependentFields(); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "name='" + name + '\'' + + ", field=" + field + + ", fraction=" + fraction + + "}"; + } + + @Override + public byte[] getCacheKey() + { + final CacheKeyBuilder builder = new CacheKeyBuilder( + AggregatorUtil.TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID).appendCacheable(field); + builder.appendDouble(fraction); + return builder.build(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TDigestSketchToQuantilePostAggregator that = (TDigestSketchToQuantilePostAggregator) o; + return Double.compare(that.fraction, fraction) == 0 && + Objects.equals(name, that.name) && + Objects.equals(field, that.field); + } + + @Override + public int hashCode() + { + return Objects.hash(name, field, fraction); + } + + @Override + public PostAggregator decorate(final Map map) + { + return this; + } + +} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java index 485a397dd372..fc994dc10f43 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java @@ -57,7 +57,7 @@ public TDigestSketchToQuantilesPostAggregator( this.name = Preconditions.checkNotNull(name, "name is null"); this.field = Preconditions.checkNotNull(field, "field is null"); this.fractions = Preconditions.checkNotNull(fractions, "array of fractions is null"); - Preconditions.checkArgument(this.fractions.length > 1, "Array of fractions cannot be empty"); + Preconditions.checkArgument(this.fractions.length >= 1, "Array of fractions cannot be empty"); } @Override diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java new file mode 100644 index 000000000000..61bd7c7fd876 --- /dev/null +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.tdigestsketch.sql; + +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; +import org.apache.druid.sql.calcite.table.RowSignature; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class TDigestGenerateSketchSqlAggregator implements SqlAggregator +{ + private static final SqlAggFunction FUNCTION_INSTANCE = new TDigestGenerateSketchSqlAggregator.TDigestGenerateSketchSqlAggFunction(); + private static final String NAME = "TDIGEST_GENERATE_SKETCH"; + + @Override + public SqlAggFunction calciteFunction() + { + return FUNCTION_INSTANCE; + } + + @Nullable + @Override + public Aggregation toDruidAggregation( + final PlannerContext plannerContext, + final RowSignature rowSignature, + final VirtualColumnRegistry virtualColumnRegistry, + final RexBuilder rexBuilder, + final String name, + final AggregateCall aggregateCall, + final Project project, + final List existingAggregations, + final boolean finalizeAggregations + ) + { + final RexNode inputOperand = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(0) + ); + final DruidExpression input = Expressions.toDruidExpression( + plannerContext, + rowSignature, + inputOperand + ); + if (input == null) { + return null; + } + + final AggregatorFactory aggregatorFactory; + final String aggName = StringUtils.format("%s:agg", name); + final RexNode maxNumEntriesOperand = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(1) + ); + + if (!maxNumEntriesOperand.isA(SqlKind.LITERAL)) { + // maxNumEntriesOperand must be a literal in order to plan. + return null; + } + + final Integer compression = ((Number) RexLiteral.value(maxNumEntriesOperand)).intValue(); + + // Look for existing matching aggregatorFactory. + for (final Aggregation existing : existingAggregations) { + for (AggregatorFactory factory : existing.getAggregatorFactories()) { + if (factory instanceof TDigestSketchAggregatorFactory) { + final TDigestSketchAggregatorFactory theFactory = (TDigestSketchAggregatorFactory) factory; + + // Check input for equivalence. + final boolean inputMatches; + final VirtualColumn virtualInput = existing.getVirtualColumns() + .stream() + .filter( + virtualColumn -> + virtualColumn.getOutputName() + .equals(theFactory.getFieldName()) + ) + .findFirst() + .orElse(null); + + if (virtualInput == null) { + inputMatches = input.isDirectColumnAccess() + && input.getDirectColumn().equals(theFactory.getFieldName()); + } else { + inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression() + .equals(input.getExpression()); + } + final boolean matches = inputMatches && theFactory.getCompression() == compression; + + if (matches) { + // Found existing one. Use this. + return Aggregation.create( + theFactory + ); + } + } + } + } + + // No existing match found. Create a new one. + final List virtualColumns = new ArrayList<>(); + + if (input.isDirectColumnAccess()) { + aggregatorFactory = new TDigestSketchAggregatorFactory( + aggName, + input.getDirectColumn(), + compression + ); + } else { + VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( + plannerContext, + input, + SqlTypeName.FLOAT + ); + virtualColumns.add(virtualColumn); + aggregatorFactory = new TDigestSketchAggregatorFactory( + aggName, + virtualColumn.getOutputName(), + compression + ); + } + + return Aggregation.create( + virtualColumns, + aggregatorFactory + ); + } + + private static class TDigestGenerateSketchSqlAggFunction extends SqlAggFunction + { + //private static final String SIGNATURE1 = "'" + NAME + "(column)'\n"; + private static final String SIGNATURE2 = "'" + NAME + "(column, compression)'\n"; + + TDigestGenerateSketchSqlAggFunction() + { + super( + NAME, + null, + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.OTHER), + null, + OperandTypes.and( + OperandTypes.sequence(SIGNATURE2, OperandTypes.ANY, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + ), + SqlFunctionCategory.USER_DEFINED_FUNCTION, + false, + false + ); + } + } +} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java new file mode 100644 index 000000000000..a80d555bc0c0 --- /dev/null +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.tdigestsketch.sql; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; +import org.apache.druid.sql.calcite.table.RowSignature; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class TDigestSketchQuantileSqlAggregator implements SqlAggregator +{ + private static final SqlAggFunction FUNCTION_INSTANCE = new TDigestSketchQuantileSqlAggFunction(); + private static final String NAME = "TDIGEST_QUANTILE"; + + @Override + public SqlAggFunction calciteFunction() + { + return FUNCTION_INSTANCE; + } + + @Nullable + @Override + public Aggregation toDruidAggregation( + final PlannerContext plannerContext, + final RowSignature rowSignature, + final VirtualColumnRegistry virtualColumnRegistry, + final RexBuilder rexBuilder, + final String name, + final AggregateCall aggregateCall, + final Project project, + final List existingAggregations, + final boolean finalizeAggregations + ) + { + // This is expected to be a tdigest sketch + final DruidExpression input = Expressions.toDruidExpression( + plannerContext, + rowSignature, + Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(0) + ) + ); + if (input == null) { + return null; + } + + final AggregatorFactory aggregatorFactory; + final String histogramName = StringUtils.format("%s:agg", name); + + // this is expected to be quantile fraction + final RexNode quantileArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(1) + ); + + if (!quantileArg.isA(SqlKind.LITERAL)) { + // Quantile must be a literal in order to plan. + return null; + } + + final double quantile = ((Number) RexLiteral.value(quantileArg)).floatValue(); + Integer compression = null; + if (aggregateCall.getArgList().size() > 2) { + final RexNode compressionArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(2) + ); + compression = ((Number) RexLiteral.value(compressionArg)).intValue(); + } + + // Look for existing matching aggregatorFactory. + for (final Aggregation existing : existingAggregations) { + for (AggregatorFactory factory : existing.getAggregatorFactories()) { + if (factory instanceof TDigestSketchAggregatorFactory) { + final TDigestSketchAggregatorFactory theFactory = (TDigestSketchAggregatorFactory) factory; + + // Check input for equivalence. + final boolean inputMatches; + final VirtualColumn virtualInput = existing.getVirtualColumns() + .stream() + .filter( + virtualColumn -> + virtualColumn.getOutputName() + .equals(theFactory.getFieldName()) + ) + .findFirst() + .orElse(null); + + if (virtualInput == null) { + inputMatches = input.isDirectColumnAccess() + && input.getDirectColumn().equals(theFactory.getFieldName()); + } else { + inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression() + .equals(input.getExpression()); + } + + if (inputMatches) { + // Found existing one. Use this. + return Aggregation.create( + ImmutableList.of(), + new TDigestSketchToQuantilePostAggregator( + name, + new FieldAccessPostAggregator( + factory.getName(), + factory.getName() + ), + quantile + ) + ); + } + } + } + } + + // No existing match found. Create a new one. + final List virtualColumns = new ArrayList<>(); + + if (input.isDirectColumnAccess()) { + aggregatorFactory = new TDigestSketchAggregatorFactory( + histogramName, + input.getDirectColumn(), + compression + ); + } else { + VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( + plannerContext, + input, + SqlTypeName.FLOAT + ); + virtualColumns.add(virtualColumn); + aggregatorFactory = new TDigestSketchAggregatorFactory( + histogramName, + virtualColumn.getOutputName(), + compression + ); + } + + return Aggregation.create( + virtualColumns, + ImmutableList.of(aggregatorFactory), + new TDigestSketchToQuantilePostAggregator( + name, + new FieldAccessPostAggregator( + histogramName, + histogramName + ), + quantile + ) + ); + } + + private static class TDigestSketchQuantileSqlAggFunction extends SqlAggFunction + { + private static final String SIGNATURE1 = "'" + NAME + "(column, quantile)'\n"; + private static final String SIGNATURE2 = "'" + NAME + "(column, quantile, compression)'\n"; + + TDigestSketchQuantileSqlAggFunction() + { + super( + NAME, + null, + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.DOUBLE), + null, + OperandTypes.or( + OperandTypes.and( + OperandTypes.sequence(SIGNATURE1, OperandTypes.ANY, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + ), + OperandTypes.and( + OperandTypes.sequence(SIGNATURE2, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) + ) + ), + SqlFunctionCategory.USER_DEFINED_FUNCTION, + false, + false + ); + } + } +} diff --git a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java index 67773b165fdb..ed621ae30a7f 100644 --- a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java +++ b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java @@ -73,23 +73,7 @@ public void serializeDeserializeFactoryWithFieldName() throws Exception { ObjectMapper objectMapper = new DefaultObjectMapper(); new TDigestSketchModule().getJacksonModules().forEach(objectMapper::registerModule); - TDigestBuildSketchAggregatorFactory factory = new TDigestBuildSketchAggregatorFactory("name", "filedName", 128); - - AggregatorFactory other = objectMapper.readValue( - objectMapper.writeValueAsString(factory), - AggregatorFactory.class - ); - - Assert.assertEquals(factory, other); - } - - // this is to test Json properties and equals for the combining factory - @Test - public void serializeDeserializeCombiningFactoryWithFieldName() throws Exception - { - ObjectMapper objectMapper = new DefaultObjectMapper(); - new TDigestSketchModule().getJacksonModules().forEach(objectMapper::registerModule); - TDigestMergeSketchAggregatorFactory factory = new TDigestMergeSketchAggregatorFactory("name", "fieldName", 128); + TDigestSketchAggregatorFactory factory = new TDigestSketchAggregatorFactory("name", "filedName", 128); AggregatorFactory other = objectMapper.readValue( objectMapper.writeValueAsString(factory), @@ -132,7 +116,7 @@ public void buildingSketchesAtIngestionTime() throws Exception " \"granularity\": \"ALL\",", " \"dimensions\": [],", " \"aggregations\": [", - " {\"type\": \"mergeTDigestSketch\", \"name\": \"merged_sketch\", \"fieldName\": \"sketch\", " + " {\"type\": \"buildTDigestSketch\", \"name\": \"merged_sketch\", \"fieldName\": \"sketch\", " + "\"compression\": " + "200}", " ],", @@ -237,7 +221,7 @@ public void testIngestingSketches() throws Exception String.join( "\n", "[", - " {\"type\": \"mergeTDigestSketch\", \"name\": \"first_level_merge_sketch\", \"fieldName\": \"sketch\", " + " {\"type\": \"buildTDigestSketch\", \"name\": \"first_level_merge_sketch\", \"fieldName\": \"sketch\", " + "\"compression\": " + "200}", "]" @@ -253,7 +237,7 @@ public void testIngestingSketches() throws Exception " \"granularity\": \"ALL\",", " \"dimensions\": [],", " \"aggregations\": [", - " {\"type\": \"mergeTDigestSketch\", \"name\": \"second_level_merge_sketch\", \"fieldName\": " + " {\"type\": \"buildTDigestSketch\", \"name\": \"second_level_merge_sketch\", \"fieldName\": " + "\"first_level_merge_sketch\", \"compression\": " + "200}", " ],", diff --git a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java new file mode 100644 index 000000000000..777e79496836 --- /dev/null +++ b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.tdigestsketch.sql; + +import com.fasterxml.jackson.databind.Module; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchModule; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.sql.SqlLifecycle; +import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.planner.DruidOperatorTable; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.planner.PlannerFactory; +import org.apache.druid.sql.calcite.schema.DruidSchema; +import org.apache.druid.sql.calcite.schema.SystemSchema; +import org.apache.druid.sql.calcite.util.CalciteTestBase; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.QueryLogHook; +import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +public class TDigestSketchSqlAggregatorTest extends CalciteTestBase +{ + private static final String DATA_SOURCE = "foo"; + + private static QueryRunnerFactoryConglomerate conglomerate; + private static Closer resourceCloser; + private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; + private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( + PlannerContext.CTX_SQL_QUERY_ID, "dummy" + ); + + @BeforeClass + public static void setUpClass() + { + final Pair conglomerateCloserPair = CalciteTests + .createQueryRunnerFactoryConglomerate(); + conglomerate = conglomerateCloserPair.lhs; + resourceCloser = conglomerateCloserPair.rhs; + } + + @AfterClass + public static void tearDownClass() throws IOException + { + resourceCloser.close(); + } + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public QueryLogHook queryLogHook = QueryLogHook.create(); + + private SpecificSegmentsQuerySegmentWalker walker; + private SqlLifecycleFactory sqlLifecycleFactory; + + @Before + public void setUp() throws Exception + { + TDigestSketchModule.registerSerde(); + for (Module mod : new TDigestSketchModule().getJacksonModules()) { + CalciteTests.getJsonMapper().registerModule(mod); + } + + final QueryableIndex index = + IndexBuilder.create() + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new TDigestSketchAggregatorFactory( + "qsketch_m1", + "m1", + 128 + ) + ) + .withRollup(false) + .build() + ) + .rows(CalciteTests.ROWS1) + .buildMMappedIndex(); + + walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(index.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .build(), + index + ); + + final PlannerConfig plannerConfig = new PlannerConfig(); + final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); + final DruidOperatorTable operatorTable = new DruidOperatorTable( + ImmutableSet.of(new TDigestSketchQuantileSqlAggregator(), new TDigestGenerateSketchSqlAggregator()), + ImmutableSet.of() + ); + sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory( + new PlannerFactory( + druidSchema, + systemSchema, + CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + operatorTable, + CalciteTests.createExprMacroTable(), + plannerConfig, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + CalciteTests.getJsonMapper() + ) + ); + } + + @After + public void tearDown() throws Exception + { + walker.close(); + walker = null; + } + + @Test + public void testComputingSketchOnNumericValues() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_GENERATE_SKETCH(m1, 200)" + + "FROM foo"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final List expectedResults = ImmutableList.of( + new String[] { + "\"AAAAAT/wAAAAAAAAQBgAAAAAAABAaQAAAAAAAAAAAAY/8AAAAAAAAD/wAAAAAAAAP/AAAAAAAABAAAAAAAAAAD/wAAAAAAAAQAgAAAAAAAA/8AAAAAAAAEAQAAAAAAAAP/AAAAAAAABAFAAAAAAAAD/wAAAAAAAAQBgAAAAAAAA=\"" + } + ); + + Assert.assertEquals(expectedResults.size(), results.size()); + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", 200) + )) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void testComputingQuantileOnPreAggregatedSketch() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_QUANTILE(qsketch_m1, 0.1),\n" + + "TDIGEST_QUANTILE(qsketch_m1, 0.4),\n" + + "TDIGEST_QUANTILE(qsketch_m1, 0.8),\n" + + "TDIGEST_QUANTILE(qsketch_m1, 1.0)\n" + + "FROM foo"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final List expectedResults = ImmutableList.of( + new double[] { + 1.1, + 2.9, + 5.3, + 6.0 + } + ); + + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Object[] objects = results.get(i); + Assert.assertArrayEquals( + expectedResults.get(i), + Stream.of(objects).mapToDouble(value -> ((Double) value).doubleValue()).toArray(), + 0.000001 + ); + } + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "qsketch_m1", 100) + )) + .postAggregators( + new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.1f), + new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a0:agg"), 0.4f), + new TDigestSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a0:agg"), 0.8f), + new TDigestSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 1.0f) + ) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void testGeneratingSketchAndComputingQuantileOnFly() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT TDIGEST_QUANTILE(x, 0.0), TDIGEST_QUANTILE(x, 0.5), TDIGEST_QUANTILE(x, 1.0)\n" + + "FROM (SELECT dim1, TDIGEST_GENERATE_SKETCH(m1, 200) AS x FROM foo group by dim1)"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final List expectedResults = ImmutableList.of( + new double[] { + 1.0, + 3.5, + 6.0 + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Object[] objects = results.get(i); + Assert.assertArrayEquals( + expectedResults.get(i), + Stream.of(objects).mapToDouble(value -> ((Double) value).doubleValue()).toArray(), + 0.000001 + ); + } + + // Verify query + Assert.assertEquals( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("dim1", "d0")) + .setAggregatorSpecs( + ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", 200) + ) + ) + .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build() + ) + ) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs( + ImmutableList.of( + new TDigestSketchAggregatorFactory("_a0:agg", "a0:agg", 100) + ) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new TDigestSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.0f), + new TDigestSketchToQuantilePostAggregator("_a1", makeFieldAccessPostAgg("_a0:agg"), 0.5f), + new TDigestSketchToQuantilePostAggregator("_a2", makeFieldAccessPostAgg("_a0:agg"), 1.0f) + ) + ) + .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void testQuantileOnNumericValues() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_QUANTILE(m1, 0.0), TDIGEST_QUANTILE(m1, 0.5), TDIGEST_QUANTILE(m1, 1.0)\n" + + "FROM foo"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final List expectedResults = ImmutableList.of( + new double[] { + 1.0, + 3.5, + 6.0 + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Object[] objects = results.get(i); + Assert.assertArrayEquals( + expectedResults.get(i), + Stream.of(objects).mapToDouble(value -> ((Double) value).doubleValue()).toArray(), + 0.000001 + ); + } + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", null) + )) + .postAggregators( + new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.0f), + new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a0:agg"), 0.5f), + new TDigestSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a0:agg"), 1.0f) + ) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + + private static PostAggregator makeFieldAccessPostAgg(String name) + { + return new FieldAccessPostAggregator(name, name); + } +} From 4184db0bf9a52374cde7922c4af87328996fccb7 Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Thu, 25 Jul 2019 23:19:16 -0700 Subject: [PATCH 2/6] Fix teamcity errors --- .../aggregation/tdigestsketch/TDigestSketchAggregator.java | 2 -- .../tdigestsketch/TDigestSketchToQuantilePostAggregator.java | 2 +- .../java/org/apache/druid/query/aggregation/AggregatorUtil.java | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java index 730fdef90ac9..9197923a800a 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java @@ -30,8 +30,6 @@ /** * Aggregator to build T-Digest sketches on numeric values. * It generally makes sense to use this aggregator during the ingestion time. - * Then during query time one can use {@link TDigestMergeSketchAggregator} to merge - * these pre-aggregated sketches. *

* One can use this aggregator to build these sketches during query time too, just * that it will be slower and more resource intensive. diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java index 788369c09515..e5976df271d4 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java @@ -112,7 +112,7 @@ public String toString() public byte[] getCacheKey() { final CacheKeyBuilder builder = new CacheKeyBuilder( - AggregatorUtil.TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID).appendCacheable(field); + AggregatorUtil.TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID).appendCacheable(field); builder.appendDouble(fraction); return builder.build(); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index e5e5f51d3a8b..91f1d4284956 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -121,8 +121,8 @@ public class AggregatorUtil // TDigest sketch aggregators public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38; - public static final byte TDIGEST_MERGE_SKETCH_CACHE_TYPE_ID = 0x39; public static final byte TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 0x40; + public static final byte TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID = 0x41; /** * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg From bd643b9589f1a2b4816e57e586ca4016e628ce7b Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Sat, 27 Jul 2019 14:59:19 -0700 Subject: [PATCH 3/6] Add missing dependencies --- extensions-contrib/tdigestsketch/pom.xml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/tdigestsketch/pom.xml b/extensions-contrib/tdigestsketch/pom.xml index 32928290621a..d1a2e437b6ed 100644 --- a/extensions-contrib/tdigestsketch/pom.xml +++ b/extensions-contrib/tdigestsketch/pom.xml @@ -74,6 +74,17 @@ ${project.parent.version} provided + + org.apache.calcite + calcite-core + provided + + + org.apache.druid + druid-server + provided + ${project.parent.version} + @@ -117,4 +128,4 @@ - \ No newline at end of file + From 4b9caac062b25463edd18fd056772faec50dac4f Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Sun, 28 Jul 2019 13:17:50 -0700 Subject: [PATCH 4/6] Remove unused dependency --- extensions-contrib/tdigestsketch/pom.xml | 45 ++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/extensions-contrib/tdigestsketch/pom.xml b/extensions-contrib/tdigestsketch/pom.xml index d1a2e437b6ed..8b275f5e919f 100644 --- a/extensions-contrib/tdigestsketch/pom.xml +++ b/extensions-contrib/tdigestsketch/pom.xml @@ -68,6 +68,51 @@ guice provided + + com.fasterxml.jackson.core + jackson-databind + provided + + + com.fasterxml.jackson.datatype + jackson-datatype-guava + provided + + + com.fasterxml.jackson.datatype + jackson-datatype-joda + provided + + + com.fasterxml.jackson.dataformat + jackson-dataformat-smile + provided + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + provided + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-smile-provider + provided + + + it.unimi.dsi + fastutil + provided + + + com.fasterxml.jackson.core + jackson-core + provided + + + com.google.errorprone + error_prone_annotations + provided + org.apache.druid druid-sql From c54bb5a4384ce92823f6e8a096a18718f706394a Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Tue, 30 Jul 2019 13:30:50 -0700 Subject: [PATCH 5/6] Address code review comments --- .../tdigestsketch-quantiles.md | 27 +++++++++---------- .../TDigestSketchAggregatorFactory.java | 2 +- .../TDigestSketchAggregatorTest.java | 10 +++---- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md b/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md index 94b07e90c4b6..9d73d04b8f08 100644 --- a/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md +++ b/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md @@ -29,13 +29,12 @@ T-Digest (https://github.com/tdunning/t-digest) is a popular datastructure for a rank-based statistics such as quantiles and trimmed means. The datastructure is also designed for parallel programming use cases like distributed aggregations or map reduce jobs by making combining two intermediate t-digests easy and efficient. -There are two flavors of T-Digest sketch aggregator available in Apache Druid (incubating): - -1. buildTDigestSketch - It generally makes sense to use this aggregator when ingesting raw data into Druid to -generate pre-agrgegated sketches. One can also use this aggregator during query time too to combine ingested T-Digest -sketches. -2. quantilesFromTDigestSketch - used for generating quantiles from T-Digest sketches. This aggregator is generally used -during query time to generate quantiles from sketches built using buildTDigestSketch. +The tDigestSketch aggregator is capable of generating sketches from raw numeric values as well as +aggregating/combining pre-generated T-Digest sketches generated using the tDigestSketch aggregator itself. +While one can generate sketches on the fly during the query time itself, it generally is more performant +to generate sketches during ingestion time itself and then combining them during query time. +The module also provides a postAggregator, quantilesFromTDigestSketch, that can be used to compute approximate +quantiles from T-Digest sketches generated by the tDigestSketch aggreator. To use this aggregator, make sure you [include](../../operations/including-extensions.html) the extension in your config file: @@ -50,7 +49,7 @@ combining pre-generated T-Digest sketches. ```json { - "type" : "buildTDigestSketch", + "type" : "tDigestSketch", "name" : , "fieldName" : , "compression": @@ -61,7 +60,7 @@ Example: ```json { - "type": "buildTDigestSketch", + "type": "tDigestSketch", "name": "sketch", "fieldName": "session_duration", "compression": 200 @@ -70,16 +69,16 @@ Example: ```json { - "type": "buildTDigestSketch", + "type": "tDigestSketch", "name": "combined_sketch", - "fieldName": "ingested_sketch", + "fieldName": , "compression": 200 } ``` |property|description|required?| |--------|-----------|---------| -|type|This String should always be "buildTDigestSketch"|yes| +|type|This String should always be "tDigestSketch"|yes| |name|A String for the output (result) name of the calculation.|yes| |fieldName|A String for the name of the input field containing raw numeric values or pre-generated T-Digest sketches.|yes| |compression|Parameter that determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches.|no, defaults to 100| @@ -116,7 +115,7 @@ Example: "granularity": "ALL", "dimensions": [], "aggregations": [{ - "type": "buildTDigestSketch", + "type": "tDigestSketch", "name": "merged_sketch", "fieldName": "ingested_sketch", "compression": 200 @@ -147,7 +146,7 @@ Similar to quantilesFromTDigestSketch except it takes in a single fraction for c |property|description|required?| |--------|-----------|---------| -|type|This String should always be "quantilesFromTDigestSketch"|yes| +|type|This String should always be "quantileFromTDigestSketch"|yes| |name|A String for the output (result) name of the calculation.|yes| |field|A field reference pointing to the field aggregated/combined T-Digest sketch.|yes| |fraction|Decimal value between 0 and 1|yes| diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java index 24cf7283fa6a..b9d34f377d7e 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java @@ -70,7 +70,7 @@ public class TDigestSketchAggregatorFactory extends AggregatorFactory @Nonnull private final byte cacheTypeId; - public static final String TYPE_NAME = "buildTDigestSketch"; + public static final String TYPE_NAME = "tDigestSketch"; @JsonCreator public TDigestSketchAggregatorFactory( diff --git a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java index ed621ae30a7f..e531407cdb04 100644 --- a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java +++ b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java @@ -104,7 +104,7 @@ public void buildingSketchesAtIngestionTime() throws Exception " }", "}" ), - "[{\"type\": \"buildTDigestSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"compression\": 200}]", + "[{\"type\": \"tDigestSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"compression\": 200}]", 0, // minTimestamp Granularities.NONE, 10, // maxRowCount @@ -116,7 +116,7 @@ public void buildingSketchesAtIngestionTime() throws Exception " \"granularity\": \"ALL\",", " \"dimensions\": [],", " \"aggregations\": [", - " {\"type\": \"buildTDigestSketch\", \"name\": \"merged_sketch\", \"fieldName\": \"sketch\", " + " {\"type\": \"tDigestSketch\", \"name\": \"merged_sketch\", \"fieldName\": \"sketch\", " + "\"compression\": " + "200}", " ],", @@ -174,7 +174,7 @@ public void buildingSketchesAtQueryTime() throws Exception " \"granularity\": \"ALL\",", " \"dimensions\": [],", " \"aggregations\": [", - " {\"type\": \"buildTDigestSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"compression\": 200}", + " {\"type\": \"tDigestSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"compression\": 200}", " ],", " \"postAggregations\": [", " {\"type\": \"quantilesFromTDigestSketch\", \"name\": \"quantiles\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", @@ -221,7 +221,7 @@ public void testIngestingSketches() throws Exception String.join( "\n", "[", - " {\"type\": \"buildTDigestSketch\", \"name\": \"first_level_merge_sketch\", \"fieldName\": \"sketch\", " + " {\"type\": \"tDigestSketch\", \"name\": \"first_level_merge_sketch\", \"fieldName\": \"sketch\", " + "\"compression\": " + "200}", "]" @@ -237,7 +237,7 @@ public void testIngestingSketches() throws Exception " \"granularity\": \"ALL\",", " \"dimensions\": [],", " \"aggregations\": [", - " {\"type\": \"buildTDigestSketch\", \"name\": \"second_level_merge_sketch\", \"fieldName\": " + " {\"type\": \"tDigestSketch\", \"name\": \"second_level_merge_sketch\", \"fieldName\": " + "\"first_level_merge_sketch\", \"compression\": " + "200}", " ],", From d0d90438230a6527b87b4fac21437404c9d0c979 Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Fri, 2 Aug 2019 11:19:50 -0700 Subject: [PATCH 6/6] Add checks for compression param --- docs/content/querying/sql.md | 2 +- ...TDigestSketchToQuantilePostAggregator.java | 6 +- ...DigestSketchToQuantilesPostAggregator.java | 4 +- .../tdigestsketch/TDigestSketchUtils.java | 33 ++++++++++ .../TDigestGenerateSketchSqlAggregator.java | 63 ++++++++---------- .../TDigestSketchQuantileSqlAggregator.java | 45 +++++-------- .../sql/TDigestSketchSqlAggregatorTest.java | 64 +++++++++++++++++++ .../query/aggregation/AggregatorUtil.java | 2 - .../aggregation/post/PostAggregatorIds.java | 2 + 9 files changed, 146 insertions(+), 75 deletions(-) diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index ef43f1af9593..21d6edf4f5e8 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -184,7 +184,7 @@ Only the COUNT aggregation can accept DISTINCT. |`APPROX_QUANTILE_DS(expr, probability, [k])`|Computes approximate quantiles on numeric or [Quantiles sketch](../development/extensions-core/datasketches-quantiles.html) exprs. The "probability" should be between 0 and 1 (exclusive). The `k` parameter is described in the Quantiles sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.| |`APPROX_QUANTILE_FIXED_BUCKETS(expr, probability, numBuckets, lowerLimit, upperLimit, [outlierHandlingMode])`|Computes approximate quantiles on numeric or [fixed buckets histogram](../development/extensions-core/approximate-histograms.html#fixed-buckets-histogram) exprs. The "probability" should be between 0 and 1 (exclusive). The `numBuckets`, `lowerLimit`, `upperLimit`, and `outlierHandlingMode` parameters are described in the fixed buckets histogram documentation. The [approximate histogram extension](../development/extensions-core/approximate-histograms.html) must be loaded to use this function.| |`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced by `expr`, with `numEntries` maximum number of distinct values before false positive rate increases. See [bloom filter extension](../development/extensions-core/bloom-filter.html) documentation for additional details.| -|`TDIGEST_QUANTILE(expr, quantileFraction)`|Builds a T-Digest sketch on values produced by `expr` and returns the value for the quantile .See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.| +|`TDIGEST_QUANTILE(expr, quantileFraction, [compression])`|Builds a T-Digest sketch on values produced by `expr` and returns the value for the quantile. Compression parameter (default value 100) determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.| |`TDIGEST_GENERATE_SKETCH(expr, [compression])`|Builds a T-Digest sketch on values produced by `expr`. Compression parameter (default value 100) determines the accuracy and size of the sketch Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.| |`VAR_POP(expr)`|Computes variance population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`VAR_SAMP(expr)`|Computes variance sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java index e5976df271d4..d54b90b4ac89 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java @@ -25,8 +25,8 @@ import com.tdunning.math.stats.MergingDigest; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.PostAggregatorIds; import org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Comparator; @@ -52,7 +52,7 @@ public class TDigestSketchToQuantilePostAggregator implements PostAggregator public TDigestSketchToQuantilePostAggregator( @JsonProperty("name") final String name, @JsonProperty("field") final PostAggregator field, - @JsonProperty("fractions") final double fraction + @JsonProperty("fraction") final double fraction ) { this.name = Preconditions.checkNotNull(name, "name is null"); @@ -112,7 +112,7 @@ public String toString() public byte[] getCacheKey() { final CacheKeyBuilder builder = new CacheKeyBuilder( - AggregatorUtil.TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID).appendCacheable(field); + PostAggregatorIds.TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID).appendCacheable(field); builder.appendDouble(fraction); return builder.build(); } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java index fc994dc10f43..245f648cc3cf 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java @@ -25,8 +25,8 @@ import com.tdunning.math.stats.MergingDigest; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.PostAggregatorIds; import org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Arrays; @@ -143,7 +143,7 @@ public int hashCode() public byte[] getCacheKey() { final CacheKeyBuilder builder = new CacheKeyBuilder( - AggregatorUtil.TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID).appendCacheable(field); + PostAggregatorIds.TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID).appendCacheable(field); for (final double value : fractions) { builder.appendDouble(value); } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchUtils.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchUtils.java index 0a5d9a18098b..edfd148b4063 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchUtils.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchUtils.java @@ -23,6 +23,10 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.expression.DruidExpression; import java.nio.ByteBuffer; @@ -87,4 +91,33 @@ static void throwExceptionForWrongType(ColumnValueSelector selector) ); throw new IAE(msg); } + + public static boolean matchingAggregatorFactoryExists( + final DruidExpression input, + final Integer compression, + final Aggregation existing, + final TDigestSketchAggregatorFactory factory + ) + { + // Check input for equivalence. + final boolean inputMatches; + final VirtualColumn virtualInput = existing.getVirtualColumns() + .stream() + .filter( + virtualColumn -> + virtualColumn.getOutputName() + .equals(factory.getFieldName()) + ) + .findFirst() + .orElse(null); + + if (virtualInput == null) { + inputMatches = input.isDirectColumnAccess() + && input.getDirectColumn().equals(factory.getFieldName()); + } else { + inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression() + .equals(input.getExpression()); + } + return inputMatches && compression == factory.getCompression(); + } } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java index 61bd7c7fd876..725f335baee6 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java @@ -34,8 +34,8 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchUtils; import org.apache.druid.segment.VirtualColumn; -import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.DruidExpression; @@ -89,45 +89,32 @@ public Aggregation toDruidAggregation( final AggregatorFactory aggregatorFactory; final String aggName = StringUtils.format("%s:agg", name); - final RexNode maxNumEntriesOperand = Expressions.fromFieldAccess( - rowSignature, - project, - aggregateCall.getArgList().get(1) - ); - if (!maxNumEntriesOperand.isA(SqlKind.LITERAL)) { - // maxNumEntriesOperand must be a literal in order to plan. - return null; + Integer compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION; + if (aggregateCall.getArgList().size() > 1) { + RexNode compressionOperand = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(1) + ); + if (!compressionOperand.isA(SqlKind.LITERAL)) { + // compressionOperand must be a literal in order to plan. + return null; + } + compression = ((Number) RexLiteral.value(compressionOperand)).intValue(); } - final Integer compression = ((Number) RexLiteral.value(maxNumEntriesOperand)).intValue(); - // Look for existing matching aggregatorFactory. for (final Aggregation existing : existingAggregations) { for (AggregatorFactory factory : existing.getAggregatorFactories()) { if (factory instanceof TDigestSketchAggregatorFactory) { final TDigestSketchAggregatorFactory theFactory = (TDigestSketchAggregatorFactory) factory; - - // Check input for equivalence. - final boolean inputMatches; - final VirtualColumn virtualInput = existing.getVirtualColumns() - .stream() - .filter( - virtualColumn -> - virtualColumn.getOutputName() - .equals(theFactory.getFieldName()) - ) - .findFirst() - .orElse(null); - - if (virtualInput == null) { - inputMatches = input.isDirectColumnAccess() - && input.getDirectColumn().equals(theFactory.getFieldName()); - } else { - inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression() - .equals(input.getExpression()); - } - final boolean matches = inputMatches && theFactory.getCompression() == compression; + final boolean matches = TDigestSketchUtils.matchingAggregatorFactoryExists( + input, + compression, + existing, + (TDigestSketchAggregatorFactory) factory + ); if (matches) { // Found existing one. Use this. @@ -170,8 +157,7 @@ public Aggregation toDruidAggregation( private static class TDigestGenerateSketchSqlAggFunction extends SqlAggFunction { - //private static final String SIGNATURE1 = "'" + NAME + "(column)'\n"; - private static final String SIGNATURE2 = "'" + NAME + "(column, compression)'\n"; + private static final String SIGNATURE_WITH_COMPRESSION = "'" + NAME + "(column, compression)'\n"; TDigestGenerateSketchSqlAggFunction() { @@ -181,9 +167,12 @@ private static class TDigestGenerateSketchSqlAggFunction extends SqlAggFunction SqlKind.OTHER_FUNCTION, ReturnTypes.explicit(SqlTypeName.OTHER), null, - OperandTypes.and( - OperandTypes.sequence(SIGNATURE2, OperandTypes.ANY, OperandTypes.LITERAL), - OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + OperandTypes.or( + OperandTypes.ANY, + OperandTypes.and( + OperandTypes.sequence(SIGNATURE_WITH_COMPRESSION, OperandTypes.ANY, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + ) ), SqlFunctionCategory.USER_DEFINED_FUNCTION, false, diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java index a80d555bc0c0..022317227028 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java @@ -37,8 +37,8 @@ import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory; import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchUtils; import org.apache.druid.segment.VirtualColumn; -import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.DruidExpression; @@ -91,7 +91,7 @@ public Aggregation toDruidAggregation( } final AggregatorFactory aggregatorFactory; - final String histogramName = StringUtils.format("%s:agg", name); + final String sketchName = StringUtils.format("%s:agg", name); // this is expected to be quantile fraction final RexNode quantileArg = Expressions.fromFieldAccess( @@ -106,7 +106,7 @@ public Aggregation toDruidAggregation( } final double quantile = ((Number) RexLiteral.value(quantileArg)).floatValue(); - Integer compression = null; + Integer compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION; if (aggregateCall.getArgList().size() > 2) { final RexNode compressionArg = Expressions.fromFieldAccess( rowSignature, @@ -120,29 +120,14 @@ public Aggregation toDruidAggregation( for (final Aggregation existing : existingAggregations) { for (AggregatorFactory factory : existing.getAggregatorFactories()) { if (factory instanceof TDigestSketchAggregatorFactory) { - final TDigestSketchAggregatorFactory theFactory = (TDigestSketchAggregatorFactory) factory; - - // Check input for equivalence. - final boolean inputMatches; - final VirtualColumn virtualInput = existing.getVirtualColumns() - .stream() - .filter( - virtualColumn -> - virtualColumn.getOutputName() - .equals(theFactory.getFieldName()) - ) - .findFirst() - .orElse(null); - - if (virtualInput == null) { - inputMatches = input.isDirectColumnAccess() - && input.getDirectColumn().equals(theFactory.getFieldName()); - } else { - inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression() - .equals(input.getExpression()); - } - - if (inputMatches) { + final boolean matches = TDigestSketchUtils.matchingAggregatorFactoryExists( + input, + compression, + existing, + (TDigestSketchAggregatorFactory) factory + ); + + if (matches) { // Found existing one. Use this. return Aggregation.create( ImmutableList.of(), @@ -165,7 +150,7 @@ public Aggregation toDruidAggregation( if (input.isDirectColumnAccess()) { aggregatorFactory = new TDigestSketchAggregatorFactory( - histogramName, + sketchName, input.getDirectColumn(), compression ); @@ -177,7 +162,7 @@ public Aggregation toDruidAggregation( ); virtualColumns.add(virtualColumn); aggregatorFactory = new TDigestSketchAggregatorFactory( - histogramName, + sketchName, virtualColumn.getOutputName(), compression ); @@ -189,8 +174,8 @@ public Aggregation toDruidAggregation( new TDigestSketchToQuantilePostAggregator( name, new FieldAccessPostAggregator( - histogramName, - histogramName + sketchName, + sketchName ), quantile ) diff --git a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java index 777e79496836..ab575c3b183e 100644 --- a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java +++ b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java @@ -210,6 +210,32 @@ public void testComputingSketchOnNumericValues() throws Exception ); } + @Test + public void testDefaultCompressionForTDigestGenerateSketchAgg() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_GENERATE_SKETCH(m1)" + + "FROM foo"; + + // Log query + sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION) + )) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + @Test public void testComputingQuantileOnPreAggregatedSketch() throws Exception { @@ -375,6 +401,44 @@ public void testQuantileOnNumericValues() throws Exception ); } + @Test + public void testCompressionParamForTDigestQuantileAgg() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_QUANTILE(m1, 0.0), TDIGEST_QUANTILE(m1, 0.5, 200), TDIGEST_QUANTILE(m1, 1.0, 300)\n" + + "FROM foo"; + + // Log query + sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", + TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION + ), + new TDigestSketchAggregatorFactory("a1:agg", "m1", + 200 + ), + new TDigestSketchAggregatorFactory("a2:agg", "m1", + 300 + ))) + .postAggregators( + new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.0f), + new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.5f), + new TDigestSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 1.0f) + ) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + private static PostAggregator makeFieldAccessPostAgg(String name) { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index 91f1d4284956..ca0eb3ae22b1 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -121,8 +121,6 @@ public class AggregatorUtil // TDigest sketch aggregators public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38; - public static final byte TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 0x40; - public static final byte TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID = 0x41; /** * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java index 52d693d6cc1b..4d9f9f5059c3 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java @@ -51,4 +51,6 @@ public class PostAggregatorIds public static final byte QUANTILES_DOUBLES_SKETCH_TO_RANK_CACHE_TYPE_ID = 27; public static final byte QUANTILES_DOUBLES_SKETCH_TO_CDF_CACHE_TYPE_ID = 28; public static final byte THETA_SKETCH_TO_STRING = 29; + public static final byte TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 30; + public static final byte TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID = 31; }