diff --git a/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md b/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md index 3fc07cf29820..9d73d04b8f08 100644 --- a/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md +++ b/docs/content/development/extensions-contrib/tdigestsketch-quantiles.md @@ -29,16 +29,12 @@ T-Digest (https://github.com/tdunning/t-digest) is a popular datastructure for a rank-based statistics such as quantiles and trimmed means. The datastructure is also designed for parallel programming use cases like distributed aggregations or map reduce jobs by making combining two intermediate t-digests easy and efficient. -There are three flavors of T-Digest sketch aggregator available in Apache Druid (incubating): - -1. buildTDigestSketch - used for building T-Digest sketches from raw numeric values. It generally makes sense to -use this aggregator when ingesting raw data into Druid. One can also use this aggregator during query time too to -generate sketches, just that one would be building these sketches on every query execution instead of building them -once during ingestion. -2. mergeTDigestSketch - used for merging pre-built T-Digest sketches. This aggregator is generally used during -query time to combine sketches generated by buildTDigestSketch aggregator. -3. quantilesFromTDigestSketch - used for generating quantiles from T-Digest sketches. This aggregator is generally used -during query time to generate quantiles from sketches built using the above two sketch generating aggregators. +The tDigestSketch aggregator is capable of generating sketches from raw numeric values as well as +aggregating/combining pre-generated T-Digest sketches generated using the tDigestSketch aggregator itself. +While one can generate sketches on the fly during the query time itself, it generally is more performant +to generate sketches during ingestion time itself and then combining them during query time. 
+The module also provides a postAggregator, quantilesFromTDigestSketch, that can be used to compute approximate +quantiles from T-Digest sketches generated by the tDigestSketch aggregator. To use this aggregator, make sure you [include](../../operations/including-extensions.html) the extension in your config file: @@ -48,11 +44,12 @@ druid.extensions.loadList=["druid-tdigestsketch"] ### Aggregator -The result of the aggregation is a T-Digest sketch that is built ingesting numeric values from the raw data. +The result of the aggregation is a T-Digest sketch that is built ingesting numeric values from the raw data or from +combining pre-generated T-Digest sketches. ```json { - "type" : "buildTDigestSketch", + "type" : "tDigestSketch", "name" : , "fieldName" : , "compression": @@ -63,56 +60,29 @@ Example: ```json { - "type": "buildTDigestSketch", + "type": "tDigestSketch", "name": "sketch", "fieldName": "session_duration", "compression": 200 } ``` -|property|description|required?| -|--------|-----------|---------| -|type|This String should always be "buildTDigestSketch"|yes| -|name|A String for the output (result) name of the calculation.|yes| -|fieldName|A String for the name of the input field containing raw numeric values.|yes| -|compression|Parameter that determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches.|no, defaults to 100| - - -The result of the aggregation is a T-Digest sketch that is built by merging pre-built T-Digest sketches. 
- ```json { - "type" : "mergeTDigestSketch", - "name" : , - "fieldName" : , - "compression": - } + "type": "tDigestSketch", + "name": "combined_sketch", + "fieldName": , + "compression": 200 +} ``` |property|description|required?| |--------|-----------|---------| -|type|This String should always be "mergeTDigestSketch"|yes| +|type|This String should always be "tDigestSketch"|yes| |name|A String for the output (result) name of the calculation.|yes| -|fieldName|A String for the name of the input field containing raw numeric values.|yes| +|fieldName|A String for the name of the input field containing raw numeric values or pre-generated T-Digest sketches.|yes| |compression|Parameter that determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches.|no, defaults to 100| -Example: - -```json -{ - "queryType": "groupBy", - "dataSource": "test_datasource", - "granularity": "ALL", - "dimensions": [], - "aggregations": [{ - "type": "mergeTDigestSketch", - "name": "merged_sketch", - "fieldName": "ingested_sketch", - "compression": 200 - }], - "intervals": ["2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z"] -} -``` ### Post Aggregators @@ -133,7 +103,7 @@ This returns an array of quantiles corresponding to a given array of fractions. 
|--------|-----------|---------| |type|This String should always be "quantilesFromTDigestSketch"|yes| |name|A String for the output (result) name of the calculation.|yes| -|fieldName|A String for the name of the input field containing raw numeric values.|yes| +|field|A field reference pointing to the aggregated/combined T-Digest sketch.|yes| |fractions|Non-empty array of fractions between 0 and 1|yes| Example: @@ -145,7 +115,7 @@ Example: "granularity": "ALL", "dimensions": [], "aggregations": [{ - "type": "mergeTDigestSketch", + "type": "tDigestSketch", "name": "merged_sketch", "fieldName": "ingested_sketch", "compression": 200 @@ -162,3 +132,21 @@ Example: "intervals": ["2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z"] } ``` + +Similar to quantilesFromTDigestSketch except it takes in a single fraction for computing the quantile. + +```json +{ + "type" : "quantileFromTDigestSketch", + "name": , + "field" : , + "fraction" : +} +``` + +|property|description|required?| +|--------|-----------|---------| +|type|This String should always be "quantileFromTDigestSketch"|yes| +|name|A String for the output (result) name of the calculation.|yes| +|field|A field reference pointing to the aggregated/combined T-Digest sketch.|yes| +|fraction|Decimal value between 0 and 1|yes| diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 73c5e2d554dc..21d6edf4f5e8 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -184,6 +184,8 @@ Only the COUNT aggregation can accept DISTINCT. |`APPROX_QUANTILE_DS(expr, probability, [k])`|Computes approximate quantiles on numeric or [Quantiles sketch](../development/extensions-core/datasketches-quantiles.html) exprs. The "probability" should be between 0 and 1 (exclusive). The `k` parameter is described in the Quantiles sketch documentation. 
The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.| |`APPROX_QUANTILE_FIXED_BUCKETS(expr, probability, numBuckets, lowerLimit, upperLimit, [outlierHandlingMode])`|Computes approximate quantiles on numeric or [fixed buckets histogram](../development/extensions-core/approximate-histograms.html#fixed-buckets-histogram) exprs. The "probability" should be between 0 and 1 (exclusive). The `numBuckets`, `lowerLimit`, `upperLimit`, and `outlierHandlingMode` parameters are described in the fixed buckets histogram documentation. The [approximate histogram extension](../development/extensions-core/approximate-histograms.html) must be loaded to use this function.| |`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced by `expr`, with `numEntries` maximum number of distinct values before false positive rate increases. See [bloom filter extension](../development/extensions-core/bloom-filter.html) documentation for additional details.| +|`TDIGEST_QUANTILE(expr, quantileFraction, [compression])`|Builds a T-Digest sketch on values produced by `expr` and returns the value for the quantile. Compression parameter (default value 100) determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.| +|`TDIGEST_GENERATE_SKETCH(expr, [compression])`|Builds a T-Digest sketch on values produced by `expr`. Compression parameter (default value 100) determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.| |`VAR_POP(expr)`|Computes variance population of `expr`. 
See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`VAR_SAMP(expr)`|Computes variance sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`VARIANCE(expr)`|Computes variance sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| diff --git a/extensions-contrib/tdigestsketch/pom.xml b/extensions-contrib/tdigestsketch/pom.xml index e6f25a70a4f7..8b275f5e919f 100644 --- a/extensions-contrib/tdigestsketch/pom.xml +++ b/extensions-contrib/tdigestsketch/pom.xml @@ -58,6 +58,80 @@ ${project.parent.version} provided + + com.google.code.findbugs + jsr305 + provided + + + com.google.inject + guice + provided + + + com.fasterxml.jackson.core + jackson-databind + provided + + + com.fasterxml.jackson.datatype + jackson-datatype-guava + provided + + + com.fasterxml.jackson.datatype + jackson-datatype-joda + provided + + + com.fasterxml.jackson.dataformat + jackson-dataformat-smile + provided + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + provided + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-smile-provider + provided + + + it.unimi.dsi + fastutil + provided + + + com.fasterxml.jackson.core + jackson-core + provided + + + com.google.errorprone + error_prone_annotations + provided + + + org.apache.druid + druid-sql + ${project.parent.version} + provided + + + org.apache.calcite + calcite-core + provided + + + org.apache.druid + druid-server + provided + ${project.parent.version} + + + junit junit @@ -86,9 +160,17 @@ org.apache.druid druid-server ${project.parent.version} + test-jar + test + + + org.apache.druid + druid-sql + ${project.parent.version} + test-jar test - \ No newline at end of file + diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregator.java 
b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregator.java deleted file mode 100644 index 0f6002344db9..000000000000 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregator.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.query.aggregation.tdigestsketch; - -import com.google.errorprone.annotations.concurrent.GuardedBy; -import com.tdunning.math.stats.MergingDigest; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.segment.ColumnValueSelector; - - -/** - * Aggregator that merges T-Digest based sketches generated from {@link TDigestBuildSketchAggregator} - */ -public class TDigestMergeSketchAggregator implements Aggregator -{ - private final ColumnValueSelector selector; - - @GuardedBy("this") - private MergingDigest tdigestSketch; - - public TDigestMergeSketchAggregator( - ColumnValueSelector selector, - final Integer compression - ) - { - this.selector = selector; - this.tdigestSketch = new MergingDigest(compression); - } - - @Override - public void aggregate() - { - final MergingDigest sketch = selector.getObject(); - if (sketch == null) { - return; - } - synchronized (this) { - this.tdigestSketch.add(sketch); - } - } - - @Override - public synchronized Object get() - { - return tdigestSketch; - } - - @Override - public float getFloat() - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public long getLong() - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public synchronized void close() - { - tdigestSketch = null; - } - -} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregatorFactory.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregatorFactory.java deleted file mode 100644 index 9a0e9a98fb0f..000000000000 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchAggregatorFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.tdigestsketch; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.tdunning.math.stats.MergingDigest; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.aggregation.AggregatorUtil; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; - -/** - * Factory to create {@link TDigestMergeSketchAggregator} - */ -public class TDigestMergeSketchAggregatorFactory extends TDigestBuildSketchAggregatorFactory -{ - public static final String TYPE_NAME = "mergeTDigestSketch"; - - @JsonCreator - public TDigestMergeSketchAggregatorFactory( - @JsonProperty("name") final String name, - @JsonProperty("fieldName") final String fieldName, - @JsonProperty("compression") final Integer compression - ) - { - super(name, fieldName, compression, AggregatorUtil.TDIGEST_MERGE_SKETCH_CACHE_TYPE_ID); - } - - @Override - public Aggregator factorize(final ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector( - getFieldName()); - return new 
TDigestMergeSketchAggregator(selector, getCompression()); - } - - @Override - public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector( - getFieldName() - ); - return new TDigestMergeSketchBufferAggregator(selector, getCompression()); - } -} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchBufferAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchBufferAggregator.java deleted file mode 100644 index da4423ef8590..000000000000 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestMergeSketchBufferAggregator.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.query.aggregation.tdigestsketch; - -import com.tdunning.math.stats.MergingDigest; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.segment.ColumnValueSelector; - -import java.nio.ByteBuffer; -import java.util.IdentityHashMap; - -/** - * Aggregator that is capable of combining T-Digest sketch serialized as {@link ByteBuffer} - */ -public class TDigestMergeSketchBufferAggregator implements BufferAggregator -{ - private final ColumnValueSelector selector; - private final int compression; - private final IdentityHashMap> sketchCache = new IdentityHashMap(); - - public TDigestMergeSketchBufferAggregator( - ColumnValueSelector selector, - int compression - ) - { - this.selector = selector; - this.compression = compression; - } - - @Override - public void init(ByteBuffer buffer, int position) - { - MergingDigest emptyDigest = new MergingDigest(compression); - addToCache(buffer, position, emptyDigest); - } - - @Override - public void aggregate(ByteBuffer buffer, int position) - { - final MergingDigest sketch = selector.getObject(); - if (sketch == null) { - return; - } - final MergingDigest union = sketchCache.get(buffer).get(position); - union.add(sketch); - } - - @Override - public Object get(ByteBuffer buffer, int position) - { - // sketchCache is an IdentityHashMap where the reference of buffer is used for equality checks. - // So the returned object isn't impacted by the changes in the buffer object made by concurrent threads. 
- return sketchCache.get(buffer).get(position); - } - - @Override - public float getFloat(final ByteBuffer buffer, final int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public long getLong(final ByteBuffer buffer, final int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void close() - { - sketchCache.clear(); - } - - @Override - public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) - { - MergingDigest sketch = sketchCache.get(oldBuffer).get(oldPosition); - addToCache(newBuffer, newPosition, sketch); - final Int2ObjectMap map = sketchCache.get(oldBuffer); - map.remove(oldPosition); - if (map.isEmpty()) { - sketchCache.remove(oldBuffer); - } - } - - private void addToCache(final ByteBuffer buffer, final int position, final MergingDigest union) - { - Int2ObjectMap map = sketchCache.computeIfAbsent(buffer, b -> new Int2ObjectOpenHashMap<>()); - map.put(position, union); - } -} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java similarity index 85% rename from extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregator.java rename to extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java index 390c0f92b8b7..9197923a800a 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregator.java @@ -30,13 +30,11 @@ /** * Aggregator to build T-Digest sketches on numeric values. 
* It generally makes sense to use this aggregator during the ingestion time. - * Then during query time one can use {@link TDigestMergeSketchAggregator} to merge - * these pre-aggregated sketches. *

* One can use this aggregator to build these sketches during query time too, just * that it will be slower and more resource intensive. */ -public class TDigestBuildSketchAggregator implements Aggregator +public class TDigestSketchAggregator implements Aggregator { private final ColumnValueSelector selector; @@ -45,13 +43,13 @@ public class TDigestBuildSketchAggregator implements Aggregator private MergingDigest histogram; - public TDigestBuildSketchAggregator(ColumnValueSelector selector, @Nullable Integer compression) + public TDigestSketchAggregator(ColumnValueSelector selector, @Nullable Integer compression) { this.selector = selector; if (compression != null) { this.histogram = new MergingDigest(compression); } else { - this.histogram = new MergingDigest(TDigestBuildSketchAggregatorFactory.DEFAULT_COMPRESSION); + this.histogram = new MergingDigest(TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION); } } @@ -62,6 +60,10 @@ public void aggregate() synchronized (this) { histogram.add(((Number) selector.getObject()).doubleValue()); } + } else if (selector.getObject() instanceof MergingDigest) { + synchronized (this) { + histogram.add((MergingDigest) selector.getObject()); + } } else { TDigestSketchUtils.throwExceptionForWrongType(selector); } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregatorFactory.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java similarity index 79% rename from extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregatorFactory.java rename to extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java index 08f88b66ae50..b9d34f377d7e 100644 --- 
a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchAggregatorFactory.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorFactory.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.tdunning.math.stats.MergingDigest; import com.tdunning.math.stats.TDigest; import org.apache.druid.query.aggregation.Aggregator; @@ -30,9 +31,6 @@ import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; -import org.apache.druid.segment.column.ValueType; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -55,7 +53,8 @@ * when we have to merge intermediate aggregations which Druid needs to do as * part of query processing. 
*/ -public class TDigestBuildSketchAggregatorFactory extends AggregatorFactory +@JsonTypeName(TDigestSketchAggregatorFactory.TYPE_NAME) +public class TDigestSketchAggregatorFactory extends AggregatorFactory { // Default compression @@ -71,10 +70,10 @@ public class TDigestBuildSketchAggregatorFactory extends AggregatorFactory @Nonnull private final byte cacheTypeId; - public static final String TYPE_NAME = "buildTDigestSketch"; + public static final String TYPE_NAME = "tDigestSketch"; @JsonCreator - public TDigestBuildSketchAggregatorFactory( + public TDigestSketchAggregatorFactory( @JsonProperty("name") final String name, @JsonProperty("fieldName") final String fieldName, @Nullable @JsonProperty("compression") final Integer compression @@ -83,7 +82,7 @@ public TDigestBuildSketchAggregatorFactory( this(name, fieldName, compression, AggregatorUtil.TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID); } - TDigestBuildSketchAggregatorFactory( + TDigestSketchAggregatorFactory( final String name, final String fieldName, @Nullable final Integer compression, @@ -109,27 +108,13 @@ public byte[] getCacheKey() @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - ColumnCapabilities cap = metricFactory.getColumnCapabilities(fieldName); - if (cap == null || ValueType.isNumeric(cap.getType())) { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new TDigestBuildSketchAggregator(selector, compression); - } else { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new TDigestMergeSketchAggregator(selector, compression); - } + return new TDigestSketchAggregator(metricFactory.makeColumnValueSelector(fieldName), compression); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - ColumnCapabilities cap = metricFactory.getColumnCapabilities(fieldName); - if (cap == null || ValueType.isNumeric(cap.getType())) { - final ColumnValueSelector 
selector = metricFactory.makeColumnValueSelector(fieldName); - return new TDigestBuildSketchBufferAggregator(selector, compression); - } else { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new TDigestMergeSketchBufferAggregator(selector, compression); - } + return new TDigestSketchBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), compression); } public static final Comparator COMPARATOR = Comparator.nullsFirst( @@ -159,7 +144,7 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregatorFactory getCombiningFactory() { - return new TDigestMergeSketchAggregatorFactory(name, name, compression); + return new TDigestSketchAggregatorFactory(name, name, compression); } @Override @@ -176,7 +161,7 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre public List getRequiredColumns() { return Collections.singletonList( - new TDigestBuildSketchAggregatorFactory( + new TDigestSketchAggregatorFactory( fieldName, fieldName, compression @@ -243,7 +228,7 @@ public boolean equals(final Object o) if (o == null || !getClass().equals(o.getClass())) { return false; } - final TDigestBuildSketchAggregatorFactory that = (TDigestBuildSketchAggregatorFactory) o; + final TDigestSketchAggregatorFactory that = (TDigestSketchAggregatorFactory) o; return Objects.equals(name, that.name) && Objects.equals(fieldName, that.fieldName) && diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchBufferAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchBufferAggregator.java similarity index 93% rename from extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchBufferAggregator.java rename to 
extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchBufferAggregator.java index a9f1dd1ea902..3a9eb140a07b 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestBuildSketchBufferAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchBufferAggregator.java @@ -34,7 +34,7 @@ /** * Aggregator that builds T-Digest backed sketch using numeric values read from {@link ByteBuffer} */ -public class TDigestBuildSketchBufferAggregator implements BufferAggregator +public class TDigestSketchBufferAggregator implements BufferAggregator { @Nonnull @@ -42,7 +42,7 @@ public class TDigestBuildSketchBufferAggregator implements BufferAggregator private final int compression; private final IdentityHashMap> sketchCache = new IdentityHashMap(); - public TDigestBuildSketchBufferAggregator( + public TDigestSketchBufferAggregator( final ColumnValueSelector valueSelector, @Nullable final Integer compression ) @@ -52,7 +52,7 @@ public TDigestBuildSketchBufferAggregator( if (compression != null) { this.compression = compression; } else { - this.compression = TDigestBuildSketchAggregatorFactory.DEFAULT_COMPRESSION; + this.compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION; } } @@ -70,6 +70,8 @@ public void aggregate(ByteBuffer buffer, int position) Object x = selector.getObject(); if (x instanceof Number) { sketch.add(((Number) x).doubleValue()); + } else if (x instanceof MergingDigest) { + sketch.add((MergingDigest) x); } else { TDigestSketchUtils.throwExceptionForWrongType(selector); } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchComplexMetricSerde.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchComplexMetricSerde.java index 
52522b88ee00..d44ce80fc7e3 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchComplexMetricSerde.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchComplexMetricSerde.java @@ -41,7 +41,7 @@ public class TDigestSketchComplexMetricSerde extends ComplexMetricSerde @Override public String getTypeName() { - return TDigestBuildSketchAggregatorFactory.TYPE_NAME; + return TDigestSketchAggregatorFactory.TYPE_NAME; } @Override diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchModule.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchModule.java index a84785a76ba0..1a5150fdf746 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchModule.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchModule.java @@ -27,7 +27,10 @@ import com.google.inject.Binder; import com.tdunning.math.stats.MergingDigest; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.query.aggregation.tdigestsketch.sql.TDigestGenerateSketchSqlAggregator; +import org.apache.druid.query.aggregation.tdigestsketch.sql.TDigestSketchQuantileSqlAggregator; import org.apache.druid.segment.serde.ComplexMetrics; +import org.apache.druid.sql.guice.SqlBindings; import java.util.List; @@ -44,16 +47,16 @@ public List getJacksonModules() getClass().getSimpleName() ).registerSubtypes( new NamedType( - TDigestBuildSketchAggregatorFactory.class, - TDigestBuildSketchAggregatorFactory.TYPE_NAME - ), - new NamedType( - TDigestMergeSketchAggregatorFactory.class, - TDigestMergeSketchAggregatorFactory.TYPE_NAME + TDigestSketchAggregatorFactory.class, + TDigestSketchAggregatorFactory.TYPE_NAME ), new 
NamedType( TDigestSketchToQuantilesPostAggregator.class, TDigestSketchToQuantilesPostAggregator.TYPE_NAME + ), + new NamedType( + TDigestSketchToQuantilePostAggregator.class, + TDigestSketchToQuantilePostAggregator.TYPE_NAME ) ).addSerializer(MergingDigest.class, new TDigestSketchJsonSerializer()) ); @@ -63,11 +66,14 @@ public List getJacksonModules() public void configure(Binder binder) { registerSerde(); + SqlBindings.addAggregator(binder, TDigestSketchQuantileSqlAggregator.class); + SqlBindings.addAggregator(binder, TDigestGenerateSketchSqlAggregator.class); } @VisibleForTesting - static void registerSerde() + public static void registerSerde() { - ComplexMetrics.registerSerde(TDigestBuildSketchAggregatorFactory.TYPE_NAME, new TDigestSketchComplexMetricSerde()); + ComplexMetrics.registerSerde(TDigestSketchAggregatorFactory.TYPE_NAME, new TDigestSketchComplexMetricSerde()); + ComplexMetrics.registerSerde("TDIGEST_GENERATE_SKETCH", new TDigestSketchComplexMetricSerde()); } } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchObjectStrategy.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchObjectStrategy.java index 765e707ea387..706e3e1ac866 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchObjectStrategy.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchObjectStrategy.java @@ -58,6 +58,6 @@ public byte[] toBytes(@Nullable MergingDigest val) @Override public int compare(MergingDigest o1, MergingDigest o2) { - return TDigestBuildSketchAggregatorFactory.COMPARATOR.compare(o1, o2); + return TDigestSketchAggregatorFactory.COMPARATOR.compare(o1, o2); } } diff --git 
a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java new file mode 100644 index 000000000000..d54b90b4ac89 --- /dev/null +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilePostAggregator.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.tdigestsketch; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.tdunning.math.stats.MergingDigest; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.PostAggregatorIds; +import org.apache.druid.query.cache.CacheKeyBuilder; + +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Post aggregation operator that can take in aggregated T-Digest sketches and + * generate quantiles from it. + */ +public class TDigestSketchToQuantilePostAggregator implements PostAggregator +{ + + private final String name; + private final PostAggregator field; + + private final double fraction; + + public static final String TYPE_NAME = "quantileFromTDigestSketch"; + + @JsonCreator + public TDigestSketchToQuantilePostAggregator( + @JsonProperty("name") final String name, + @JsonProperty("field") final PostAggregator field, + @JsonProperty("fraction") final double fraction + ) + { + this.name = Preconditions.checkNotNull(name, "name is null"); + this.field = Preconditions.checkNotNull(field, "field is null"); + this.fraction = fraction; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public PostAggregator getField() + { + return field; + } + + @JsonProperty + public double getFraction() + { + return fraction; + } + + @Override + public Object compute(final Map combinedAggregators) + { + final MergingDigest sketch = (MergingDigest) field.compute(combinedAggregators); + return sketch.quantile(fraction); + } + + @Override + public Comparator getComparator() + { + throw new IAE("Comparing arrays of quantiles is not supported"); + } + + @Override + 
public Set getDependentFields() + { + return field.getDependentFields(); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "name='" + name + '\'' + + ", field=" + field + + ", fraction=" + fraction + + "}"; + } + + @Override + public byte[] getCacheKey() + { + final CacheKeyBuilder builder = new CacheKeyBuilder( + PostAggregatorIds.TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID).appendCacheable(field); + builder.appendDouble(fraction); + return builder.build(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TDigestSketchToQuantilePostAggregator that = (TDigestSketchToQuantilePostAggregator) o; + return Double.compare(that.fraction, fraction) == 0 && + Objects.equals(name, that.name) && + Objects.equals(field, that.field); + } + + @Override + public int hashCode() + { + return Objects.hash(name, field, fraction); + } + + @Override + public PostAggregator decorate(final Map map) + { + return this; + } + +} diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java index 485a397dd372..245f648cc3cf 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchToQuantilesPostAggregator.java @@ -25,8 +25,8 @@ import com.tdunning.math.stats.MergingDigest; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.PostAggregator; +import 
org.apache.druid.query.aggregation.post.PostAggregatorIds; import org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Arrays; @@ -57,7 +57,7 @@ public TDigestSketchToQuantilesPostAggregator( this.name = Preconditions.checkNotNull(name, "name is null"); this.field = Preconditions.checkNotNull(field, "field is null"); this.fractions = Preconditions.checkNotNull(fractions, "array of fractions is null"); - Preconditions.checkArgument(this.fractions.length > 1, "Array of fractions cannot be empty"); + Preconditions.checkArgument(this.fractions.length >= 1, "Array of fractions cannot be empty"); } @Override @@ -143,7 +143,7 @@ public int hashCode() public byte[] getCacheKey() { final CacheKeyBuilder builder = new CacheKeyBuilder( - AggregatorUtil.TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID).appendCacheable(field); + PostAggregatorIds.TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID).appendCacheable(field); for (final double value : fractions) { builder.appendDouble(value); } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchUtils.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchUtils.java index 0a5d9a18098b..edfd148b4063 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchUtils.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchUtils.java @@ -23,6 +23,10 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.expression.DruidExpression; import java.nio.ByteBuffer; @@ -87,4 +91,33 @@ static 
void throwExceptionForWrongType(ColumnValueSelector selector) ); throw new IAE(msg); } + + public static boolean matchingAggregatorFactoryExists( + final DruidExpression input, + final Integer compression, + final Aggregation existing, + final TDigestSketchAggregatorFactory factory + ) + { + // Check input for equivalence. + final boolean inputMatches; + final VirtualColumn virtualInput = existing.getVirtualColumns() + .stream() + .filter( + virtualColumn -> + virtualColumn.getOutputName() + .equals(factory.getFieldName()) + ) + .findFirst() + .orElse(null); + + if (virtualInput == null) { + inputMatches = input.isDirectColumnAccess() + && input.getDirectColumn().equals(factory.getFieldName()); + } else { + inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression() + .equals(input.getExpression()); + } + return inputMatches && compression == factory.getCompression(); + } } diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java new file mode 100644 index 000000000000..725f335baee6 --- /dev/null +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.tdigestsketch.sql; + +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchUtils; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; +import org.apache.druid.sql.calcite.table.RowSignature; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class TDigestGenerateSketchSqlAggregator implements SqlAggregator +{ + 
private static final SqlAggFunction FUNCTION_INSTANCE = new TDigestGenerateSketchSqlAggregator.TDigestGenerateSketchSqlAggFunction(); + private static final String NAME = "TDIGEST_GENERATE_SKETCH"; + + @Override + public SqlAggFunction calciteFunction() + { + return FUNCTION_INSTANCE; + } + + @Nullable + @Override + public Aggregation toDruidAggregation( + final PlannerContext plannerContext, + final RowSignature rowSignature, + final VirtualColumnRegistry virtualColumnRegistry, + final RexBuilder rexBuilder, + final String name, + final AggregateCall aggregateCall, + final Project project, + final List existingAggregations, + final boolean finalizeAggregations + ) + { + final RexNode inputOperand = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(0) + ); + final DruidExpression input = Expressions.toDruidExpression( + plannerContext, + rowSignature, + inputOperand + ); + if (input == null) { + return null; + } + + final AggregatorFactory aggregatorFactory; + final String aggName = StringUtils.format("%s:agg", name); + + Integer compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION; + if (aggregateCall.getArgList().size() > 1) { + RexNode compressionOperand = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(1) + ); + if (!compressionOperand.isA(SqlKind.LITERAL)) { + // compressionOperand must be a literal in order to plan. + return null; + } + compression = ((Number) RexLiteral.value(compressionOperand)).intValue(); + } + + // Look for existing matching aggregatorFactory. 
+ for (final Aggregation existing : existingAggregations) { + for (AggregatorFactory factory : existing.getAggregatorFactories()) { + if (factory instanceof TDigestSketchAggregatorFactory) { + final TDigestSketchAggregatorFactory theFactory = (TDigestSketchAggregatorFactory) factory; + final boolean matches = TDigestSketchUtils.matchingAggregatorFactoryExists( + input, + compression, + existing, + (TDigestSketchAggregatorFactory) factory + ); + + if (matches) { + // Found existing one. Use this. + return Aggregation.create( + theFactory + ); + } + } + } + } + + // No existing match found. Create a new one. + final List virtualColumns = new ArrayList<>(); + + if (input.isDirectColumnAccess()) { + aggregatorFactory = new TDigestSketchAggregatorFactory( + aggName, + input.getDirectColumn(), + compression + ); + } else { + VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( + plannerContext, + input, + SqlTypeName.FLOAT + ); + virtualColumns.add(virtualColumn); + aggregatorFactory = new TDigestSketchAggregatorFactory( + aggName, + virtualColumn.getOutputName(), + compression + ); + } + + return Aggregation.create( + virtualColumns, + aggregatorFactory + ); + } + + private static class TDigestGenerateSketchSqlAggFunction extends SqlAggFunction + { + private static final String SIGNATURE_WITH_COMPRESSION = "'" + NAME + "(column, compression)'\n"; + + TDigestGenerateSketchSqlAggFunction() + { + super( + NAME, + null, + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.OTHER), + null, + OperandTypes.or( + OperandTypes.ANY, + OperandTypes.and( + OperandTypes.sequence(SIGNATURE_WITH_COMPRESSION, OperandTypes.ANY, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + ) + ), + SqlFunctionCategory.USER_DEFINED_FUNCTION, + false, + false + ); + } + } +} diff --git 
a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java new file mode 100644 index 000000000000..022317227028 --- /dev/null +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.tdigestsketch.sql; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchUtils; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; +import org.apache.druid.sql.calcite.table.RowSignature; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class TDigestSketchQuantileSqlAggregator implements SqlAggregator +{ + private static final SqlAggFunction FUNCTION_INSTANCE = new TDigestSketchQuantileSqlAggFunction(); + private static final String NAME = "TDIGEST_QUANTILE"; + + @Override + public SqlAggFunction calciteFunction() 
+ { + return FUNCTION_INSTANCE; + } + + @Nullable + @Override + public Aggregation toDruidAggregation( + final PlannerContext plannerContext, + final RowSignature rowSignature, + final VirtualColumnRegistry virtualColumnRegistry, + final RexBuilder rexBuilder, + final String name, + final AggregateCall aggregateCall, + final Project project, + final List existingAggregations, + final boolean finalizeAggregations + ) + { + // This is expected to be a tdigest sketch + final DruidExpression input = Expressions.toDruidExpression( + plannerContext, + rowSignature, + Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(0) + ) + ); + if (input == null) { + return null; + } + + final AggregatorFactory aggregatorFactory; + final String sketchName = StringUtils.format("%s:agg", name); + + // this is expected to be quantile fraction + final RexNode quantileArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(1) + ); + + if (!quantileArg.isA(SqlKind.LITERAL)) { + // Quantile must be a literal in order to plan. + return null; + } + + final double quantile = ((Number) RexLiteral.value(quantileArg)).floatValue(); + Integer compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION; + if (aggregateCall.getArgList().size() > 2) { + final RexNode compressionArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(2) + ); + compression = ((Number) RexLiteral.value(compressionArg)).intValue(); + } + + // Look for existing matching aggregatorFactory. + for (final Aggregation existing : existingAggregations) { + for (AggregatorFactory factory : existing.getAggregatorFactories()) { + if (factory instanceof TDigestSketchAggregatorFactory) { + final boolean matches = TDigestSketchUtils.matchingAggregatorFactoryExists( + input, + compression, + existing, + (TDigestSketchAggregatorFactory) factory + ); + + if (matches) { + // Found existing one. Use this. 
+ return Aggregation.create( + ImmutableList.of(), + new TDigestSketchToQuantilePostAggregator( + name, + new FieldAccessPostAggregator( + factory.getName(), + factory.getName() + ), + quantile + ) + ); + } + } + } + } + + // No existing match found. Create a new one. + final List virtualColumns = new ArrayList<>(); + + if (input.isDirectColumnAccess()) { + aggregatorFactory = new TDigestSketchAggregatorFactory( + sketchName, + input.getDirectColumn(), + compression + ); + } else { + VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( + plannerContext, + input, + SqlTypeName.FLOAT + ); + virtualColumns.add(virtualColumn); + aggregatorFactory = new TDigestSketchAggregatorFactory( + sketchName, + virtualColumn.getOutputName(), + compression + ); + } + + return Aggregation.create( + virtualColumns, + ImmutableList.of(aggregatorFactory), + new TDigestSketchToQuantilePostAggregator( + name, + new FieldAccessPostAggregator( + sketchName, + sketchName + ), + quantile + ) + ); + } + + private static class TDigestSketchQuantileSqlAggFunction extends SqlAggFunction + { + private static final String SIGNATURE1 = "'" + NAME + "(column, quantile)'\n"; + private static final String SIGNATURE2 = "'" + NAME + "(column, quantile, compression)'\n"; + + TDigestSketchQuantileSqlAggFunction() + { + super( + NAME, + null, + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.DOUBLE), + null, + OperandTypes.or( + OperandTypes.and( + OperandTypes.sequence(SIGNATURE1, OperandTypes.ANY, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + ), + OperandTypes.and( + OperandTypes.sequence(SIGNATURE2, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) + ) + ), + SqlFunctionCategory.USER_DEFINED_FUNCTION, + false, + false + ); + } + } +} diff --git 
a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java index 67773b165fdb..e531407cdb04 100644 --- a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java +++ b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/TDigestSketchAggregatorTest.java @@ -73,23 +73,7 @@ public void serializeDeserializeFactoryWithFieldName() throws Exception { ObjectMapper objectMapper = new DefaultObjectMapper(); new TDigestSketchModule().getJacksonModules().forEach(objectMapper::registerModule); - TDigestBuildSketchAggregatorFactory factory = new TDigestBuildSketchAggregatorFactory("name", "filedName", 128); - - AggregatorFactory other = objectMapper.readValue( - objectMapper.writeValueAsString(factory), - AggregatorFactory.class - ); - - Assert.assertEquals(factory, other); - } - - // this is to test Json properties and equals for the combining factory - @Test - public void serializeDeserializeCombiningFactoryWithFieldName() throws Exception - { - ObjectMapper objectMapper = new DefaultObjectMapper(); - new TDigestSketchModule().getJacksonModules().forEach(objectMapper::registerModule); - TDigestMergeSketchAggregatorFactory factory = new TDigestMergeSketchAggregatorFactory("name", "fieldName", 128); + TDigestSketchAggregatorFactory factory = new TDigestSketchAggregatorFactory("name", "filedName", 128); AggregatorFactory other = objectMapper.readValue( objectMapper.writeValueAsString(factory), @@ -120,7 +104,7 @@ public void buildingSketchesAtIngestionTime() throws Exception " }", "}" ), - "[{\"type\": \"buildTDigestSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"compression\": 200}]", + "[{\"type\": \"tDigestSketch\", \"name\": \"sketch\", \"fieldName\": 
\"value\", \"compression\": 200}]", 0, // minTimestamp Granularities.NONE, 10, // maxRowCount @@ -132,7 +116,7 @@ public void buildingSketchesAtIngestionTime() throws Exception " \"granularity\": \"ALL\",", " \"dimensions\": [],", " \"aggregations\": [", - " {\"type\": \"mergeTDigestSketch\", \"name\": \"merged_sketch\", \"fieldName\": \"sketch\", " + " {\"type\": \"tDigestSketch\", \"name\": \"merged_sketch\", \"fieldName\": \"sketch\", " + "\"compression\": " + "200}", " ],", @@ -190,7 +174,7 @@ public void buildingSketchesAtQueryTime() throws Exception " \"granularity\": \"ALL\",", " \"dimensions\": [],", " \"aggregations\": [", - " {\"type\": \"buildTDigestSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"compression\": 200}", + " {\"type\": \"tDigestSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"compression\": 200}", " ],", " \"postAggregations\": [", " {\"type\": \"quantilesFromTDigestSketch\", \"name\": \"quantiles\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", @@ -237,7 +221,7 @@ public void testIngestingSketches() throws Exception String.join( "\n", "[", - " {\"type\": \"mergeTDigestSketch\", \"name\": \"first_level_merge_sketch\", \"fieldName\": \"sketch\", " + " {\"type\": \"tDigestSketch\", \"name\": \"first_level_merge_sketch\", \"fieldName\": \"sketch\", " + "\"compression\": " + "200}", "]" @@ -253,7 +237,7 @@ public void testIngestingSketches() throws Exception " \"granularity\": \"ALL\",", " \"dimensions\": [],", " \"aggregations\": [", - " {\"type\": \"mergeTDigestSketch\", \"name\": \"second_level_merge_sketch\", \"fieldName\": " + " {\"type\": \"tDigestSketch\", \"name\": \"second_level_merge_sketch\", \"fieldName\": " + "\"first_level_merge_sketch\", \"compression\": " + "200}", " ],", diff --git a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java 
b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java new file mode 100644 index 000000000000..ab575c3b183e --- /dev/null +++ b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.tdigestsketch.sql; + +import com.fasterxml.jackson.databind.Module; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchModule; +import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.sql.SqlLifecycle; +import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.planner.DruidOperatorTable; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import 
org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.planner.PlannerFactory; +import org.apache.druid.sql.calcite.schema.DruidSchema; +import org.apache.druid.sql.calcite.schema.SystemSchema; +import org.apache.druid.sql.calcite.util.CalciteTestBase; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.QueryLogHook; +import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +/** Tests for the TDIGEST_GENERATE_SKETCH and TDIGEST_QUANTILE SQL aggregators. Each test runs a SQL statement through a SqlLifecycle against a single test segment and compares the native query recorded by the QueryLogHook with a hand-built expected query; some tests additionally check the returned rows. */ public class TDigestSketchSqlAggregatorTest extends CalciteTestBase +{ + private static final String DATA_SOURCE = "foo"; + + private static QueryRunnerFactoryConglomerate conglomerate; + private static Closer resourceCloser; + private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; + /* Fixed SQL query id; the expected native queries below hard-code the same "dummy" value in their context. */ private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( + PlannerContext.CTX_SQL_QUERY_ID, "dummy" + ); + + /** Creates the QueryRunnerFactoryConglomerate shared by all tests and keeps the Closer returned with it so that tearDownClass can close it. */ @BeforeClass + public static void setUpClass() + { + final Pair conglomerateCloserPair = CalciteTests + .createQueryRunnerFactoryConglomerate(); + conglomerate = conglomerateCloserPair.lhs; + resourceCloser = conglomerateCloserPair.rhs; + } + + /** Closes the Closer obtained in setUpClass. @throws IOException if closing fails */ @AfterClass + public static void tearDownClass() throws IOException + { + resourceCloser.close(); + } + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public QueryLogHook queryLogHook = QueryLogHook.create(); + + private SpecificSegmentsQuerySegmentWalker walker; + private SqlLifecycleFactory sqlLifecycleFactory; + + /** Per-test fixture: registers the T-Digest serde and Jackson modules, builds a memory-mapped index from CalciteTests.ROWS1 with metrics cnt, m1 and a pre-built sketch column qsketch_m1 (compression 128), registers it with a segment walker under DATA_SOURCE, and creates a SqlLifecycleFactory whose operator table is built from TDigestSketchQuantileSqlAggregator and TDigestGenerateSketchSqlAggregator. */ @Before + public void
setUp() throws Exception + { + TDigestSketchModule.registerSerde(); + for (Module mod : new TDigestSketchModule().getJacksonModules()) { + CalciteTests.getJsonMapper().registerModule(mod); + } + + final QueryableIndex index = + IndexBuilder.create() + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + /* sketch column built at ingestion time from m1; queried by testComputingQuantileOnPreAggregatedSketch */ new TDigestSketchAggregatorFactory( + "qsketch_m1", + "m1", + 128 + ) + ) + .withRollup(false) + .build() + ) + .rows(CalciteTests.ROWS1) + .buildMMappedIndex(); + + walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(index.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .build(), + index + ); + + final PlannerConfig plannerConfig = new PlannerConfig(); + final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); + final DruidOperatorTable operatorTable = new DruidOperatorTable( + ImmutableSet.of(new TDigestSketchQuantileSqlAggregator(), new TDigestGenerateSketchSqlAggregator()), + ImmutableSet.of() + ); + sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory( + new PlannerFactory( + druidSchema, + systemSchema, + CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + operatorTable, + CalciteTests.createExprMacroTable(), + plannerConfig, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + CalciteTests.getJsonMapper() + ) + ); + } + + /** Closes the segment walker created in setUp and clears the reference. */ @After + public void tearDown() throws Exception + { + walker.close(); + walker = null; + } + + /** TDIGEST_GENERATE_SKETCH(m1, 200) on a numeric column: expects a timeseries query with a single TDigestSketchAggregatorFactory("a0:agg", "m1", 200). Only the number of result rows is compared with expectedResults; the serialized sketch value itself is not asserted. */ @Test + public void testComputingSketchOnNumericValues() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql =
"SELECT\n" + + "TDIGEST_GENERATE_SKETCH(m1, 200)" + + "FROM foo"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final List expectedResults = ImmutableList.of( + new String[] { + "\"AAAAAT/wAAAAAAAAQBgAAAAAAABAaQAAAAAAAAAAAAY/8AAAAAAAAD/wAAAAAAAAP/AAAAAAAABAAAAAAAAAAD/wAAAAAAAAQAgAAAAAAAA/8AAAAAAAAEAQAAAAAAAAP/AAAAAAAABAFAAAAAAAAD/wAAAAAAAAQBgAAAAAAAA=\"" + } + ); + + /* NOTE(review): row count only; the expected sketch string above is never compared with the actual row content. */ Assert.assertEquals(expectedResults.size(), results.size()); + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", 200) + )) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + /** TDIGEST_GENERATE_SKETCH(m1) without a compression argument: the query is run only so that it gets recorded, then the recorded query is expected to use TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION. */ @Test + public void testDefaultCompressionForTDigestGenerateSketchAgg() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_GENERATE_SKETCH(m1)" + + "FROM foo"; + + // Log query + sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION) + )) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + /** TDIGEST_QUANTILE over the pre-built sketch column qsketch_m1 at fractions 0.1, 0.4, 0.8 and 1.0: checks the four quantile values (tolerance 0.000001) and expects one shared aggregator "a0:agg" feeding four TDigestSketchToQuantilePostAggregator instances. The expected aggregator uses compression 100 although the column was built with 128 - presumably the SQL-side default; confirm against TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION. */ @Test + public void
testComputingQuantileOnPreAggregatedSketch() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_QUANTILE(qsketch_m1, 0.1),\n" + + "TDIGEST_QUANTILE(qsketch_m1, 0.4),\n" + + "TDIGEST_QUANTILE(qsketch_m1, 0.8),\n" + + "TDIGEST_QUANTILE(qsketch_m1, 1.0)\n" + + "FROM foo"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final List expectedResults = ImmutableList.of( + new double[] { + 1.1, + 2.9, + 5.3, + 6.0 + } + ); + + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Object[] objects = results.get(i); + /* every column of the row is cast to Double and unboxed so the row can be compared as a double[] */ Assert.assertArrayEquals( + expectedResults.get(i), + Stream.of(objects).mapToDouble(value -> ((Double) value).doubleValue()).toArray(), + 0.000001 + ); + } + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "qsketch_m1", 100) + )) + .postAggregators( + new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.1f), + new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a0:agg"), 0.4f), + new TDigestSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a0:agg"), 0.8f), + new TDigestSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 1.0f) + ) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + /** Sketches generated per dim1 in a subquery (compression 200) and then fed to TDIGEST_QUANTILE in the outer query: checks the quantiles at 0.0, 0.5 and 1.0 and expects a GroupBy query whose data source is the inner GroupBy, with the outer aggregator "_a0:agg" reading the inner output "a0:agg". */ @Test + public void testGeneratingSketchAndComputingQuantileOnFly() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT
TDIGEST_QUANTILE(x, 0.0), TDIGEST_QUANTILE(x, 0.5), TDIGEST_QUANTILE(x, 1.0)\n" + + "FROM (SELECT dim1, TDIGEST_GENERATE_SKETCH(m1, 200) AS x FROM foo group by dim1)"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final List expectedResults = ImmutableList.of( + new double[] { + 1.0, + 3.5, + 6.0 + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Object[] objects = results.get(i); + Assert.assertArrayEquals( + expectedResults.get(i), + Stream.of(objects).mapToDouble(value -> ((Double) value).doubleValue()).toArray(), + 0.000001 + ); + } + + // Verify query + Assert.assertEquals( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("dim1", "d0")) + .setAggregatorSpecs( + ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", 200) + ) + ) + .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build() + ) + ) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs( + ImmutableList.of( + new TDigestSketchAggregatorFactory("_a0:agg", "a0:agg", 100) + ) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new TDigestSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.0f), + new TDigestSketchToQuantilePostAggregator("_a1", makeFieldAccessPostAgg("_a0:agg"), 0.5f), + new TDigestSketchToQuantilePostAggregator("_a2", makeFieldAccessPostAgg("_a0:agg"), 1.0f) + ) + ) + .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + /** TDIGEST_QUANTILE applied directly to the numeric column m1 with no compression argument: checks the quantiles at 0.0, 0.5 and 1.0 and expects one shared aggregator built with a null compression. NOTE(review): testCompressionParamForTDigestQuantileAgg expects DEFAULT_COMPRESSION for the same call shape, so factory equality presumably treats null as the default - confirm. */
@Test + public void testQuantileOnNumericValues() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_QUANTILE(m1, 0.0), TDIGEST_QUANTILE(m1, 0.5), TDIGEST_QUANTILE(m1, 1.0)\n" + + "FROM foo"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final List expectedResults = ImmutableList.of( + new double[] { + 1.0, + 3.5, + 6.0 + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Object[] objects = results.get(i); + Assert.assertArrayEquals( + expectedResults.get(i), + Stream.of(objects).mapToDouble(value -> ((Double) value).doubleValue()).toArray(), + 0.000001 + ); + } + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", null) + )) + .postAggregators( + new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.0f), + new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a0:agg"), 0.5f), + new TDigestSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a0:agg"), 1.0f) + ) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + /** TDIGEST_QUANTILE with no, 200 and 300 as the optional third (compression) argument: the query is run only so that it gets recorded; the recorded query is expected to contain three separate aggregators a0:agg, a1:agg and a2:agg with DEFAULT_COMPRESSION, 200 and 300, each read by its own post-aggregator. */ @Test + public void testCompressionParamForTDigestQuantileAgg() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "TDIGEST_QUANTILE(m1, 0.0), TDIGEST_QUANTILE(m1, 0.5, 200), TDIGEST_QUANTILE(m1, 1.0, 300)\n" + + "FROM foo"; + + // Log query + sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT,
authenticationResult).toList(); + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new TDigestSketchAggregatorFactory("a0:agg", "m1", + TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION + ), + new TDigestSketchAggregatorFactory("a1:agg", "m1", + 200 + ), + new TDigestSketchAggregatorFactory("a2:agg", "m1", + 300 + ))) + .postAggregators( + new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.0f), + new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.5f), + new TDigestSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 1.0f) + ) + .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + + /** Returns a FieldAccessPostAggregator that uses the given name as both its own name and the name of the field it reads. */ private static PostAggregator makeFieldAccessPostAgg(String name) + { + return new FieldAccessPostAggregator(name, name); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index e5e5f51d3a8b..ca0eb3ae22b1 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -121,8 +121,6 @@ public class AggregatorUtil // TDigest sketch aggregators public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38; - public static final byte TDIGEST_MERGE_SKETCH_CACHE_TYPE_ID = 0x39; - public static final byte TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 0x40; /** * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java index 52d693d6cc1b..4d9f9f5059c3 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java @@ -51,4 +51,6 @@ public class PostAggregatorIds public static final byte QUANTILES_DOUBLES_SKETCH_TO_RANK_CACHE_TYPE_ID = 27; public static final byte QUANTILES_DOUBLES_SKETCH_TO_CDF_CACHE_TYPE_ID = 28; public static final byte THETA_SKETCH_TO_STRING = 29; + public static final byte TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 30; + public static final byte TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID = 31; }