From f2d58ce3d527536ceac4ef0630f00aff3a1c3f91 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 14 Apr 2021 14:34:23 -0700 Subject: [PATCH 1/3] Vectorized versions of HllSketch aggregators. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The patch uses the sameĀ "helper" approach as #10767 and #10304, and extends the tests to run in both vectorized and non-vectorized modes. Also includes some minor changes to the theta sketch vector aggregator: - Cosmetic changes to make the hll and theta implementations look more similar. - Extends the theta SQL tests to run in vectorized mode. --- .../hll/HllSketchBuildAggregatorFactory.java | 21 +++ .../hll/HllSketchBuildBufferAggregator.java | 115 +------------ .../HllSketchBuildBufferAggregatorHelper.java | 156 ++++++++++++++++++ .../hll/HllSketchBuildVectorAggregator.java | 127 ++++++++++++++ .../hll/HllSketchMergeAggregatorFactory.java | 21 +++ .../hll/HllSketchMergeBufferAggregator.java | 68 ++------ .../HllSketchMergeBufferAggregatorHelper.java | 115 +++++++++++++ .../hll/HllSketchMergeVectorAggregator.java | 130 +++++++++++++++ .../datasketches/hll/StripedBufferLock.java | 58 +++++++ .../theta/SketchVectorAggregator.java | 18 +- .../hll/HllSketchAggregatorTest.java | 44 ++++- .../hll/sql/HllSketchSqlAggregatorTest.java | 95 ++++++++--- .../sql/ThetaSketchSqlAggregatorTest.java | 124 ++++++++++---- 13 files changed, 856 insertions(+), 236 deletions(-) create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/StripedBufferLock.java diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java index 8abc305304ed..df68180b7bf1 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java @@ -26,8 +26,11 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; @@ -81,6 +84,24 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSele ); } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + return new HllSketchBuildVectorAggregator( + selectorFactory, + getFieldName(), + getLgK(), + TgtHllType.valueOf(getTgtHllType()), + getMaxIntermediateSize() + ); + } + /** * For the HLL_4 sketch type, this value can be exceeded slightly in extremely rare cases. * The sketch will request on-heap memory and move there. It is handled in HllSketchBuildBufferAggregator. diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java index 4c39259a6a97..7c06c3f557a0 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java @@ -19,22 +19,13 @@ package org.apache.druid.query.aggregation.datasketches.hll; -import com.google.common.util.concurrent.Striped; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; -import org.apache.datasketches.hll.Union; -import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.IdentityHashMap; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; /** * This aggregator builds sketches from raw data. @@ -42,26 +33,8 @@ */ public class HllSketchBuildBufferAggregator implements BufferAggregator { - - /** - * for locking per buffer position (power of 2 to make index computation faster) - */ - private static final int NUM_STRIPES = 64; - private final ColumnValueSelector selector; - private final int lgK; - private final TgtHllType tgtHllType; - private final int size; - private final IdentityHashMap memCache = new IdentityHashMap<>(); - private final IdentityHashMap> sketchCache = new IdentityHashMap<>(); - private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); - - /** - * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image. - * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The - * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. - */ - private final byte[] emptySketch; + private final HllSketchBuildBufferAggregatorHelper helper; public HllSketchBuildBufferAggregator( final ColumnValueSelector selector, @@ -71,32 +44,13 @@ public HllSketchBuildBufferAggregator( ) { this.selector = selector; - this.lgK = lgK; - this.tgtHllType = tgtHllType; - this.size = size; - this.emptySketch = new byte[size]; - - //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction) - new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch)); + this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size); } @Override public void init(final ByteBuffer buf, final int position) { - // Copy prebuilt empty sketch object. - - final int oldPosition = buf.position(); - try { - buf.position(position); - buf.put(emptySketch); - } - finally { - buf.position(oldPosition); - } - - // Add an HllSketch for this chunk to our sketchCache. - final WritableMemory mem = getMemory(buf).writableRegion(position, size); - putSketchIntoCache(buf, position, HllSketch.writableWrap(mem)); + helper.init(buf, position); } /** @@ -111,11 +65,10 @@ public void aggregate(final ByteBuffer buf, final int position) if (value == null) { return; } - final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock(); + final Lock lock = helper.getLockForPosition(position).writeLock(); lock.lock(); try { - final HllSketch sketch = sketchCache.get(buf).get(position); - HllSketchBuildAggregator.updateSketch(sketch, value); + helper.updateSketchAtPosition(buf, position, value); } finally { lock.unlock(); @@ -130,21 +83,13 @@ public void aggregate(final ByteBuffer buf, final int position) @Override public Object get(final ByteBuffer buf, final int position) { - final Lock lock = stripedLock.getAt(lockIndex(position)).readLock(); - lock.lock(); - try { - return sketchCache.get(buf).get(position).copy(); - } - finally { - lock.unlock(); - } + return helper.get(buf, position); } @Override public void close() { - memCache.clear(); - sketchCache.clear(); + helper.clear(); } @Override @@ -159,11 +104,6 @@ public long getLong(final ByteBuffer buf, final int position) throw new UnsupportedOperationException("Not implemented"); } - private WritableMemory getMemory(final ByteBuffer buf) - { - return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN)); - } - /** * In very rare cases sketches can exceed given memory, request on-heap memory and move there. * We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions. @@ -171,44 +111,7 @@ private WritableMemory getMemory(final ByteBuffer buf) @Override public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf) { - HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition); - final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size); - if (sketch.isSameResource(oldMem)) { // sketch has not moved - final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size); - sketch = HllSketch.writableWrap(newMem); - } - putSketchIntoCache(newBuf, newPosition, sketch); - } - - private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch) - { - final Int2ObjectMap map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>()); - map.put(position, sketch); - } - - /** - * compute lock index to avoid boxing in Striped.get() call - * - * @param position - * - * @return index - */ - static int lockIndex(final int position) - { - return smear(position) % NUM_STRIPES; - } - - /** - * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548 - * - * @param hashCode - * - * @return smeared hashCode - */ - private static int smear(int hashCode) - { - hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12); - return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4); + helper.relocate(oldPosition, newPosition, oldBuf, newBuf); } @Override @@ -218,6 +121,6 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) // lgK should be inspected because different execution paths exist in HllSketch.update() that is called from // @CalledFromHotLoop-annotated aggregate() depending on the lgK. // See https://github.com/apache/druid/pull/6893#discussion_r250726028 - inspector.visit("lgK", lgK); + inspector.visit("lgK", helper.getLgK()); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java new file mode 100644 index 000000000000..e2334339b647 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.TgtHllType; +import org.apache.datasketches.memory.WritableMemory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.IdentityHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; + +public class HllSketchBuildBufferAggregatorHelper +{ + private final int lgK; + private final int size; + private final IdentityHashMap memCache = new IdentityHashMap<>(); + private final IdentityHashMap> sketchCache = new IdentityHashMap<>(); + private final StripedBufferLock stripedLock = new StripedBufferLock(); + + /** + * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image. + * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The + * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link org.apache.datasketches.hll.Union} objects. + */ + private final byte[] emptySketch; + + public HllSketchBuildBufferAggregatorHelper(final int lgK, final TgtHllType tgtHllType, final int size) + { + this.lgK = lgK; + this.size = size; + this.emptySketch = new byte[size]; + + //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction) + new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch)); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#init}. + */ + public void init(final ByteBuffer buf, final int position) + { + // Copy prebuilt empty sketch object. + + final int oldPosition = buf.position(); + try { + buf.position(position); + buf.put(emptySketch); + } + finally { + buf.position(oldPosition); + } + + // Add an HllSketch for this chunk to our sketchCache. + final WritableMemory mem = getMemory(buf).writableRegion(position, size); + putSketchIntoCache(buf, position, HllSketch.writableWrap(mem)); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#get}. + */ + public Object get(ByteBuffer buf, int position) + { + final Lock lock = stripedLock.getLock(position).readLock(); + lock.lock(); + try { + return sketchCache.get(buf).get(position).copy(); + } + finally { + lock.unlock(); + } + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}. + */ + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBuffer newBuf) + { + HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition); + final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size); + if (sketch.isSameResource(oldMem)) { // sketch has not moved + final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size); + sketch = HllSketch.writableWrap(newMem); + } + putSketchIntoCache(newBuf, newPosition, sketch); + } + + public ReadWriteLock getLockForPosition(final int position) + { + return stripedLock.getLock(position); + } + + /** + * Updates the sketch at a particular position. Must be called while holding a write lock from + * {@link #getLockForPosition} for the given position. + * + * Does nothing if "o" is null. + */ + public void updateSketchAtPosition(final ByteBuffer buf, final int position, @Nullable final Object o) + { + if (o != null) { + final HllSketch sketch = sketchCache.get(buf).get(position); + HllSketchBuildAggregator.updateSketch(sketch, o); + } + } + + /** + * Clean up resources used by this helper. + */ + public void clear() + { + memCache.clear(); + sketchCache.clear(); + } + + public int getLgK() + { + return lgK; + } + + private WritableMemory getMemory(final ByteBuffer buf) + { + return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN)); + } + + private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch) + { + final Int2ObjectMap map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>()); + map.put(position, sketch); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java new file mode 100644 index 000000000000..f1e33cba1fa3 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import org.apache.datasketches.hll.TgtHllType; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +public class HllSketchBuildVectorAggregator implements VectorAggregator +{ + private final HllSketchBuildBufferAggregatorHelper helper; + private final Supplier objectSupplier; + + HllSketchBuildVectorAggregator( + final VectorColumnSelectorFactory columnSelectorFactory, + final String column, + final int lgK, + final TgtHllType tgtHllType, + final int size + ) + { + this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size); + this.objectSupplier = + ColumnProcessors.makeVectorProcessor( + column, + ToObjectVectorColumnProcessorFactory.INSTANCE, + columnSelectorFactory + ); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + helper.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final Object[] vector = objectSupplier.get(); + final Lock lock = helper.getLockForPosition(position).writeLock(); + lock.lock(); + try { + for (int i = startRow; i < endRow; i++) { + helper.updateSketchAtPosition(buf, position, vector[i]); + } + } + finally { + lock.unlock(); + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final Object[] vector = objectSupplier.get(); + + for (int i = 0; i < numRows; i++) { + final Object o = vector[rows != null ? rows[i] : i]; + + if (o != null) { + final int position = positions[i] + positionOffset; + + final Lock lock = helper.getLockForPosition(position).writeLock(); + lock.lock(); + try { + helper.updateSketchAtPosition(buf, position, o); + } + finally { + lock.unlock(); + } + } + } + } + + @Override + public Object get(final ByteBuffer buf, final int position) + { + return helper.get(buf, position); + } + + /** + * In very rare cases sketches can exceed given memory, request on-heap memory and move there. + * We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions. + */ + @Override + public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf) + { + helper.relocate(oldPosition, newPosition, oldBuf, newBuf); + } + + @Override + public void close() + { + helper.clear(); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java index 74afea31bfea..050cf59e1fcf 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java @@ -29,8 +29,11 @@ import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; @@ -102,6 +105,24 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSele ); } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + return new HllSketchMergeVectorAggregator( + selectorFactory, + getFieldName(), + getLgK(), + TgtHllType.valueOf(getTgtHllType()), + getMaxIntermediateSize() + ); + } + @Override public int getMaxIntermediateSize() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java index 7161c25fb6bc..7626cb88aaf2 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java @@ -19,7 +19,6 @@ package org.apache.druid.query.aggregation.datasketches.hll; -import com.google.common.util.concurrent.Striped; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; @@ -31,7 +30,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; /** * This aggregator merges existing sketches. @@ -39,24 +37,8 @@ */ public class HllSketchMergeBufferAggregator implements BufferAggregator { - - /** - * for locking per buffer position (power of 2 to make index computation faster) - */ - private static final int NUM_STRIPES = 64; - private final ColumnValueSelector selector; - private final int lgK; - private final TgtHllType tgtHllType; - private final int size; - private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); - - /** - * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image. - * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The - * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. - */ - private final byte[] emptyUnion; + private final HllSketchMergeBufferAggregatorHelper helper; public HllSketchMergeBufferAggregator( final ColumnValueSelector selector, @@ -66,32 +48,13 @@ public HllSketchMergeBufferAggregator( ) { this.selector = selector; - this.lgK = lgK; - this.tgtHllType = tgtHllType; - this.size = size; - this.emptyUnion = new byte[size]; - - //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction) - new Union(lgK, WritableMemory.wrap(emptyUnion)); + this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size); } @Override public void init(final ByteBuffer buf, final int position) { - // Copy prebuilt empty union object. - // Not necessary to cache a Union wrapper around the initialized memory, because: - // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get". - // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the - // max size and therefore do not need to be potentially moved in-heap. - - final int oldPosition = buf.position(); - try { - buf.position(position); - buf.put(emptyUnion); - } - finally { - buf.position(oldPosition); - } + helper.init(buf, position); } /** @@ -106,8 +69,11 @@ public void aggregate(final ByteBuffer buf, final int position) if (sketch == null) { return; } - final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); - final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).writeLock(); + + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) + .writableRegion(position, helper.getSize()); + + final Lock lock = helper.getLockForPosition(position).writeLock(); lock.lock(); try { final Union union = Union.writableWrap(mem); @@ -118,24 +84,10 @@ public void aggregate(final ByteBuffer buf, final int position) } } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 - */ @Override public Object get(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); - final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).readLock(); - lock.lock(); - try { - final Union union = Union.writableWrap(mem); - return union.getResult(tgtHllType); - } - finally { - lock.unlock(); - } + return helper.get(buf, position); } @Override @@ -163,6 +115,6 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) // lgK should be inspected because different execution paths exist in Union.update() that is called from // @CalledFromHotLoop-annotated aggregate() depending on the lgK. // See https://github.com/apache/druid/pull/6893#discussion_r250726028 - inspector.visit("lgK", lgK); + inspector.visit("lgK", helper.getLgK()); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java new file mode 100644 index 000000000000..37604b118564 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.TgtHllType; +import org.apache.datasketches.hll.Union; +import org.apache.datasketches.memory.WritableMemory; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; + +public class HllSketchMergeBufferAggregatorHelper +{ + private final int lgK; + private final TgtHllType tgtHllType; + private final int size; + private final StripedBufferLock stripedLock = new StripedBufferLock(); + + /** + * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image. + * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The + * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. + */ + private final byte[] emptyUnion; + + public HllSketchMergeBufferAggregatorHelper(int lgK, TgtHllType tgtHllType, int size) + { + this.lgK = lgK; + this.tgtHllType = tgtHllType; + this.size = size; + this.emptyUnion = new byte[size]; + + //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction) + new Union(lgK, WritableMemory.wrap(emptyUnion)); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#init}. + */ + public void init(final ByteBuffer buf, final int position) + { + // Copy prebuilt empty union object. + // Not necessary to cache a Union wrapper around the initialized memory, because: + // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get". + // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the + // max size and therefore do not need to be potentially moved in-heap. + + final int oldPosition = buf.position(); + try { + buf.position(position); + buf.put(emptyUnion); + } + finally { + buf.position(oldPosition); + } + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#get}. + * + * This method uses locks because it can be used during indexing, + * and Druid can call aggregate() and get() concurrently + * See https://github.com/druid-io/druid/pull/3956 + */ + public Object get(ByteBuffer buf, int position) + { + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); + final Lock lock = stripedLock.getLock(position).readLock(); + lock.lock(); + try { + final Union union = Union.writableWrap(mem); + return union.getResult(tgtHllType); + } + finally { + lock.unlock(); + } + } + + public ReadWriteLock getLockForPosition(final int position) + { + return stripedLock.getLock(position); + } + + public int getLgK() + { + return lgK; + } + + public int getSize() + { + return size; + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java new file mode 100644 index 000000000000..23b485894443 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.TgtHllType; +import org.apache.datasketches.hll.Union; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +public class HllSketchMergeVectorAggregator implements VectorAggregator +{ + private final HllSketchMergeBufferAggregatorHelper helper; + private final Supplier objectSupplier; + + HllSketchMergeVectorAggregator( + final VectorColumnSelectorFactory columnSelectorFactory, + final String column, + final int lgK, + final TgtHllType tgtHllType, + final int size + ) + { + this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size); + this.objectSupplier = + ColumnProcessors.makeVectorProcessor( + column, + ToObjectVectorColumnProcessorFactory.INSTANCE, + columnSelectorFactory + ); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + helper.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final Object[] vector = objectSupplier.get(); + + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) + .writableRegion(position, helper.getSize()); + + final Lock lock = helper.getLockForPosition(position).writeLock(); + lock.lock(); + try { + final Union union = Union.writableWrap(mem); + for (int i = startRow; i < endRow; i++) { + union.update((HllSketch) vector[i]); + } + } + finally { + lock.unlock(); + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final Object[] vector = objectSupplier.get(); + + for (int i = 0; i < numRows; i++) { + final HllSketch o = (HllSketch) vector[rows != null ? rows[i] : i]; + + if (o != null) { + final int position = positions[i] + positionOffset; + + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) + .writableRegion(position, helper.getSize()); + + final Lock lock = helper.getLockForPosition(position).writeLock(); + lock.lock(); + try { + final Union union = Union.writableWrap(mem); + union.update(o); + } + finally { + lock.unlock(); + } + } + } + } + + @Override + public Object get(final ByteBuffer buf, final int position) + { + return helper.get(buf, position); + } + + @Override + public void close() + { + // Nothing to close. + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/StripedBufferLock.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/StripedBufferLock.java new file mode 100644 index 000000000000..0c9e27d8b130 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/StripedBufferLock.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import com.google.common.util.concurrent.Striped; + +import java.util.concurrent.locks.ReadWriteLock; + +/** + * Utility for locking positions in a buffer. + */ +public class StripedBufferLock +{ + /** + * for locking per buffer position (power of 2 to make index computation faster) + */ + private static final int NUM_STRIPES = 64; + + private final Striped lock = Striped.readWriteLock(NUM_STRIPES); + + /** + * Get the lock corresponding to a particular position. + */ + public ReadWriteLock getLock(final int position) + { + // Compute lock index to avoid boxing in Striped.get() call + final int lockIndex = smear(position) % NUM_STRIPES; + return lock.getAt(lockIndex); + } + + /** + * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548 + * + * @return smeared value + */ + private static int smear(int x) + { + x ^= (x >>> 20) ^ (x >>> 12); + return x ^ (x >>> 7) ^ (x >>> 4); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java index b5b9ad842cf1..a862265d561c 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java @@ -31,18 +31,18 @@ public class SketchVectorAggregator implements VectorAggregator { - private final Supplier toObjectProcessor; private final SketchBufferAggregatorHelper helper; + private final Supplier objectSupplier; - public SketchVectorAggregator( - VectorColumnSelectorFactory columnSelectorFactory, - String column, - int size, - int maxIntermediateSize + SketchVectorAggregator( + final VectorColumnSelectorFactory columnSelectorFactory, + final String column, + final int size, + final int maxIntermediateSize ) { this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); - this.toObjectProcessor = + this.objectSupplier = ColumnProcessors.makeVectorProcessor( column, ToObjectVectorColumnProcessorFactory.INSTANCE, @@ -60,7 +60,7 @@ public void init(final ByteBuffer buf, final int position) public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) { final Union union = helper.getOrCreateUnion(buf, position); - final Object[] vector = toObjectProcessor.get(); + final Object[] vector = objectSupplier.get(); for (int i = startRow; i < endRow; i++) { final Object o = vector[i]; @@ -79,7 +79,7 @@ public void aggregate( final int positionOffset ) { - final Object[] vector = toObjectProcessor.get(); + final Object[] vector = objectSupplier.get(); for (int i = 0; i < numRows; i++) { final Object o = vector[rows != null ? rows[i] : i]; diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java index a299257fc712..afac5739db9e 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.groupby.GroupByQuery; @@ -54,23 +55,27 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest private static final boolean ROUND = true; private final AggregationTestHelper helper; + private final QueryContexts.Vectorize vectorize; @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - public HllSketchAggregatorTest(GroupByQueryConfig config) + public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize) { HllSketchModule.registerSerde(); helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( new HllSketchModule().getJacksonModules(), config, tempFolder); + this.vectorize = QueryContexts.Vectorize.fromString(vectorize); } - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - constructors.add(new Object[]{config}); + for (String vectorize : new String[]{"false", "true", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } } return constructors; } @@ -224,10 +229,32 @@ public void testPostAggs() throws Exception ) .setPostAggregatorSpecs( ImmutableList.of( - new HllSketchToEstimatePostAggregator("estimate", new FieldAccessPostAggregator("f1", "sketch"), false), - new HllSketchToEstimateWithBoundsPostAggregator("estimateWithBounds", new FieldAccessPostAggregator("f1", "sketch"), 2), - new HllSketchToStringPostAggregator("summary", new FieldAccessPostAggregator("f1", "sketch")), - new HllSketchUnionPostAggregator("union", ImmutableList.of(new FieldAccessPostAggregator("f1", "sketch"), new FieldAccessPostAggregator("f2", "sketch")), null, null) + new HllSketchToEstimatePostAggregator( + "estimate", + new FieldAccessPostAggregator("f1", "sketch"), + false + ), + new HllSketchToEstimateWithBoundsPostAggregator( + "estimateWithBounds", + new FieldAccessPostAggregator( + "f1", + "sketch" + ), + 2 + ), + new HllSketchToStringPostAggregator( + "summary", + new FieldAccessPostAggregator("f1", "sketch") + ), + new HllSketchUnionPostAggregator( + "union", + ImmutableList.of(new FieldAccessPostAggregator( + "f1", + "sketch" + ), new FieldAccessPostAggregator("f2", "sketch")), + null, + null + ) ) ) .build() @@ -320,7 +347,7 @@ private static Map buildAggregatorObject( ); } - private static String buildGroupByQueryJson( + private String buildGroupByQueryJson( String aggregationType, String aggregationFieldName, boolean aggregationRound @@ -338,6 +365,7 @@ private static String buildGroupByQueryJson( .put("dimensions", Collections.emptyList()) .put("aggregations", Collections.singletonList(aggregation)) .put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z")) + .put("context", ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize.toString())) .build(); return toJson(object); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java index 039801c5efc6..3412f3309a98 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -84,34 +85,59 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +@RunWith(Parameterized.class) public class HllSketchSqlAggregatorTest extends CalciteTestBase { private static final String DATA_SOURCE = "foo"; private static final boolean ROUND = true; - private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - ); private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER); - + + private final Map queryContext; private SpecificSegmentsQuerySegmentWalker walker; private SqlLifecycleFactory sqlLifecycleFactory; + public HllSketchSqlAggregatorTest(final String vectorize) + { + this.queryContext = ImmutableMap.of( + PlannerContext.CTX_SQL_QUERY_ID, "dummy", + QueryContexts.VECTORIZE_KEY, vectorize + ); + } + + @Parameterized.Parameters(name = "vectorize = {0}") + public static Collection constructorFeeder() + { + final List constructors = new ArrayList<>(); + for (String vectorize : new String[]{"false", "true", "force"}) { + constructors.add(new Object[]{vectorize}); + } + return constructors; + } + @BeforeClass public static void setUpClass() { @@ -207,6 +233,9 @@ public void tearDown() throws Exception @Test public void testApproxCountDistinctHllSketch() throws Exception { + // Can't vectorize due to CONCAT expression. + cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" @@ -222,7 +251,7 @@ public void testApproxCountDistinctHllSketch() throws Exception // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -317,8 +346,9 @@ public void testApproxCountDistinctHllSketch() throws Exception new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND) ) ) - .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) - .build(), + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); } @@ -327,6 +357,9 @@ public void testApproxCountDistinctHllSketch() throws Exception @Test public void testAvgDailyCountDistinctHllSketch() throws Exception { + // Can't vectorize due to outer query, which runs on an inline datasource. + cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" @@ -340,7 +373,7 @@ public void testAvgDailyCountDistinctHllSketch() throws Exception // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -379,11 +412,14 @@ public void testAvgDailyCountDistinctHllSketch() throws Exception new FinalizingFieldAccessPostAggregator("a0", "a0:a") ) ) - .context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( - ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"), - "d0" - )) + .context(queryContext) .build() + .withOverriddenContext( + BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( + ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"), + "d0" + ) + ) ) ) .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) @@ -414,7 +450,7 @@ public void testAvgDailyCountDistinctHllSketch() throws Exception ) ) ) - .setContext(QUERY_CONTEXT_DEFAULT) + .setContext(queryContext) .build(); Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries()); @@ -437,7 +473,7 @@ public void testApproxCountDistinctHllSketchIsRounded() throws Exception // Verify results final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + sqlLifecycle.runSimple(sql, queryContext, DEFAULT_PARAMETERS, authenticationResult).toList(); final int expected = NullHandling.replaceWithDefault() ? 1 : 2; Assert.assertEquals(expected, results.size()); } @@ -445,6 +481,9 @@ public void testApproxCountDistinctHllSketchIsRounded() throws Exception @Test public void testHllSketchPostAggs() throws Exception { + // Can't vectorize due to CONCAT expression. + cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" @@ -466,7 +505,7 @@ public void testHllSketchPostAggs() throws Exception // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -598,11 +637,9 @@ public void testHllSketchPostAggs() throws Exception new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true) ) ) - .context(ImmutableMap.of( - "skipEmptyBuckets", true, - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - )) - .build(); + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)); // Verify query Assert.assertEquals(expectedQuery, actualQuery); @@ -619,7 +656,7 @@ public void testtHllSketchPostAggsPostSort() throws Exception // Verify results final List results = sqlLifecycle.runSimple( sql2, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -670,13 +707,19 @@ public void testtHllSketchPostAggsPostSort() throws Exception new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0")) ) ) - .context(ImmutableMap.of( - "skipEmptyBuckets", true, - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - )) - .build(); + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)); // Verify query Assert.assertEquals(expectedQuery, actualQuery); } + + private void cannotVectorize() + { + if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY)) + == QueryContexts.Vectorize.FORCE) { + expectedException.expectMessage("Cannot vectorize"); + } + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java index 9201c91b994f..2710526f09fd 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -81,14 +82,20 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +@RunWith(Parameterized.class) public class ThetaSketchSqlAggregatorTest extends CalciteTestBase { private static final String DATA_SOURCE = "foo"; @@ -96,9 +103,6 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; - private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - ); @BeforeClass public static void setUpClass() @@ -113,15 +117,37 @@ public static void tearDownClass() throws IOException resourceCloser.close(); } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public QueryLogHook queryLogHook = QueryLogHook.create(); + private final Map queryContext; private SpecificSegmentsQuerySegmentWalker walker; private SqlLifecycleFactory sqlLifecycleFactory; + public ThetaSketchSqlAggregatorTest(final String vectorize) + { + this.queryContext = ImmutableMap.of( + PlannerContext.CTX_SQL_QUERY_ID, "dummy", + QueryContexts.VECTORIZE_KEY, vectorize + ); + } + + @Parameterized.Parameters(name = "vectorize = {0}") + public static Collection constructorFeeder() + { + final List constructors = new ArrayList<>(); + for (String vectorize : new String[]{"false", "true", "force"}) { + constructors.add(new Object[]{vectorize}); + } + return constructors; + } + @Before public void setUp() throws Exception { @@ -206,21 +232,30 @@ public void tearDown() throws Exception @Test public void testApproxCountDistinctThetaSketch() throws Exception { + // Cannot vectorize due to string expressions. + cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" + " SUM(cnt),\n" - + " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n" // uppercase - + " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered - + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn - + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression - + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n" // on native theta sketch column - + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n" // on native theta sketch column + + " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n" + // uppercase + + " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n" + // lowercase; also, filtered + + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n" + // on extractionFn + + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n" + // on expression + + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n" + // on native theta sketch column + + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n" + // on native theta sketch column + "FROM druid.foo"; // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -319,8 +354,9 @@ public void testApproxCountDistinctThetaSketch() throws Exception new SketchMergeAggregatorFactory("a6", "thetasketch_dim1", null, null, null, null) ) ) - .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) - .build(), + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); } @@ -328,6 +364,9 @@ public void testApproxCountDistinctThetaSketch() throws Exception @Test public void testAvgDailyCountDistinctThetaSketch() throws Exception { + // Can't vectorize due to outer query (it operates on an inlined data source, which cannot be vectorized). + cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" @@ -337,7 +376,7 @@ public void testAvgDailyCountDistinctThetaSketch() throws Exception // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -358,7 +397,11 @@ public void testAvgDailyCountDistinctThetaSketch() throws Exception .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of( Filtration.eternity() ))) - .granularity(new PeriodGranularity(Period.days(1), null, DateTimeZone.UTC)) + .granularity(new PeriodGranularity( + Period.days(1), + null, + DateTimeZone.UTC + )) .aggregators( Collections.singletonList( new SketchMergeAggregatorFactory( @@ -373,14 +416,25 @@ public void testAvgDailyCountDistinctThetaSketch() throws Exception ) .postAggregators( ImmutableList.of( - new FinalizingFieldAccessPostAggregator("a0", "a0:a") + new FinalizingFieldAccessPostAggregator( + "a0", + "a0:a" + ) ) ) - .context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( - ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"), - "d0" - )) + .context(queryContext) .build() + .withOverriddenContext( + BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( + ImmutableMap.of( + "skipEmptyBuckets", + true, + "sqlQueryId", + "dummy" + ), + "d0" + ) + ) ) ) .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) @@ -388,8 +442,8 @@ public void testAvgDailyCountDistinctThetaSketch() throws Exception .setAggregatorSpecs( NullHandling.replaceWithDefault() ? Arrays.asList( - new LongSumAggregatorFactory("_a0:sum", "a0"), - new CountAggregatorFactory("_a0:count") + new LongSumAggregatorFactory("_a0:sum", "a0"), + new CountAggregatorFactory("_a0:count") ) : Arrays.asList( new LongSumAggregatorFactory("_a0:sum", "a0"), @@ -411,7 +465,7 @@ public void testAvgDailyCountDistinctThetaSketch() throws Exception ) ) ) - .setContext(QUERY_CONTEXT_DEFAULT) + .setContext(queryContext) .build(); Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries()); @@ -423,6 +477,9 @@ public void testAvgDailyCountDistinctThetaSketch() throws Exception @Test public void testThetaSketchPostAggs() throws Exception { + // Can't vectorize due to CONCAT expression. + cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" + " SUM(cnt),\n" @@ -439,7 +496,7 @@ public void testThetaSketchPostAggs() throws Exception // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -598,8 +655,9 @@ public void testThetaSketchPostAggs() throws Exception null ) ) - .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) - .build(); + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)); // Verify query @@ -617,7 +675,7 @@ public void testThetaSketchPostAggsPostSort() throws Exception // Verify results final List results = sqlLifecycle.runSimple( sql2, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -664,11 +722,19 @@ public void testThetaSketchPostAggsPostSort() throws Exception null ) ) - .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) - .build(); - + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)); // Verify query Assert.assertEquals(expectedQuery, actualQuery); } + + private void cannotVectorize() + { + if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY)) + == QueryContexts.Vectorize.FORCE) { + expectedException.expectMessage("Cannot vectorize"); + } + } } From e495630212e14dd8d0106b90cabab74d687c4649 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 16 Apr 2021 12:36:52 -0700 Subject: [PATCH 2/3] Updates post-code-review. --- .../hll/HllSketchBuildBufferAggregator.java | 21 +------ .../HllSketchBuildBufferAggregatorHelper.java | 28 ++------- .../hll/HllSketchBuildVectorAggregator.java | 23 ++------ .../hll/HllSketchMergeBufferAggregator.java | 17 +----- .../HllSketchMergeBufferAggregatorHelper.java | 23 +------- .../hll/HllSketchMergeVectorAggregator.java | 25 ++------ .../datasketches/hll/StripedBufferLock.java | 58 ------------------- .../hll/sql/HllSketchSqlAggregatorTest.java | 6 +- .../sql/ThetaSketchSqlAggregatorTest.java | 8 +-- 9 files changed, 25 insertions(+), 184 deletions(-) delete mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/StripedBufferLock.java diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java index 7c06c3f557a0..ab54215e52f2 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java @@ -25,7 +25,6 @@ import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -import java.util.concurrent.locks.Lock; /** * This aggregator builds sketches from raw data. @@ -53,11 +52,6 @@ public void init(final ByteBuffer buf, final int position) helper.init(buf, position); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 - */ @Override public void aggregate(final ByteBuffer buf, final int position) { @@ -65,21 +59,10 @@ public void aggregate(final ByteBuffer buf, final int position) if (value == null) { return; } - final Lock lock = helper.getLockForPosition(position).writeLock(); - lock.lock(); - try { - helper.updateSketchAtPosition(buf, position, value); - } - finally { - lock.unlock(); - } + + HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 - */ @Override public Object get(final ByteBuffer buf, final int position) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java index e2334339b647..71c388e24f62 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java @@ -25,12 +25,9 @@ import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.memory.WritableMemory; -import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.IdentityHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; public class HllSketchBuildBufferAggregatorHelper { @@ -38,7 +35,6 @@ public class HllSketchBuildBufferAggregatorHelper private final int size; private final IdentityHashMap memCache = new IdentityHashMap<>(); private final IdentityHashMap> sketchCache = new IdentityHashMap<>(); - private final StripedBufferLock stripedLock = new StripedBufferLock(); /** * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image. @@ -85,14 +81,7 @@ public void init(final ByteBuffer buf, final int position) */ public Object get(ByteBuffer buf, int position) { - final Lock lock = stripedLock.getLock(position).readLock(); - lock.lock(); - try { - return sketchCache.get(buf).get(position).copy(); - } - finally { - lock.unlock(); - } + return sketchCache.get(buf).get(position).copy(); } /** @@ -110,23 +99,14 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBu putSketchIntoCache(newBuf, newPosition, sketch); } - public ReadWriteLock getLockForPosition(final int position) - { - return stripedLock.getLock(position); - } - /** - * Updates the sketch at a particular position. Must be called while holding a write lock from - * {@link #getLockForPosition} for the given position. + * Updates the sketch at a particular position. * * Does nothing if "o" is null. */ - public void updateSketchAtPosition(final ByteBuffer buf, final int position, @Nullable final Object o) + public HllSketch getSketchAtPosition(final ByteBuffer buf, final int position) { - if (o != null) { - final HllSketch sketch = sketchCache.get(buf).get(position); - HllSketchBuildAggregator.updateSketch(sketch, o); - } + return sketchCache.get(buf).get(position); } /** diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java index f1e33cba1fa3..506c9c3a2a73 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java @@ -27,7 +27,6 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.util.concurrent.locks.Lock; import java.util.function.Supplier; public class HllSketchBuildVectorAggregator implements VectorAggregator @@ -62,16 +61,12 @@ public void init(final ByteBuffer buf, final int position) public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) { final Object[] vector = objectSupplier.get(); - final Lock lock = helper.getLockForPosition(position).writeLock(); - lock.lock(); - try { - for (int i = startRow; i < endRow; i++) { - helper.updateSketchAtPosition(buf, position, vector[i]); + for (int i = startRow; i < endRow; i++) { + final Object value = vector[i]; + if (value != null) { + HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value); } } - finally { - lock.unlock(); - } } @Override @@ -90,15 +85,7 @@ public void aggregate( if (o != null) { final int position = positions[i] + positionOffset; - - final Lock lock = helper.getLockForPosition(position).writeLock(); - lock.lock(); - try { - helper.updateSketchAtPosition(buf, position, o); - } - finally { - lock.unlock(); - } + HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), o); } } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java index 7626cb88aaf2..278a5c53be60 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java @@ -29,7 +29,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.concurrent.locks.Lock; /** * This aggregator merges existing sketches. @@ -57,11 +56,6 @@ public void init(final ByteBuffer buf, final int position) helper.init(buf, position); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 - */ @Override public void aggregate(final ByteBuffer buf, final int position) { @@ -73,15 +67,8 @@ public void aggregate(final ByteBuffer buf, final int position) final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) .writableRegion(position, helper.getSize()); - final Lock lock = helper.getLockForPosition(position).writeLock(); - lock.lock(); - try { - final Union union = Union.writableWrap(mem); - union.update(sketch); - } - finally { - lock.unlock(); - } + final Union union = Union.writableWrap(mem); + union.update(sketch); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java index 37604b118564..d6030e88501e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java @@ -26,15 +26,12 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; public class HllSketchMergeBufferAggregatorHelper { private final int lgK; private final TgtHllType tgtHllType; private final int size; - private final StripedBufferLock stripedLock = new StripedBufferLock(); /** * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image. @@ -79,28 +76,12 @@ public void init(final ByteBuffer buf, final int position) /** * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and * {@link org.apache.druid.query.aggregation.VectorAggregator#get}. - * - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 */ public Object get(ByteBuffer buf, int position) { final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); - final Lock lock = stripedLock.getLock(position).readLock(); - lock.lock(); - try { - final Union union = Union.writableWrap(mem); - return union.getResult(tgtHllType); - } - finally { - lock.unlock(); - } - } - - public ReadWriteLock getLockForPosition(final int position) - { - return stripedLock.getLock(position); + final Union union = Union.writableWrap(mem); + return union.getResult(tgtHllType); } public int getLgK() diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java index 23b485894443..d97c0b5ce673 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java @@ -31,7 +31,6 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.concurrent.locks.Lock; import java.util.function.Supplier; public class HllSketchMergeVectorAggregator implements VectorAggregator @@ -70,16 +69,9 @@ public void aggregate(final ByteBuffer buf, final int position, final int startR final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) .writableRegion(position, helper.getSize()); - final Lock lock = helper.getLockForPosition(position).writeLock(); - lock.lock(); - try { - final Union union = Union.writableWrap(mem); - for (int i = startRow; i < endRow; i++) { - union.update((HllSketch) vector[i]); - } - } - finally { - lock.unlock(); + final Union union = Union.writableWrap(mem); + for (int i = startRow; i < endRow; i++) { + union.update((HllSketch) vector[i]); } } @@ -103,15 +95,8 @@ public void aggregate( final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) .writableRegion(position, helper.getSize()); - final Lock lock = helper.getLockForPosition(position).writeLock(); - lock.lock(); - try { - final Union union = Union.writableWrap(mem); - union.update(o); - } - finally { - lock.unlock(); - } + final Union union = Union.writableWrap(mem); + union.update(o); } } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/StripedBufferLock.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/StripedBufferLock.java deleted file mode 100644 index 0c9e27d8b130..000000000000 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/StripedBufferLock.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.datasketches.hll; - -import com.google.common.util.concurrent.Striped; - -import java.util.concurrent.locks.ReadWriteLock; - -/** - * Utility for locking positions in a buffer. - */ -public class StripedBufferLock -{ - /** - * for locking per buffer position (power of 2 to make index computation faster) - */ - private static final int NUM_STRIPES = 64; - - private final Striped lock = Striped.readWriteLock(NUM_STRIPES); - - /** - * Get the lock corresponding to a particular position. - */ - public ReadWriteLock getLock(final int position) - { - // Compute lock index to avoid boxing in Striped.get() call - final int lockIndex = smear(position) % NUM_STRIPES; - return lock.getAt(lockIndex); - } - - /** - * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548 - * - * @return smeared value - */ - private static int smear(int x) - { - x ^= (x >>> 20) ^ (x >>> 12); - return x ^ (x >>> 7) ^ (x >>> 4); - } -} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java index 3412f3309a98..b1faed1cbede 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java @@ -124,7 +124,8 @@ public HllSketchSqlAggregatorTest(final String vectorize) { this.queryContext = ImmutableMap.of( PlannerContext.CTX_SQL_QUERY_ID, "dummy", - QueryContexts.VECTORIZE_KEY, vectorize + QueryContexts.VECTORIZE_KEY, vectorize, + QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ); } @@ -481,9 +482,6 @@ public void testApproxCountDistinctHllSketchIsRounded() throws Exception @Test public void testHllSketchPostAggs() throws Exception { - // Can't vectorize due to CONCAT expression. - cannotVectorize(); - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java index 2710526f09fd..0aac2b088cc5 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java @@ -134,7 +134,8 @@ public ThetaSketchSqlAggregatorTest(final String vectorize) { this.queryContext = ImmutableMap.of( PlannerContext.CTX_SQL_QUERY_ID, "dummy", - QueryContexts.VECTORIZE_KEY, vectorize + QueryContexts.VECTORIZE_KEY, vectorize, + QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ); } @@ -232,7 +233,7 @@ public void tearDown() throws Exception @Test public void testApproxCountDistinctThetaSketch() throws Exception { - // Cannot vectorize due to string expressions. + // Cannot vectorize due to SUBSTRING. cannotVectorize(); SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); @@ -477,9 +478,6 @@ public void testAvgDailyCountDistinctThetaSketch() throws Exception @Test public void testThetaSketchPostAggs() throws Exception { - // Can't vectorize due to CONCAT expression. - cannotVectorize(); - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" + " SUM(cnt),\n" From ac8ec1c29a0f119f4a95e7a1224be562c947d0ea Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 16 Apr 2021 12:42:46 -0700 Subject: [PATCH 3/3] Fix javadoc. --- .../hll/HllSketchBuildBufferAggregatorHelper.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java index 71c388e24f62..466f1aca854f 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java @@ -100,9 +100,7 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBu } /** - * Updates the sketch at a particular position. - * - * Does nothing if "o" is null. + * Retrieves the sketch at a particular position. */ public HllSketch getSketchAtPosition(final ByteBuffer buf, final int position) {