From 3d7c8bf8d1ab6425716b82ef5e7c80d8f17237f9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 29 Apr 2021 13:36:37 -0700 Subject: [PATCH 1/2] Vectorize the DataSketches quantiles aggregator. Also removes synchronization for the BufferAggregator and VectorAggregator implementations, since it is not necessary (similar to #11115). Extends DoublesSketchAggregatorTest and DoublesSketchSqlAggregatorTest to run all test cases in vectorized mode. --- .../DoublesSketchAggregatorFactory.java | 88 +- .../DoublesSketchBuildBufferAggregator.java | 78 +- ...blesSketchBuildBufferAggregatorHelper.java | 103 ++ .../DoublesSketchBuildVectorAggregator.java | 104 ++ .../DoublesSketchMergeBufferAggregator.java | 69 +- ...blesSketchMergeBufferAggregatorHelper.java | 104 ++ .../DoublesSketchMergeVectorAggregator.java | 106 ++ .../quantiles/DoublesSketchOperations.java | 5 +- .../NoopDoublesSketchBufferAggregator.java | 32 +- .../DoublesSketchAggregatorTest.java | 23 +- .../sql/DoublesSketchSqlAggregatorTest.java | 914 ++++++++---------- .../aggregation/AggregationTestHelper.java | 39 +- 12 files changed, 1045 insertions(+), 620 deletions(-) create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java index 2ea6a2a45fb9..27da4a21ab3d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java @@ -32,11 +32,22 @@ import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.VectorColumnProcessorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.util.Collections; @@ -63,7 +74,8 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory public DoublesSketchAggregatorFactory( @JsonProperty("name") final String name, @JsonProperty("fieldName") final String fieldName, - @JsonProperty("k") final Integer k) + @JsonProperty("k") final Integer k + ) { this(name, fieldName, k, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID); } @@ -106,7 +118,7 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFact { if (metricFactory.getColumnCapabilities(fieldName) != null && ValueType.isNumeric(metricFactory.getColumnCapabilities(fieldName).getType())) { - final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + final BaseDoubleColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); if (selector instanceof NilColumnValueSelector) { return new NoopDoublesSketchBufferAggregator(); } @@ -119,6 +131,65 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFact return new DoublesSketchMergeBufferAggregator(selector, k, getMaxIntermediateSizeWithNulls()); } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + return ColumnProcessors.makeVectorProcessor( + fieldName, + new VectorColumnProcessorFactory() + { + @Override + public VectorAggregator makeSingleValueDimensionProcessor( + ColumnCapabilities capabilities, + SingleValueDimensionVectorSelector selector + ) + { + return new NoopDoublesSketchBufferAggregator(); + } + + @Override + public VectorAggregator makeMultiValueDimensionProcessor( + ColumnCapabilities capabilities, + MultiValueDimensionVectorSelector selector + ) + { + return new NoopDoublesSketchBufferAggregator(); + } + + @Override + public VectorAggregator makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls()); + } + + @Override + public VectorAggregator makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls()); + } + + @Override + public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls()); + } + + @Override + public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector) + { + return new DoublesSketchMergeVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls()); + } + }, + selectorFactory + ); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Object deserialize(final Object object) { @@ -217,8 +288,9 @@ public List getRequiredColumns() new DoublesSketchAggregatorFactory( fieldName, fieldName, - k) - ); + k + ) + ); } @Override @@ -306,10 +378,10 @@ public int hashCode() public String toString() { return getClass().getSimpleName() + "{" - + "name=" + name - + ", fieldName=" + fieldName - + ", k=" + k - + "}"; + + "name=" + name + + ", fieldName=" + fieldName + + ", k=" + k + + "}"; } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java index e3303105afc5..74be19bfd27c 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java @@ -19,60 +19,52 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.apache.datasketches.memory.WritableMemory; -import org.apache.datasketches.quantiles.DoublesSketch; import org.apache.datasketches.quantiles.UpdateDoublesSketch; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.IdentityHashMap; public class DoublesSketchBuildBufferAggregator implements BufferAggregator { - private final ColumnValueSelector selector; - private final int size; - private final int maxIntermediateSize; + private final BaseDoubleColumnValueSelector selector; + private final DoublesSketchBuildBufferAggregatorHelper helper; - private final IdentityHashMap memCache = new IdentityHashMap<>(); - private final IdentityHashMap> sketches = new IdentityHashMap<>(); - - public DoublesSketchBuildBufferAggregator(final ColumnValueSelector valueSelector, final int size, - final int maxIntermediateSize) + public DoublesSketchBuildBufferAggregator( + final BaseDoubleColumnValueSelector valueSelector, + final int size, + final int maxIntermediateSize + ) { this.selector = valueSelector; - this.size = size; - this.maxIntermediateSize = maxIntermediateSize; + this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize); } @Override - public synchronized void init(final ByteBuffer buffer, final int position) + public void init(ByteBuffer buf, int position) { - final WritableMemory mem = getMemory(buffer); - final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); - final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region); - putSketch(buffer, position, sketch); + helper.init(buf, position); } @Override - public synchronized void aggregate(final ByteBuffer buffer, final int position) + public void aggregate(final ByteBuffer buffer, final int position) { if (selector.isNull()) { return; } - final UpdateDoublesSketch sketch = sketches.get(buffer).get(position); + + final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position); sketch.update(selector.getDouble()); } + @Nullable @Override - public synchronized Object get(final ByteBuffer buffer, final int position) + public Object get(ByteBuffer buf, int position) { - return sketches.get(buffer).get(position).compact(); + return helper.get(buf, position); } @Override @@ -88,42 +80,17 @@ public long getLong(final ByteBuffer buffer, final int position) } @Override - public synchronized void close() + public void close() { - sketches.clear(); - memCache.clear(); + helper.clear(); } // A small number of sketches may run out of the given memory, request more memory on heap and move there. // In that case we need to reuse the object from the cache as opposed to wrapping the new buffer. @Override - public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) - { - UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition); - final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize); - if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap - final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize); - sketch = UpdateDoublesSketch.wrap(newRegion); - } - putSketch(newBuffer, newPosition, sketch); - - final Int2ObjectMap map = sketches.get(oldBuffer); - map.remove(oldPosition); - if (map.isEmpty()) { - sketches.remove(oldBuffer); - memCache.remove(oldBuffer); - } - } - - private WritableMemory getMemory(final ByteBuffer buffer) - { - return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)); - } - - private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch) + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { - Int2ObjectMap map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()); - map.put(position, sketch); + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); } @Override @@ -131,5 +98,4 @@ public void inspectRuntimeShape(final RuntimeShapeInspector inspector) { inspector.visit("selector", selector); } - } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java new file mode 100644 index 000000000000..f5286bddad07 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.datasketches.quantiles.CompactDoublesSketch; +import org.apache.datasketches.quantiles.DoublesSketch; +import org.apache.datasketches.quantiles.UpdateDoublesSketch; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.IdentityHashMap; + +public class DoublesSketchBuildBufferAggregatorHelper +{ + private final int size; + private final int maxIntermediateSize; + private final IdentityHashMap memCache = new IdentityHashMap<>(); + private final IdentityHashMap> sketches = new IdentityHashMap<>(); + + public DoublesSketchBuildBufferAggregatorHelper(final int size, final int maxIntermediateSize) + { + this.size = size; + this.maxIntermediateSize = maxIntermediateSize; + } + + public void init(final ByteBuffer buffer, final int position) + { + final WritableMemory mem = getMemory(buffer); + final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); + final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region); + putSketch(buffer, position, sketch); + } + + public CompactDoublesSketch get(final ByteBuffer buffer, final int position) + { + return sketches.get(buffer).get(position).compact(); + } + + // A small number of sketches may run out of the given memory, request more memory on heap and move there. + // In that case we need to reuse the object from the cache as opposed to wrapping the new buffer. + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition); + final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize); + if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap + final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize); + sketch = UpdateDoublesSketch.wrap(newRegion); + } + putSketch(newBuffer, newPosition, sketch); + + final Int2ObjectMap map = sketches.get(oldBuffer); + map.remove(oldPosition); + if (map.isEmpty()) { + sketches.remove(oldBuffer); + memCache.remove(oldBuffer); + } + } + + public void clear() + { + sketches.clear(); + memCache.clear(); + } + + /** + * Retrieves the sketch at a particular position. + */ + public UpdateDoublesSketch getSketchAtPosition(final ByteBuffer buf, final int position) + { + return sketches.get(buf).get(position); + } + + private WritableMemory getMemory(final ByteBuffer buffer) + { + return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)); + } + + private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch) + { + Int2ObjectMap map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()); + map.put(position, sketch); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java new file mode 100644 index 000000000000..af29c5b9cdee --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import org.apache.datasketches.quantiles.UpdateDoublesSketch; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class DoublesSketchBuildVectorAggregator implements VectorAggregator +{ + private final VectorValueSelector selector; + private final DoublesSketchBuildBufferAggregatorHelper helper; + + DoublesSketchBuildVectorAggregator( + final VectorValueSelector selector, + final int size, + final int maxIntermediateSize + ) + { + this.selector = selector; + this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + helper.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final double[] doubles = selector.getDoubleVector(); + final boolean[] nulls = selector.getNullVector(); + + final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, position); + + for (int i = startRow; i < endRow; i++) { + if (nulls == null || !nulls[i]) { + sketch.update(doubles[i]); + } + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final double[] doubles = selector.getDoubleVector(); + final boolean[] nulls = selector.getNullVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? rows[i] : i; + + if (nulls == null || !nulls[idx]) { + final int position = positions[i] + positionOffset; + helper.getSketchAtPosition(buf, position).update(doubles[idx]); + } + } + } + + @Override + public Object get(final ByteBuffer buf, final int position) + { + return helper.get(buf, position); + } + + @Override + public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf) + { + helper.relocate(oldPosition, newPosition, oldBuf, newBuf); + } + + @Override + public void close() + { + helper.clear(); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java index 41ae4532361d..c788adc24352 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java @@ -19,57 +19,44 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.apache.datasketches.memory.WritableMemory; -import org.apache.datasketches.quantiles.DoublesUnion; +import org.apache.datasketches.quantiles.DoublesSketch; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.IdentityHashMap; public class DoublesSketchMergeBufferAggregator implements BufferAggregator { - - private final ColumnValueSelector selector; - private final int k; - private final int maxIntermediateSize; - private final IdentityHashMap memCache = new IdentityHashMap<>(); - private final IdentityHashMap> unions = new IdentityHashMap<>(); + private final ColumnValueSelector selector; + private final DoublesSketchMergeBufferAggregatorHelper helper; public DoublesSketchMergeBufferAggregator( - final ColumnValueSelector selector, + final ColumnValueSelector selector, final int k, - final int maxIntermediateSize) + final int maxIntermediateSize + ) { this.selector = selector; - this.k = k; - this.maxIntermediateSize = maxIntermediateSize; + this.helper = new DoublesSketchMergeBufferAggregatorHelper(k, maxIntermediateSize); } @Override - public synchronized void init(final ByteBuffer buffer, final int position) + public void init(final ByteBuffer buffer, final int position) { - final WritableMemory mem = getMemory(buffer); - final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); - final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(region); - putUnion(buffer, position, union); + helper.init(buffer, position); } @Override - public synchronized void aggregate(final ByteBuffer buffer, final int position) + public void aggregate(final ByteBuffer buffer, final int position) { - final DoublesUnion union = unions.get(buffer).get(position); - DoublesSketchMergeAggregator.updateUnion(selector, union); + DoublesSketchMergeAggregator.updateUnion(selector, helper.getSketchAtPosition(buffer, position)); } @Override - public synchronized Object get(final ByteBuffer buffer, final int position) + public Object get(final ByteBuffer buffer, final int position) { - return unions.get(buffer).get(position).getResult(); + return helper.getSketchAtPosition(buffer, position).getResult(); } @Override @@ -87,8 +74,7 @@ public long getLong(final ByteBuffer buffer, final int position) @Override public synchronized void close() { - unions.clear(); - memCache.clear(); + helper.clear(); } // A small number of sketches may run out of the given memory, request more memory on heap and move there. @@ -96,31 +82,7 @@ public synchronized void close() @Override public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { - DoublesUnion union = unions.get(oldBuffer).get(oldPosition); - final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize); - if (union.isSameResource(oldMem)) { // union was not relocated on heap - final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize); - union = DoublesUnion.wrap(newMem); - } - putUnion(newBuffer, newPosition, union); - - Int2ObjectMap map = unions.get(oldBuffer); - map.remove(oldPosition); - if (map.isEmpty()) { - unions.remove(oldBuffer); - memCache.remove(oldBuffer); - } - } - - private WritableMemory getMemory(final ByteBuffer buffer) - { - return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)); - } - - private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union) - { - Int2ObjectMap map = unions.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()); - map.put(position, union); + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); } @Override @@ -128,5 +90,4 @@ public void inspectRuntimeShape(final RuntimeShapeInspector inspector) { inspector.visit("selector", selector); } - } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java new file mode 100644 index 000000000000..378e41d8200e --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.datasketches.quantiles.DoublesUnion; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.IdentityHashMap; + +public class DoublesSketchMergeBufferAggregatorHelper +{ + private final int k; + private final int maxIntermediateSize; + private final IdentityHashMap memCache = new IdentityHashMap<>(); + private final IdentityHashMap> unions = new IdentityHashMap<>(); + + public DoublesSketchMergeBufferAggregatorHelper( + final int k, + final int maxIntermediateSize + ) + { + this.k = k; + this.maxIntermediateSize = maxIntermediateSize; + } + + public void init(final ByteBuffer buffer, final int position) + { + final WritableMemory mem = getMemory(buffer); + final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); + final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(region); + putUnion(buffer, position, union); + } + + public Object get(final ByteBuffer buffer, final int position) + { + return unions.get(buffer).get(position).getResult(); + } + + public void clear() + { + unions.clear(); + memCache.clear(); + } + + // A small number of sketches may run out of the given memory, request more memory on heap and move there. + // In that case we need to reuse the object from the cache as opposed to wrapping the new buffer. + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + DoublesUnion union = unions.get(oldBuffer).get(oldPosition); + final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize); + if (union.isSameResource(oldMem)) { // union was not relocated on heap + final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize); + union = DoublesUnion.wrap(newMem); + } + putUnion(newBuffer, newPosition, union); + + Int2ObjectMap map = unions.get(oldBuffer); + map.remove(oldPosition); + if (map.isEmpty()) { + unions.remove(oldBuffer); + memCache.remove(oldBuffer); + } + } + + /** + * Retrieves the sketch at a particular position. + */ + public DoublesUnion getSketchAtPosition(final ByteBuffer buf, final int position) + { + return unions.get(buf).get(position); + } + + private WritableMemory getMemory(final ByteBuffer buffer) + { + return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)); + } + + private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union) + { + Int2ObjectMap map = unions.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()); + map.put(position, union); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java new file mode 100644 index 000000000000..8a8e10b48a0d --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import org.apache.datasketches.quantiles.DoublesSketch; +import org.apache.datasketches.quantiles.DoublesUnion; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class DoublesSketchMergeVectorAggregator implements VectorAggregator +{ + private final VectorObjectSelector selector; + private final DoublesSketchMergeBufferAggregatorHelper helper; + + public DoublesSketchMergeVectorAggregator( + final VectorObjectSelector selector, + final int k, + final int maxIntermediateSize + ) + { + this.selector = selector; + this.helper = new DoublesSketchMergeBufferAggregatorHelper(k, maxIntermediateSize); + } + + @Override + public void init(ByteBuffer buf, int position) + { + helper.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final Object[] vector = selector.getObjectVector(); + + final DoublesUnion union = helper.getSketchAtPosition(buf, position); + + for (int i = startRow; i < endRow; i++) { + final DoublesSketch sketch = (DoublesSketch) vector[i]; + if (sketch != null) { + union.update(sketch); + } + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final Object[] vector = selector.getObjectVector(); + + for (int i = 0; i < numRows; i++) { + final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i]; + + if (sketch != null) { + final int position = positions[i] + positionOffset; + final DoublesUnion union = helper.getSketchAtPosition(buf, position); + union.update(sketch); + } + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return helper.get(buf, position); + } + + @Override + public void close() + { + helper.clear(); + } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java index cbb40c45a689..e30fb9bdae83 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java @@ -41,8 +41,9 @@ public static DoublesSketch deserialize(final Object serializedSketch) return (DoublesSketch) serializedSketch; } throw new ISE( - "Object is not of a type that can be deserialized to a quantiles DoublsSketch: " - + serializedSketch.getClass()); + "Object is not of a type that can be deserialized to a quantiles DoublesSketch: %s", + serializedSketch == null ? "null" : serializedSketch.getClass() + ); } public static DoublesSketch deserializeFromBase64EncodedString(final String str) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java index 5f7808c65ec0..2a0a40b27925 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java @@ -20,20 +20,42 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import javax.annotation.Nullable; import java.nio.ByteBuffer; -public class NoopDoublesSketchBufferAggregator implements BufferAggregator +public class NoopDoublesSketchBufferAggregator implements BufferAggregator, VectorAggregator { @Override public void init(final ByteBuffer buf, final int position) { + // Nothing to do. } @Override public void aggregate(final ByteBuffer buf, final int position) { + // Nothing to do. + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Nothing to do. + } + + @Override + public void aggregate( + ByteBuffer buf, + int numRows, + int[] positions, + @Nullable int[] rows, + int positionOffset + ) + { + // Nothing to do. } @Override @@ -54,13 +76,21 @@ public long getLong(final ByteBuffer buf, final int position) throw new UnsupportedOperationException("Not implemented"); } + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + // Nothing to do. + } + @Override public void close() { + // Nothing to do. } @Override public void inspectRuntimeShape(final RuntimeShapeInspector inspector) { + // Nothing to do. } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java index c0a28d3a0f63..815236922692 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java @@ -20,10 +20,12 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import org.apache.druid.common.config.NullHandling; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.GroupByQueryConfig; @@ -54,24 +56,29 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - public DoublesSketchAggregatorTest(final GroupByQueryConfig config) + public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final String vectorize) { DoublesSketchModule.registerSerde(); DoublesSketchModule module = new DoublesSketchModule(); helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( - module.getJacksonModules(), config, tempFolder); + module.getJacksonModules(), + config, + tempFolder + ).withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize)); timeSeriesHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper( module.getJacksonModules(), tempFolder - ); + ).withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize)); } - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "groupByConfig = {0}, vectorize = {1}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - constructors.add(new Object[]{config}); + for (String vectorize : new String[]{"false", "true", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } } return constructors; } @@ -381,7 +388,11 @@ public void buildingSketchesAtQueryTime() throws Exception // post agg with nulls Object quantileObjectWithNulls = row.get(5); Assert.assertTrue(quantileObjectWithNulls instanceof Double); - Assert.assertEquals(NullHandling.replaceWithDefault() ? 7.4 : 7.5, (double) quantileObjectWithNulls, 0.1); // median value + Assert.assertEquals( + NullHandling.replaceWithDefault() ? 7.4 : 7.5, + (double) quantileObjectWithNulls, + 0.1 + ); // median value // post agg with nulls Object quantilesObjectWithNulls = row.get(6); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index c6c8e1386380..27dc3880370a 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -23,12 +23,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import org.apache.druid.common.config.NullHandling; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.Druids; -import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -70,10 +68,10 @@ import org.apache.druid.sql.http.SqlParameter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; -import org.junit.Assert; import org.junit.Test; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -172,166 +170,136 @@ private SqlLifecycle getSqlLifecycle() @Test public void testQuantileOnFloatAndLongs() throws Exception { - SqlLifecycle sqlLifecycle = getSqlLifecycle(); - final String sql = "SELECT\n" - + "APPROX_QUANTILE_DS(m1, 0.01),\n" - + "APPROX_QUANTILE_DS(m1, 0.5, 64),\n" - + "APPROX_QUANTILE_DS(m1, 0.98, 256),\n" - + "APPROX_QUANTILE_DS(m1, 0.99),\n" - + "APPROX_QUANTILE_DS(m1 * 2, 0.97),\n" - + "APPROX_QUANTILE_DS(m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n" - + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n" - + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 = 'abc'),\n" - + "APPROX_QUANTILE_DS(cnt, 0.5)\n" - + "FROM foo"; - - // Verify results - final List results = sqlLifecycle.runSimple( - sql, - TIMESERIES_CONTEXT_DEFAULT, - DEFAULT_PARAMETERS, - AUTH_RESULT - ).toList(); - final List expectedResults = ImmutableList.of( - new Object[]{ - 1.0, - 4.0, - 6.0, - 6.0, - 12.0, - 6.0, - 5.0, - 6.0, - 1.0 - } - ); - Assert.assertEquals(expectedResults.size(), results.size()); - for (int i = 0; i < expectedResults.size(); i++) { - Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); - } - - // Verify query - Assert.assertEquals( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .granularity(Granularities.ALL) - .virtualColumns( - new ExpressionVirtualColumn( - "v0", - "(\"m1\" * 2)", - ValueType.FLOAT, - TestExprMacroTable.INSTANCE + testQuery( + "SELECT\n" + + "APPROX_QUANTILE_DS(m1, 0.01),\n" + + "APPROX_QUANTILE_DS(m1, 0.5, 64),\n" + + "APPROX_QUANTILE_DS(m1, 0.98, 256),\n" + + "APPROX_QUANTILE_DS(m1, 0.99),\n" + + "APPROX_QUANTILE_DS(m1 * 2, 0.97),\n" + + "APPROX_QUANTILE_DS(m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n" + + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n" + + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 = 'abc'),\n" + + "APPROX_QUANTILE_DS(cnt, 0.5)\n" + + "FROM foo", + Collections.singletonList( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns( + new ExpressionVirtualColumn( + "v0", + "(\"m1\" * 2)", + ValueType.FLOAT, + TestExprMacroTable.INSTANCE + ) ) - ) - .aggregators(ImmutableList.of( - new DoublesSketchAggregatorFactory("a0:agg", "m1", null), - new DoublesSketchAggregatorFactory("a1:agg", "m1", 64), - new DoublesSketchAggregatorFactory("a2:agg", "m1", 256), - new DoublesSketchAggregatorFactory("a4:agg", "v0", null), - new FilteredAggregatorFactory( - new DoublesSketchAggregatorFactory("a5:agg", "m1", null), - new SelectorDimFilter("dim1", "abc", null) - ), - new FilteredAggregatorFactory( - new DoublesSketchAggregatorFactory("a6:agg", "m1", null), - new NotDimFilter(new SelectorDimFilter("dim1", "abc", null)) - ), - new DoublesSketchAggregatorFactory("a8:agg", "cnt", null) - )) - .postAggregators( - new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), - new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), - new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), - new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), - new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f), - new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f), - new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f), - new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f), - new DoublesSketchToQuantilePostAggregator("a8", makeFieldAccessPostAgg("a8:agg"), 0.50f) - ) - .context(TIMESERIES_CONTEXT_DEFAULT) - .build(), - Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + .aggregators(ImmutableList.of( + new DoublesSketchAggregatorFactory("a0:agg", "m1", null), + new DoublesSketchAggregatorFactory("a1:agg", "m1", 64), + new DoublesSketchAggregatorFactory("a2:agg", "m1", 256), + new DoublesSketchAggregatorFactory("a4:agg", "v0", null), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a5:agg", "m1", null), + new SelectorDimFilter("dim1", "abc", null) + ), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a6:agg", "m1", null), + new NotDimFilter(new SelectorDimFilter("dim1", "abc", null)) + ), + new DoublesSketchAggregatorFactory("a8:agg", "cnt", null) + )) + .postAggregators( + new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), + new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), + new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), + new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), + new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f), + new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f), + new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f), + new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f), + new DoublesSketchToQuantilePostAggregator("a8", makeFieldAccessPostAgg("a8:agg"), 0.50f) + ) + .context(TIMESERIES_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{ + 1.0, + 4.0, + 6.0, + 6.0, + 12.0, + 6.0, + 5.0, + 6.0, + 1.0 + } + ) ); } @Test public void testQuantileOnComplexColumn() throws Exception { - SqlLifecycle lifecycle = getSqlLifecycle(); - final String sql = "SELECT\n" - + "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n" - + "APPROX_QUANTILE_DS(qsketch_m1, 0.5, 64),\n" - + "APPROX_QUANTILE_DS(qsketch_m1, 0.98, 256),\n" - + "APPROX_QUANTILE_DS(qsketch_m1, 0.99),\n" - + "APPROX_QUANTILE_DS(qsketch_m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n" - + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n" - + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n" - + "FROM foo"; - - // Verify results - final List results = lifecycle.runSimple( - sql, - TIMESERIES_CONTEXT_DEFAULT, - DEFAULT_PARAMETERS, - AUTH_RESULT - ).toList(); - final List expectedResults = ImmutableList.of( - new Object[]{ - 1.0, - 4.0, - 6.0, - 6.0, - 6.0, - 5.0, - 6.0 - } - ); - Assert.assertEquals(expectedResults.size(), results.size()); - for (int i = 0; i < expectedResults.size(); i++) { - Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); - } - - // Verify query - Assert.assertEquals( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .granularity(Granularities.ALL) - .aggregators(ImmutableList.of( - new DoublesSketchAggregatorFactory("a0:agg", "qsketch_m1", null), - new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", 64), - new DoublesSketchAggregatorFactory("a2:agg", "qsketch_m1", 256), - new FilteredAggregatorFactory( - new DoublesSketchAggregatorFactory("a4:agg", "qsketch_m1", null), - new SelectorDimFilter("dim1", "abc", null) - ), - new FilteredAggregatorFactory( - new DoublesSketchAggregatorFactory("a5:agg", "qsketch_m1", null), - new NotDimFilter(new SelectorDimFilter("dim1", "abc", null)) + testQuery( + "SELECT\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.5, 64),\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.98, 256),\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.99),\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n" + + "FROM foo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new DoublesSketchAggregatorFactory("a0:agg", "qsketch_m1", null), + new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", 64), + new DoublesSketchAggregatorFactory("a2:agg", "qsketch_m1", 256), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a4:agg", "qsketch_m1", null), + new SelectorDimFilter("dim1", "abc", null) + ), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a5:agg", "qsketch_m1", null), + new NotDimFilter(new SelectorDimFilter("dim1", "abc", null)) + ) + )) + .postAggregators( + new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), + new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), + new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), + new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), + new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.99f), + new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.999f), + new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a4:agg"), 0.999f) ) - )) - .postAggregators( - new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), - new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), - new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), - new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), - new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.99f), - new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.999f), - new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a4:agg"), 0.999f) - ) - .context(TIMESERIES_CONTEXT_DEFAULT) - .build(), - Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + .context(TIMESERIES_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{ + 1.0, + 4.0, + 6.0, + 6.0, + 6.0, + 5.0, + 6.0 + } + ) ); } @Test public void testQuantileOnCastedString() throws Exception { - cannotVectorize(); - final List expectedResults; if (NullHandling.replaceWithDefault()) { expectedResults = ImmutableList.of( @@ -425,92 +393,72 @@ public void testQuantileOnCastedString() throws Exception @Test public void testQuantileOnInnerQuery() throws Exception { - SqlLifecycle sqlLifecycle = getSqlLifecycle(); - final String sql = "SELECT AVG(x), APPROX_QUANTILE_DS(x, 0.98)\n" - + "FROM (SELECT dim2, SUM(m1) AS x FROM foo GROUP BY dim2)"; - - // Verify results - final List results = sqlLifecycle.runSimple( - sql, - QUERY_CONTEXT_DEFAULT, - DEFAULT_PARAMETERS, - AUTH_RESULT - ).toList(); final List expectedResults; if (NullHandling.replaceWithDefault()) { expectedResults = ImmutableList.of(new Object[]{7.0, 11.0}); } else { expectedResults = ImmutableList.of(new Object[]{5.25, 8.0}); } - Assert.assertEquals(expectedResults.size(), results.size()); - for (int i = 0; i < expectedResults.size(); i++) { - Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); - } - // Verify query - Assert.assertEquals( - GroupByQuery.builder() - .setDataSource( - new QueryDataSource( - GroupByQuery.builder() - .setDataSource(CalciteTests.DATASOURCE1) - .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .setGranularity(Granularities.ALL) - .setDimensions(new DefaultDimensionSpec("dim2", "d0")) - .setAggregatorSpecs( - ImmutableList.of( - new DoubleSumAggregatorFactory("a0", "m1") + testQuery( + "SELECT AVG(x), APPROX_QUANTILE_DS(x, 0.98)\n" + + "FROM (SELECT dim2, SUM(m1) AS x FROM foo GROUP BY dim2)", + Collections.singletonList( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("dim2", "d0")) + .setAggregatorSpecs( + ImmutableList.of( + new DoubleSumAggregatorFactory("a0", "m1") + ) ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) ) - ) - .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .setGranularity(Granularities.ALL) - .setAggregatorSpecs( - new DoubleSumAggregatorFactory("_a0:sum", "a0"), - new CountAggregatorFactory("_a0:count"), - new DoublesSketchAggregatorFactory( - "_a1:agg", - "a0", - null + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs( + new DoubleSumAggregatorFactory("_a0:sum", "a0"), + new CountAggregatorFactory("_a0:count"), + new DoublesSketchAggregatorFactory( + "_a1:agg", + "a0", + null + ) ) - ) - .setPostAggregatorSpecs( - ImmutableList.of( - new ArithmeticPostAggregator( - "_a0", - "quotient", - ImmutableList.of( - new FieldAccessPostAggregator(null, "_a0:sum"), - new FieldAccessPostAggregator(null, "_a0:count") + .setPostAggregatorSpecs( + ImmutableList.of( + new ArithmeticPostAggregator( + "_a0", + "quotient", + ImmutableList.of( + new FieldAccessPostAggregator(null, "_a0:sum"), + new FieldAccessPostAggregator(null, "_a0:count") + ) + ), + new DoublesSketchToQuantilePostAggregator( + "_a1", + makeFieldAccessPostAgg("_a1:agg"), + 0.98f ) - ), - new DoublesSketchToQuantilePostAggregator("_a1", makeFieldAccessPostAgg("_a1:agg"), 0.98f) + ) ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build(), - Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + expectedResults ); } @Test public void testQuantileOnInnerQuantileQuery() throws Exception { - SqlLifecycle sqlLifecycle = getSqlLifecycle(); - final String sql = "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n" - + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1"; - - - final List results = sqlLifecycle.runSimple( - sql, - QUERY_CONTEXT_DEFAULT, - DEFAULT_PARAMETERS, - AUTH_RESULT - ).toList(); - ImmutableList.Builder builder = ImmutableList.builder(); builder.add(new Object[]{"", 1.0}); builder.add(new Object[]{"1", 4.0}); @@ -519,296 +467,288 @@ public void testQuantileOnInnerQuantileQuery() throws Exception builder.add(new Object[]{"abc", 6.0}); builder.add(new Object[]{"def", 5.0}); final List expectedResults = builder.build(); - Assert.assertEquals(expectedResults.size(), results.size()); - for (int i = 0; i < expectedResults.size(); i++) { - Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); - } - // Verify query - Assert.assertEquals( - GroupByQuery.builder() - .setDataSource( - new QueryDataSource( - GroupByQuery.builder() - .setDataSource(CalciteTests.DATASOURCE1) - .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .setGranularity(Granularities.ALL) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - ImmutableList.of( - new DoublesSketchAggregatorFactory("a0:agg", "m1", 128) + testQuery( + "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n" + + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1", + Collections.singletonList( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") ) - ) - .setPostAggregatorSpecs( - ImmutableList.of( - new DoublesSketchToQuantilePostAggregator( - "a0", - makeFieldAccessPostAgg("a0:agg"), - 0.5f + .setAggregatorSpecs( + ImmutableList.of( + new DoublesSketchAggregatorFactory("a0:agg", "m1", 128) ) ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() + .setPostAggregatorSpecs( + ImmutableList.of( + new DoublesSketchToQuantilePostAggregator( + "a0", + makeFieldAccessPostAgg("a0:agg"), + 0.5f + ) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) ) - ) - .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .setGranularity(Granularities.ALL) - .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING)) - .setAggregatorSpecs( - new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128) - ) - .setPostAggregatorSpecs( - ImmutableList.of( - new DoublesSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.5f) + .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING)) + .setAggregatorSpecs( + new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128) ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build(), - Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + .setPostAggregatorSpecs( + ImmutableList.of( + new DoublesSketchToQuantilePostAggregator( + "_a0", + makeFieldAccessPostAgg("_a0:agg"), + 0.5f + ) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + expectedResults ); } @Test public void testDoublesSketchPostAggs() throws Exception { - SqlLifecycle sqlLifecycle = getSqlLifecycle(); - final String sql = "SELECT\n" - + " SUM(cnt),\n" - + " APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n" - + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n" - + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n" - + " ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5)),\n" - + " DS_GET_QUANTILES(DS_QUANTILES_SKETCH(cnt), 0.5, 0.8),\n" - + " DS_HISTOGRAM(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n" - + " DS_RANK(DS_QUANTILES_SKETCH(cnt), 3),\n" - + " DS_CDF(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n" - + " DS_QUANTILE_SUMMARY(DS_QUANTILES_SKETCH(cnt))\n" - + "FROM foo"; - - // Verify results - final List results = sqlLifecycle.runSimple( - sql, - TIMESERIES_CONTEXT_DEFAULT, - DEFAULT_PARAMETERS, - AUTH_RESULT - ).toList(); - final List expectedResults = ImmutableList.of( - new Object[]{ - 6L, - 2.0d, - 1001.0d, - 1124.0d, - 1.0d, - "[1.0,1.0]", - "[0.0,0.0,6.0]", - 1.0d, - "[0.0,0.0,1.0]", - "\n" - + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n" - + " Empty : false\n" - + " Direct, Capacity bytes : false, \n" - + " Estimation Mode : false\n" - + " K : 128\n" - + " N : 6\n" - + " Levels (Needed, Total, Valid): 0, 0, 0\n" - + " Level Bit Pattern : 0\n" - + " BaseBufferCount : 6\n" - + " Combined Buffer Capacity : 8\n" - + " Retained Items : 6\n" - + " Compact Storage Bytes : 80\n" - + " Updatable Storage Bytes : 96\n" - + " Normalized Rank Error : 1.406%\n" - + " Normalized Rank Error (PMF) : 1.711%\n" - + " Min Value : 1.000000e+00\n" - + " Max Value : 1.000000e+00\n" - + "### END SKETCH SUMMARY\n" - } + testQuery( + "SELECT\n" + + " SUM(cnt),\n" + + " APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n" + + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n" + + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n" + + " ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5)),\n" + + " DS_GET_QUANTILES(DS_QUANTILES_SKETCH(cnt), 0.5, 0.8),\n" + + " DS_HISTOGRAM(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n" + + " DS_RANK(DS_QUANTILES_SKETCH(cnt), 3),\n" + + " DS_CDF(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n" + + " -- The nonvectorized query uses a regular Aggregator, and the vectorized query uses a buffer-based\n" + + " -- VectorAggregator. The buffer-based aggregators return HeapCompactDoublesSketch instead of\n" + + " -- HeapUpdateDoublesSketch since they must make a copy out of the buffer before returning something.\n" + + " -- Use REPLACE to normalize summaries.\n" + + " REPLACE(" + + " REPLACE(" + + " DS_QUANTILE_SUMMARY(DS_QUANTILES_SKETCH(cnt))," + + " 'HeapCompactDoublesSketch'," + + " 'HeapUpdateDoublesSketch'" + + " )," + + " 'Combined Buffer Capacity : 6'," + + " 'Combined Buffer Capacity : 8'" + + " )\n" + + "FROM foo", + Collections.singletonList( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns( + new ExpressionVirtualColumn( + "v0", + "(\"cnt\" + 123)", + ValueType.FLOAT, + TestExprMacroTable.INSTANCE + ) + ) + .aggregators(ImmutableList.of( + new LongSumAggregatorFactory("a0", "cnt"), + new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128), + new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128), + new DoublesSketchAggregatorFactory("a3:agg", "v0", 128) + )) + .postAggregators( + new DoublesSketchToQuantilePostAggregator( + "a1", + makeFieldAccessPostAgg("a1:agg"), + 0.5f + ), + new ExpressionPostAggregator( + "p0", + "(\"a1\" + 1)", + null, + TestExprMacroTable.INSTANCE + ), + new DoublesSketchToQuantilePostAggregator( + "p2", + new FieldAccessPostAggregator( + "p1", + "a2:agg" + ), + 0.5f + ), + new ExpressionPostAggregator( + "p3", + "(p2 + 1000)", + null, + TestExprMacroTable.INSTANCE + ), + new DoublesSketchToQuantilePostAggregator( + "p5", + new FieldAccessPostAggregator( + "p4", + "a3:agg" + ), + 0.5f + ), + new ExpressionPostAggregator( + "p6", + "(p5 + 1000)", + null, + TestExprMacroTable.INSTANCE + ), + new DoublesSketchToQuantilePostAggregator( + "p8", + new FieldAccessPostAggregator( + "p7", + "a2:agg" + ), + 0.5f + ), + new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE), + new DoublesSketchToQuantilesPostAggregator( + "p11", + new FieldAccessPostAggregator( + "p10", + "a2:agg" + ), + new double[]{0.5d, 0.8d} + ), + new DoublesSketchToHistogramPostAggregator( + "p13", + new FieldAccessPostAggregator( + "p12", + "a2:agg" + ), + new double[]{0.2d, 0.6d}, + null + ), + new DoublesSketchToRankPostAggregator( + "p15", + new FieldAccessPostAggregator( + "p14", + "a2:agg" + ), + 3.0d + ), + new DoublesSketchToCDFPostAggregator( + "p17", + new FieldAccessPostAggregator( + "p16", + "a2:agg" + ), + new double[]{0.2d, 0.6d} + ), + new DoublesSketchToStringPostAggregator( + "p19", + new FieldAccessPostAggregator( + "p18", + "a2:agg" + ) + ), + new ExpressionPostAggregator( + "p20", + "replace(replace(p19,'HeapCompactDoublesSketch','HeapUpdateDoublesSketch')," + + "'Combined Buffer Capacity : 6'," + + "'Combined Buffer Capacity : 8')", + null, + ExprMacroTable.nil() + ) + ) + .context(TIMESERIES_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{ + 6L, + 2.0d, + 1001.0d, + 1124.0d, + 1.0d, + "[1.0,1.0]", + "[0.0,0.0,6.0]", + 1.0d, + "[0.0,0.0,1.0]", + "\n" + + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n" + + " Empty : false\n" + + " Direct, Capacity bytes : false, \n" + + " Estimation Mode : false\n" + + " K : 128\n" + + " N : 6\n" + + " Levels (Needed, Total, Valid): 0, 0, 0\n" + + " Level Bit Pattern : 0\n" + + " BaseBufferCount : 6\n" + + " Combined Buffer Capacity : 8\n" + + " Retained Items : 6\n" + + " Compact Storage Bytes : 80\n" + + " Updatable Storage Bytes : 96\n" + + " Normalized Rank Error : 1.406%\n" + + " Normalized Rank Error (PMF) : 1.711%\n" + + " Min Value : 1.000000e+00\n" + + " Max Value : 1.000000e+00\n" + + "### END SKETCH SUMMARY\n" + } + ) ); - Assert.assertEquals(expectedResults.size(), results.size()); - for (int i = 0; i < expectedResults.size(); i++) { - Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); - } - - Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries()); - Query expectedQuery = Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .granularity(Granularities.ALL) - .virtualColumns( - new ExpressionVirtualColumn( - "v0", - "(\"cnt\" + 123)", - ValueType.FLOAT, - TestExprMacroTable.INSTANCE - ) - ) - .aggregators(ImmutableList.of( - new LongSumAggregatorFactory("a0", "cnt"), - new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128), - new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128), - new DoublesSketchAggregatorFactory("a3:agg", "v0", 128) - )) - .postAggregators( - new DoublesSketchToQuantilePostAggregator( - "a1", - makeFieldAccessPostAgg("a1:agg"), - 0.5f - ), - new ExpressionPostAggregator( - "p0", - "(\"a1\" + 1)", - null, - TestExprMacroTable.INSTANCE - ), - new DoublesSketchToQuantilePostAggregator( - "p2", - new FieldAccessPostAggregator( - "p1", - "a2:agg" - ), - 0.5f - ), - new ExpressionPostAggregator( - "p3", - "(p2 + 1000)", - null, - TestExprMacroTable.INSTANCE - ), - new DoublesSketchToQuantilePostAggregator( - "p5", - new FieldAccessPostAggregator( - "p4", - "a3:agg" - ), - 0.5f - ), - new ExpressionPostAggregator( - "p6", - "(p5 + 1000)", - null, - TestExprMacroTable.INSTANCE - ), - new DoublesSketchToQuantilePostAggregator( - "p8", - new FieldAccessPostAggregator( - "p7", - "a2:agg" - ), - 0.5f - ), - new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE), - new DoublesSketchToQuantilesPostAggregator( - "p11", - new FieldAccessPostAggregator( - "p10", - "a2:agg" - ), - new double[]{0.5d, 0.8d} - ), - new DoublesSketchToHistogramPostAggregator( - "p13", - new FieldAccessPostAggregator( - "p12", - "a2:agg" - ), - new double[]{0.2d, 0.6d}, - null - ), - new DoublesSketchToRankPostAggregator( - "p15", - new FieldAccessPostAggregator( - "p14", - "a2:agg" - ), - 3.0d - ), - new DoublesSketchToCDFPostAggregator( - "p17", - new FieldAccessPostAggregator( - "p16", - "a2:agg" - ), - new double[]{0.2d, 0.6d} - ), - new DoublesSketchToStringPostAggregator( - "p19", - new FieldAccessPostAggregator( - "p18", - "a2:agg" - ) - ) - ) - .context(TIMESERIES_CONTEXT_DEFAULT) - .build(); - - // Verify query - Assert.assertEquals(expectedQuery, actualQuery); } @Test public void testDoublesSketchPostAggsPostSort() throws Exception { - SqlLifecycle sqlLifecycle = getSqlLifecycle(); - - final String sql = "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10"; - final String sql2 = StringUtils.format("SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from (%s)", sql); - - // Verify results - final List results = sqlLifecycle.runSimple( - sql2, - TIMESERIES_CONTEXT_DEFAULT, - DEFAULT_PARAMETERS, - AUTH_RESULT - ).toList(); - final List expectedResults = ImmutableList.of( - new Object[]{ - 4.0d, - 6.0d - } - ); - - Assert.assertEquals(expectedResults.size(), results.size()); - for (int i = 0; i < expectedResults.size(); i++) { - Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); - } - - Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries()); - - Query expectedQuery = - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .granularity(Granularities.ALL) - .aggregators( - ImmutableList.of( - new DoublesSketchAggregatorFactory("a0:agg", "m1", 128) + testQuery( + "SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from (" + + "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10" + + ")", + Collections.singletonList( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new DoublesSketchAggregatorFactory("a0:agg", "m1", 128) + ) ) - ) - .postAggregators( - ImmutableList.of( - new FieldAccessPostAggregator("p0", "a0:agg"), - new DoublesSketchToQuantilePostAggregator( - "p2", - new FieldAccessPostAggregator("p1", "a0:agg"), - 0.5 - ), - new DoublesSketchToQuantilePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), 0.5), - new DoublesSketchToQuantilePostAggregator( - "s3", - new FieldAccessPostAggregator("s2", "p0"), - 0.9800000190734863 + .postAggregators( + ImmutableList.of( + new FieldAccessPostAggregator("p0", "a0:agg"), + new DoublesSketchToQuantilePostAggregator( + "p2", + new FieldAccessPostAggregator("p1", "a0:agg"), + 0.5 + ), + new DoublesSketchToQuantilePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), 0.5), + new DoublesSketchToQuantilePostAggregator( + "s3", + new FieldAccessPostAggregator("s2", "p0"), + 0.9800000190734863 + ) ) ) - ) - .context(TIMESERIES_CONTEXT_DEFAULT) - .build(); - - // Verify query - Assert.assertEquals(expectedQuery, actualQuery); + .context(TIMESERIES_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{ + 4.0d, + 6.0d + } + ) + ); } private static PostAggregator makeFieldAccessPostAgg(String name) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 0d6e8b879ef2..1ed990b5f970 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -96,8 +96,10 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; /** * This class provides general utility to test any druid aggregation implementation given raw data, @@ -116,6 +118,8 @@ public class AggregationTestHelper implements Closeable private final TemporaryFolder tempFolder; private final Closer resourceCloser; + private final Map queryContext; + private AggregationTestHelper( ObjectMapper mapper, IndexMerger indexMerger, @@ -124,7 +128,8 @@ private AggregationTestHelper( QueryRunnerFactory factory, TemporaryFolder tempFolder, List jsonModulesToRegister, - Closer resourceCloser + Closer resourceCloser, + Map queryContext ) { this.mapper = mapper; @@ -134,6 +139,7 @@ private AggregationTestHelper( this.factory = factory; this.tempFolder = tempFolder; this.resourceCloser = resourceCloser; + this.queryContext = queryContext; for (Module mod : jsonModulesToRegister) { mapper.registerModule(mod); @@ -174,7 +180,8 @@ public int columnCacheSizeBytes() factory, tempFolder, jsonModulesToRegister, - closer + closer, + Collections.emptyMap() ); } @@ -213,7 +220,8 @@ public int columnCacheSizeBytes() factory, tempFolder, jsonModulesToRegister, - Closer.create() + Closer.create(), + Collections.emptyMap() ); } @@ -264,7 +272,8 @@ public int columnCacheSizeBytes() factory, tempFolder, jsonModulesToRegister, - resourceCloser + resourceCloser, + Collections.emptyMap() ); } @@ -307,7 +316,25 @@ public int columnCacheSizeBytes() factory, tempFolder, jsonModulesToRegister, - resourceCloser + resourceCloser, + Collections.emptyMap() + ); + } + + public AggregationTestHelper withQueryContext(final Map queryContext) + { + final Map newContext = new HashMap<>(this.queryContext); + newContext.putAll(queryContext); + return new AggregationTestHelper( + mapper, + indexMerger, + indexIO, + toolChest, + factory, + tempFolder, + Collections.emptyList(), + resourceCloser, + newContext ); } @@ -658,7 +685,7 @@ public Segment persistIncrementalIndex( //from each segment, later deserialize and merge and finally return the results public Sequence runQueryOnSegments(final List segmentDirs, final String queryJson) { - return runQueryOnSegments(segmentDirs, readQuery(queryJson)); + return runQueryOnSegments(segmentDirs, readQuery(queryJson).withOverriddenContext(queryContext)); } public Sequence runQueryOnSegments(final List segmentDirs, final Query query) From 8f9e5bcf3fe7fc2656f7eaa4b1ccb0b36b52c91c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 30 Apr 2021 11:06:55 -0700 Subject: [PATCH 2/2] Style fix. --- .../sql/DoublesSketchSqlAggregatorTest.java | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index 27dc3880370a..647f300aaa84 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -77,7 +77,6 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest { - private static final AuthenticationResult AUTH_RESULT = CalciteTests.REGULAR_USER_AUTH_RESULT; private static final DruidOperatorTable OPERATOR_TABLE = new DruidOperatorTable( ImmutableSet.of( new DoublesSketchApproxQuantileSqlAggregator(), @@ -683,24 +682,24 @@ public void testDoublesSketchPostAggs() throws Exception 1.0d, "[0.0,0.0,1.0]", "\n" - + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n" - + " Empty : false\n" - + " Direct, Capacity bytes : false, \n" - + " Estimation Mode : false\n" - + " K : 128\n" - + " N : 6\n" - + " Levels (Needed, Total, Valid): 0, 0, 0\n" - + " Level Bit Pattern : 0\n" - + " BaseBufferCount : 6\n" - + " Combined Buffer Capacity : 8\n" - + " Retained Items : 6\n" - + " Compact Storage Bytes : 80\n" - + " Updatable Storage Bytes : 96\n" - + " Normalized Rank Error : 1.406%\n" - + " Normalized Rank Error (PMF) : 1.711%\n" - + " Min Value : 1.000000e+00\n" - + " Max Value : 1.000000e+00\n" - + "### END SKETCH SUMMARY\n" + + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n" + + " Empty : false\n" + + " Direct, Capacity bytes : false, \n" + + " Estimation Mode : false\n" + + " K : 128\n" + + " N : 6\n" + + " Levels (Needed, Total, Valid): 0, 0, 0\n" + + " Level Bit Pattern : 0\n" + + " BaseBufferCount : 6\n" + + " Combined Buffer Capacity : 8\n" + + " Retained Items : 6\n" + + " Compact Storage Bytes : 80\n" + + " Updatable Storage Bytes : 96\n" + + " Normalized Rank Error : 1.406%\n" + + " Normalized Rank Error (PMF) : 1.711%\n" + + " Min Value : 1.000000e+00\n" + + " Max Value : 1.000000e+00\n" + + "### END SKETCH SUMMARY\n" } ) ); @@ -731,7 +730,11 @@ public void testDoublesSketchPostAggsPostSort() throws Exception new FieldAccessPostAggregator("p1", "a0:agg"), 0.5 ), - new DoublesSketchToQuantilePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), 0.5), + new DoublesSketchToQuantilePostAggregator( + "s1", + new FieldAccessPostAggregator("s0", "p0"), + 0.5 + ), new DoublesSketchToQuantilePostAggregator( "s3", new FieldAccessPostAggregator("s2", "p0"),