From 80d0a2072003ccbea675ca10b47bab1746d3fa7f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 14 Jan 2021 22:56:46 -0800 Subject: [PATCH 1/7] Vectorized theta sketch aggregator. Also a refactoring of BufferAggregator and VectorAggregator such that they share a common interface, BaseBufferAggregator. This allows implementing both in the same file with an abstract + dual subclass structure. --- .../theta/SketchAggregatorFactory.java | 19 ++- .../theta/SketchBufferAggregator.java | 141 +++++++++++++----- .../theta/SketchAggregationTest.java | 55 ++++--- .../SketchAggregationWithSimpleDataTest.java | 40 +++-- .../aggregation/BaseBufferAggregator.java | 96 ++++++++++++ .../query/aggregation/BufferAggregator.java | 76 ++-------- .../query/aggregation/VectorAggregator.java | 29 +--- .../aggregation/AggregationTestHelper.java | 15 ++ 8 files changed, 311 insertions(+), 160 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/BaseBufferAggregator.java diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 2b15cc09cdc7..fa8153af218d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -31,9 +31,13 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -74,7 +78,20 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls()); + return new SketchBufferAggregator.Buffer(selector, size, getMaxIntermediateSizeWithNulls()); + } + + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + final VectorObjectSelector selector = selectorFactory.makeObjectSelector(fieldName); + return new SketchBufferAggregator.Vector(selector, size, getMaxIntermediateSizeWithNulls()); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 2c8688a8c74e..c74c95948719 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -25,25 +25,27 @@ import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.theta.SetOperation; import org.apache.datasketches.theta.Union; +import org.apache.druid.query.aggregation.BaseBufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.IdentityHashMap; -public class SketchBufferAggregator implements BufferAggregator +public abstract class SketchBufferAggregator implements BaseBufferAggregator { - private final BaseObjectColumnValueSelector selector; private final int size; private final int maxIntermediateSize; private final IdentityHashMap> unions = new IdentityHashMap<>(); private final IdentityHashMap memCache = new IdentityHashMap<>(); - public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize) + protected SketchBufferAggregator(int size, int maxIntermediateSize) { - this.selector = selector; this.size = size; this.maxIntermediateSize = maxIntermediateSize; } @@ -54,18 +56,6 @@ public void init(ByteBuffer buf, int position) createNewUnion(buf, position, false); } - @Override - public void aggregate(ByteBuffer buf, int position) - { - Object update = selector.getObject(); - if (update == null) { - return; - } - - Union union = getOrCreateUnion(buf, position); - SketchAggregator.updateUnion(union, update); - } - @Override public Object get(ByteBuffer buf, int position) { @@ -82,7 +72,7 @@ public Object get(ByteBuffer buf, int position) return SketchHolder.of(union.getResult(true, null)); } - private Union getOrCreateUnion(ByteBuffer buf, int position) + protected Union getOrCreateUnion(ByteBuffer buf, int position) { Int2ObjectMap unionMap = unions.get(buf); Union union = unionMap != null ? unionMap.get(position) : null; @@ -107,24 +97,6 @@ private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) return union; } - @Override - public float getFloat(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public long getLong(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public double getDouble(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - @Override public void close() { @@ -132,12 +104,6 @@ public void close() memCache.clear(); } - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - @Override public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { @@ -162,4 +128,97 @@ private WritableMemory getMemory(ByteBuffer buffer) return mem; } + public static class Buffer extends SketchBufferAggregator implements BufferAggregator + { + private final BaseObjectColumnValueSelector selector; + + public Buffer(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize) + { + super(size, maxIntermediateSize); + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + Object update = selector.getObject(); + if (update == null) { + return; + } + + Union union = getOrCreateUnion(buf, position); + SketchAggregator.updateUnion(union, update); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } + } + + public static class Vector extends SketchBufferAggregator implements VectorAggregator + { + private final VectorObjectSelector selector; + + public Vector(VectorObjectSelector selector, int size, int maxIntermediateSize) + { + super(size, maxIntermediateSize); + this.selector = selector; + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final Union union = getOrCreateUnion(buf, position); + final Object[] vector = selector.getObjectVector(); + + for (int i = startRow; i < endRow; i++) { + final Object o = vector[i]; + if (o != null) { + SketchAggregator.updateUnion(union, o); + } + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final Object[] vector = selector.getObjectVector(); + + for (int i = 0; i < numRows; i++) { + final Object o = vector[rows != null ? rows[i] : i]; + + if (o != null) { + final int position = positions[i] + positionOffset; + final Union union = getOrCreateUnion(buf, position); + SketchAggregator.updateUnion(union, o); + } + } + } + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index 6a1ae8137470..b348a67d0e53 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.datasketches.theta; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -34,6 +35,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -71,26 +73,30 @@ public class SketchAggregationTest { private final AggregationTestHelper helper; + private final QueryContexts.Vectorize vectorize; @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - public SketchAggregationTest(final GroupByQueryConfig config) + public SketchAggregationTest(final GroupByQueryConfig config, final String vectorize) { SketchModule.registerSerde(); - helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( new SketchModule().getJacksonModules(), config, tempFolder ); + this.vectorize = QueryContexts.Vectorize.fromString(vectorize); } - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - constructors.add(new Object[]{config}); + for (String vectorize : new String[]{"false", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } } return constructors; } @@ -104,9 +110,8 @@ public void teardown() throws IOException @Test public void testSketchDataIngestAndGpByQuery() throws Exception { - final String groupByQueryString = readFileFromClasspathAsString("sketch_test_data_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = + readQueryFromClasspath("sketch_test_data_group_by_query.json", helper.getObjectMapper(), vectorize); final Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(SketchAggregationTest.class.getClassLoader().getResource("sketch_test_data.tsv").getFile()), @@ -115,7 +120,7 @@ public void testSketchDataIngestAndGpByQuery() throws Exception 0, Granularities.NONE, 1000, - groupByQueryString + groupByQuery ); final String expectedSummary = "\n### HeapCompactOrderedSketch SUMMARY: \n" @@ -164,9 +169,8 @@ public void testSketchDataIngestAndGpByQuery() throws Exception @Test public void testEmptySketchAggregateCombine() throws Exception { - final String groupByQueryString = readFileFromClasspathAsString("empty_sketch_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = + readQueryFromClasspath("empty_sketch_group_by_query.json", helper.getObjectMapper(), vectorize); final Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(SketchAggregationTest.class.getClassLoader().getResource("empty_sketch_data.tsv").getFile()), @@ -175,7 +179,7 @@ public void testEmptySketchAggregateCombine() throws Exception 0, Granularities.NONE, 5, - groupByQueryString + groupByQuery ); List results = seq.toList(); @@ -199,9 +203,8 @@ public void testEmptySketchAggregateCombine() throws Exception @Test public void testThetaCardinalityOnSimpleColumn() throws Exception { - final String groupByQueryString = readFileFromClasspathAsString("simple_test_data_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = + readQueryFromClasspath("simple_test_data_group_by_query.json", helper.getObjectMapper(), vectorize); final Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(SketchAggregationTest.class.getClassLoader().getResource("simple_test_data.tsv").getFile()), @@ -215,7 +218,7 @@ public void testThetaCardinalityOnSimpleColumn() throws Exception 0, Granularities.NONE, 1000, - groupByQueryString + groupByQuery ); List results = seq.toList(); @@ -426,9 +429,8 @@ public void testCacheKey() @Test public void testRetentionDataIngestAndGpByQuery() throws Exception { - final String groupByQueryString = readFileFromClasspathAsString("retention_test_data_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = + readQueryFromClasspath("retention_test_data_group_by_query.json", helper.getObjectMapper(), vectorize); final Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(this.getClass().getClassLoader().getResource("retention_test_data.tsv").getFile()), @@ -437,7 +439,7 @@ public void testRetentionDataIngestAndGpByQuery() throws Exception 0, Granularities.NONE, 5, - groupByQueryString + groupByQuery ); List results = seq.toList(); @@ -538,6 +540,19 @@ private void assertPostAggregatorSerde(PostAggregator agg) throws Exception ); } + public static > Q readQueryFromClasspath( + final String fileName, + final ObjectMapper objectMapper, + final QueryContexts.Vectorize vectorize + ) throws IOException + { + final String queryString = readFileFromClasspathAsString(fileName); + + //noinspection unchecked + return (Q) objectMapper.readValue(queryString, Query.class) + .withOverriddenContext(ImmutableMap.of("vectorize", vectorize.toString())); + } + public static String readFileFromClasspathAsString(String fileName) throws IOException { return Files.asCharSource( diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java index a528a313e1af..ffd166b867fe 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.groupby.GroupByQuery; @@ -63,22 +64,26 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling public final TemporaryFolder tempFolder = new TemporaryFolder(); private final GroupByQueryConfig config; + private final QueryContexts.Vectorize vectorize; private SketchModule sm; private File s1; private File s2; - public SketchAggregationWithSimpleDataTest(GroupByQueryConfig config) + public SketchAggregationWithSimpleDataTest(GroupByQueryConfig config, String vectorize) { this.config = config; + this.vectorize = QueryContexts.Vectorize.fromString(vectorize); } - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - constructors.add(new Object[]{config}); + for (String vectorize : new String[]{"false", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } } return constructors; } @@ -130,14 +135,15 @@ public void testSimpleDataIngestAndGpByQuery() throws Exception tempFolder ) ) { - final String groupByQueryString = readFileFromClasspathAsString("simple_test_data_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) gpByQueryAggregationTestHelper - .getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = SketchAggregationTest.readQueryFromClasspath( + "simple_test_data_group_by_query.json", + gpByQueryAggregationTestHelper.getObjectMapper(), + vectorize + ); Sequence seq = gpByQueryAggregationTestHelper.runQueryOnSegments( ImmutableList.of(s1, s2), - groupByQueryString + groupByQuery ); List results = seq.map(row -> row.toMapBasedRow(groupByQuery)).toList(); @@ -225,7 +231,11 @@ public void testSimpleDataIngestAndTimeseriesQuery() throws Exception Sequence seq = timeseriesQueryAggregationTestHelper.runQueryOnSegments( ImmutableList.of(s1, s2), - readFileFromClasspathAsString("timeseries_query.json") + (Query) SketchAggregationTest.readQueryFromClasspath( + "timeseries_query.json", + timeseriesQueryAggregationTestHelper.getObjectMapper(), + vectorize + ) ); Result result = (Result) Iterables.getOnlyElement(seq.toList()); @@ -251,7 +261,11 @@ public void testSimpleDataIngestAndTopNQuery() throws Exception Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments( ImmutableList.of(s1, s2), - readFileFromClasspathAsString("topn_query.json") + (Query) SketchAggregationTest.readQueryFromClasspath( + "topn_query.json", + topNQueryAggregationTestHelper.getObjectMapper(), + vectorize + ) ); Result result = (Result) Iterables.getOnlyElement(seq.toList()); @@ -278,7 +292,11 @@ public void testTopNQueryWithSketchConstant() throws Exception Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments( ImmutableList.of(s1, s2), - readFileFromClasspathAsString("topn_query_sketch_const.json") + (Query) SketchAggregationTest.readQueryFromClasspath( + "topn_query_sketch_const.json", + topNQueryAggregationTestHelper.getObjectMapper(), + vectorize + ) ); Result result = (Result) Iterables.getOnlyElement(seq.toList()); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BaseBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BaseBufferAggregator.java new file mode 100644 index 000000000000..59470893dcbf --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BaseBufferAggregator.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Contains methods common to {@link BufferAggregator} and {@link VectorAggregator}. + */ +public interface BaseBufferAggregator +{ + /** + * Initializes the buffer location + * + * Implementations of this method must initialize the byte buffer at the given position + * + * Implementations must not change the position, limit or mark of the given buffer + * + * This method must not exceed the number of bytes returned by {@link AggregatorFactory#getMaxIntermediateSizeWithNulls} + * in the corresponding {@link AggregatorFactory} + * + * @param buf byte buffer to initialize + * @param position offset within the byte buffer for initialization + */ + void init(ByteBuffer buf, int position); + + /** + * Returns the intermediate object representation of the given aggregate. + * + * Converts the given byte buffer representation into an intermediate aggregate Object + * + * Implementations must not change the position, limit or mark of the given buffer. + * + * + * The object returned must not have any references to the given buffer (i.e., make a copy), since the + * underlying buffer is a shared resource and may be given to another processing thread + * while the objects returned by this aggregator are still in use. + * + * + * If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator + * expects its inputs to be mutable, then the object returned by this method must be mutable. + * + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the aggregate value is stored + * + * @return the Object representation of the aggregate + */ + @Nullable + Object get(ByteBuffer buf, int position); + + /** + * Relocates any cached objects. + * If underlying ByteBuffer used for aggregation buffer relocates to a new ByteBuffer, positional caches(if any) + * built on top of old ByteBuffer can not be used for further {@link BufferAggregator#aggregate(ByteBuffer, int)} + * calls. This method tells the BufferAggregator that the cached objects at a certain location has been relocated to + * a different location. + * + * Only used if there is any positional caches/objects in the BufferAggregator implementation. + * + * If relocate happens to be across multiple new ByteBuffers (say n ByteBuffers), this method should be called + * multiple times(n times) given all the new positions/old positions should exist in newBuffer/OldBuffer. + * + * Implementations must not change the position, limit or mark of the given buffer + * + * @param oldPosition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. + * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. + * @param oldBuffer old aggregation buffer. + * @param newBuffer new aggregation buffer. + */ + default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + } + + /** + * Release any resources used by the aggregator + */ + void close(); +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index b047f0ee03c5..1790012e1e64 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -38,21 +38,14 @@ * @see VectorAggregator, the vectorized version */ @ExtensionPoint -public interface BufferAggregator extends HotLoopCallee +public interface BufferAggregator extends BaseBufferAggregator, HotLoopCallee { /** - * Initializes the buffer location - * - * Implementations of this method must initialize the byte buffer at the given position - * - * Implementations must not change the position, limit or mark of the given buffer - * - * This method must not exceed the number of bytes returned by {@link AggregatorFactory#getMaxIntermediateSizeWithNulls} - * in the corresponding {@link AggregatorFactory} + * {@inheritDoc} * - * @param buf byte buffer to initialize - * @param position offset within the byte buffer for initialization + * Overridden because this method is a {@link HotLoopCallee} and the superinterface method is not. */ + @Override @CalledFromHotLoop void init(ByteBuffer buf, int position); @@ -64,33 +57,20 @@ public interface BufferAggregator extends HotLoopCallee * * Implementations must not change the position, limit or mark of the given buffer * - * @param buf byte buffer storing the byte array representation of the aggregate + * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the current aggregate value is stored */ @CalledFromHotLoop void aggregate(ByteBuffer buf, int position); /** - * Returns the intermediate object representation of the given aggregate. - * - * Converts the given byte buffer representation into an intermediate aggregate Object - * - * Implementations must not change the position, limit or mark of the given buffer. - * - * - * The object returned must not have any references to the given buffer (i.e., make a copy), since the - * underlying buffer is a shared resource and may be given to another processing thread - * while the objects returned by this aggregator are still in use. - * - * - * If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator - * expects its inputs to be mutable, then the object returned by this method must be mutable. + * {@inheritDoc} * - * @param buf byte buffer storing the byte array representation of the aggregate - * @param position offset within the byte buffer at which the aggregate value is stored - * @return the Object representation of the aggregate + * Overridden because this method is a {@link HotLoopCallee} and the superinterface method is not. */ + @Override @Nullable + @CalledFromHotLoop Object get(ByteBuffer buf, int position); /** @@ -104,8 +84,9 @@ public interface BufferAggregator extends HotLoopCallee * have an {@link AggregatorFactory#getType()} ()} of {@link org.apache.druid.segment.column.ValueType#FLOAT}. * If unimplemented, throwing an {@link UnsupportedOperationException} is common and recommended. * - * @param buf byte buffer storing the byte array representation of the aggregate + * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored + * * @return the float representation of the aggregate */ float getFloat(ByteBuffer buf, int position); @@ -121,8 +102,9 @@ public interface BufferAggregator extends HotLoopCallee * have an {@link AggregatorFactory#getType()} of of {@link org.apache.druid.segment.column.ValueType#LONG}. * If unimplemented, throwing an {@link UnsupportedOperationException} is common and recommended. * - * @param buf byte buffer storing the byte array representation of the aggregate + * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored + * * @return the long representation of the aggregate */ long getLong(ByteBuffer buf, int position); @@ -142,8 +124,9 @@ public interface BufferAggregator extends HotLoopCallee * This default method is added to enable smooth backward compatibility, please re-implement it if your aggregators * work with numeric double columns. * - * @param buf byte buffer storing the byte array representation of the aggregate + * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored + * * @return the double representation of the aggregate */ default double getDouble(ByteBuffer buf, int position) @@ -151,11 +134,6 @@ default double getDouble(ByteBuffer buf, int position) return (double) getFloat(buf, position); } - /** - * Release any resources used by the aggregator - */ - void close(); - /** * {@inheritDoc} * @@ -168,29 +146,6 @@ default void inspectRuntimeShape(RuntimeShapeInspector inspector) // nothing to inspect } - /** - * Relocates any cached objects. - * If underlying ByteBuffer used for aggregation buffer relocates to a new ByteBuffer, positional caches(if any) - * built on top of old ByteBuffer can not be used for further {@link BufferAggregator#aggregate(ByteBuffer, int)} - * calls. This method tells the BufferAggregator that the cached objects at a certain location has been relocated to - * a different location. - * - * Only used if there is any positional caches/objects in the BufferAggregator implementation. - * - * If relocate happens to be across multiple new ByteBuffers (say n ByteBuffers), this method should be called - * multiple times(n times) given all the new positions/old positions should exist in newBuffer/OldBuffer. - * - * Implementations must not change the position, limit or mark of the given buffer - * - * @param oldPosition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. - * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. - * @param oldBuffer old aggregation buffer. - * @param newBuffer new aggregation buffer. - */ - default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) - { - } - /** * returns true if aggregator's output type is primitive long/double/float and aggregated value is null, * but when aggregated output type is Object, this method always returns false, @@ -208,5 +163,4 @@ default boolean isNull(ByteBuffer buf, int position) { return false; } - } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java index 8d0983eeba63..3509c3e19a54 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java @@ -27,19 +27,14 @@ * methods (namely, "aggregate" and "get") do not take the actual input values to aggregate, because it is assumed that * the VectorAggregator was given something that it can use to get at the current batch of data. * - * None of the methods in this class are annotated with + * Unlike {@link BufferAggregator}, none of the methods in this class are annotated with * {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop} because vectorized query engines do not use * monomorphic-processing-style specialization. * - * @see BufferAggregator, the vectorized version. + * @see BufferAggregator, the nonvectorized version. */ -public interface VectorAggregator +public interface VectorAggregator extends BaseBufferAggregator { - /** - * Same as {@link BufferAggregator#init}. - */ - void init(ByteBuffer buf, int position); - /** * Aggregate a range of rows into a single aggregation slot. * @@ -65,22 +60,4 @@ public interface VectorAggregator * @param positionOffset an offset to apply to each value from "positions" */ void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset); - - /** - * Same as {@link BufferAggregator#get}. - */ - @Nullable - Object get(ByteBuffer buf, int position); - - /** - * Same as {@link BufferAggregator#relocate}. - */ - default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) - { - } - - /** - * Release any resources used by the aggregator. - */ - void close(); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 4a7df063a821..0d6e8b879ef2 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -326,6 +326,21 @@ public Sequence createIndexAndRunQueryOnSegment( return runQueryOnSegments(Collections.singletonList(segmentDir), queryJson); } + public Sequence createIndexAndRunQueryOnSegment( + File inputDataFile, + String parserJson, + String aggregators, + long minTimestamp, + Granularity gran, + int maxRowCount, + Query query + ) throws Exception + { + File segmentDir = tempFolder.newFolder(); + createIndex(inputDataFile, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount, true); + return runQueryOnSegments(Collections.singletonList(segmentDir), query); + } + public Sequence createIndexAndRunQueryOnSegment( File inputDataFile, String parserJson, From 86429a0ff99394833837a66a7c95d7d1630dea54 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 15 Jan 2021 11:21:52 -0800 Subject: [PATCH 2/7] Rework implementation to use composition instead of inheritance. --- .../theta/SketchAggregatorFactory.java | 4 +- .../theta/SketchBufferAggregator.java | 192 ++++-------------- .../theta/SketchBufferAggregatorHelper.java | 140 +++++++++++++ .../theta/SketchVectorAggregator.java | 98 +++++++++ .../aggregation/BaseBufferAggregator.java | 96 --------- .../query/aggregation/BufferAggregator.java | 76 +++++-- .../query/aggregation/VectorAggregator.java | 29 ++- 7 files changed, 362 insertions(+), 273 deletions(-) create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java delete mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/BaseBufferAggregator.java diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index fa8153af218d..12d1e8782501 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -78,14 +78,14 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new SketchBufferAggregator.Buffer(selector, size, getMaxIntermediateSizeWithNulls()); + return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls()); } @Override public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) { final VectorObjectSelector selector = selectorFactory.makeObjectSelector(fieldName); - return new SketchBufferAggregator.Vector(selector, size, getMaxIntermediateSizeWithNulls()); + return new SketchVectorAggregator(selector, size, getMaxIntermediateSizeWithNulls()); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index c74c95948719..34aae3f36e18 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -19,206 +19,84 @@ package org.apache.druid.query.aggregation.datasketches.theta; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.apache.datasketches.Family; -import org.apache.datasketches.memory.WritableMemory; -import org.apache.datasketches.theta.SetOperation; import org.apache.datasketches.theta.Union; -import org.apache.druid.query.aggregation.BaseBufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseObjectColumnValueSelector; -import org.apache.druid.segment.vector.VectorObjectSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.IdentityHashMap; -public abstract class SketchBufferAggregator implements BaseBufferAggregator +public class SketchBufferAggregator implements BufferAggregator { - private final int size; - private final int maxIntermediateSize; - private final IdentityHashMap> unions = new IdentityHashMap<>(); - private final IdentityHashMap memCache = new IdentityHashMap<>(); + private final BaseObjectColumnValueSelector selector; + private final SketchBufferAggregatorHelper helper; - protected SketchBufferAggregator(int size, int maxIntermediateSize) + public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize) { - this.size = size; - this.maxIntermediateSize = maxIntermediateSize; + this.selector = selector; + this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); } @Override public void init(ByteBuffer buf, int position) { - createNewUnion(buf, position, false); + helper.init(buf, position); } @Override - public Object get(ByteBuffer buf, int position) + public void aggregate(ByteBuffer buf, int position) { - Int2ObjectMap unionMap = unions.get(buf); - Union union = unionMap != null ? unionMap.get(position) : null; - if (union == null) { - return SketchHolder.EMPTY; + Object update = selector.getObject(); + if (update == null) { + return; } - //in the code below, I am returning SetOp.getResult(true, null) - //"true" returns an ordered sketch but slower to compute than unordered sketch. - //however, advantage of ordered sketch is that they are faster to "union" later - //given that results from the aggregator will be combined further, it is better - //to return the ordered sketch here - return SketchHolder.of(union.getResult(true, null)); + + Union union = helper.getOrCreateUnion(buf, position); + SketchAggregator.updateUnion(union, update); } - protected Union getOrCreateUnion(ByteBuffer buf, int position) + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) { - Int2ObjectMap unionMap = unions.get(buf); - Union union = unionMap != null ? unionMap.get(position) : null; - if (union != null) { - return union; - } - return createNewUnion(buf, position, true); + return helper.get(buf, position); } - private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) + @Override + public float getFloat(ByteBuffer buf, int position) { - WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize); - Union union = isWrapped - ? (Union) SetOperation.wrap(mem) - : (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem); - Int2ObjectMap unionMap = unions.get(buf); - if (unionMap == null) { - unionMap = new Int2ObjectOpenHashMap<>(); - unions.put(buf, unionMap); - } - unionMap.put(position, union); - return union; + throw new UnsupportedOperationException("Not implemented"); } @Override - public void close() + public long getLong(ByteBuffer buf, int position) { - unions.clear(); - memCache.clear(); + throw new UnsupportedOperationException("Not implemented"); } @Override - public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + public double getDouble(ByteBuffer buf, int position) { - createNewUnion(newBuffer, newPosition, true); - Int2ObjectMap unionMap = unions.get(oldBuffer); - if (unionMap != null) { - unionMap.remove(oldPosition); - if (unionMap.isEmpty()) { - unions.remove(oldBuffer); - memCache.remove(oldBuffer); - } - } + throw new UnsupportedOperationException("Not implemented"); } - private WritableMemory getMemory(ByteBuffer buffer) + @Override + public void close() { - WritableMemory mem = memCache.get(buffer); - if (mem == null) { - mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN); - memCache.put(buffer, mem); - } - return mem; + helper.close(); } - public static class Buffer extends SketchBufferAggregator implements BufferAggregator + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - private final BaseObjectColumnValueSelector selector; - - public Buffer(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize) - { - super(size, maxIntermediateSize); - this.selector = selector; - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - Object update = selector.getObject(); - if (update == null) { - return; - } - - Union union = getOrCreateUnion(buf, position); - SketchAggregator.updateUnion(union, update); - } - - @Override - public float getFloat(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public long getLong(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public double getDouble(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } + inspector.visit("selector", selector); } - public static class Vector extends SketchBufferAggregator implements VectorAggregator + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { - private final VectorObjectSelector selector; - - public Vector(VectorObjectSelector selector, int size, int maxIntermediateSize) - { - super(size, maxIntermediateSize); - this.selector = selector; - } - - @Override - public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) - { - final Union union = getOrCreateUnion(buf, position); - final Object[] vector = selector.getObjectVector(); - - for (int i = startRow; i < endRow; i++) { - final Object o = vector[i]; - if (o != null) { - SketchAggregator.updateUnion(union, o); - } - } - } - - @Override - public void aggregate( - final ByteBuffer buf, - final int numRows, - final int[] positions, - @Nullable final int[] rows, - final int positionOffset - ) - { - final Object[] vector = selector.getObjectVector(); - - for (int i = 0; i < numRows; i++) { - final Object o = vector[rows != null ? rows[i] : i]; - - if (o != null) { - final int position = positions[i] + positionOffset; - final Union union = getOrCreateUnion(buf, position); - SketchAggregator.updateUnion(union, o); - } - } - } + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java new file mode 100644 index 000000000000..c009c00e5c00 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.theta; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.datasketches.Family; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.datasketches.theta.SetOperation; +import org.apache.datasketches.theta.Union; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.IdentityHashMap; + +/** + * A helper class used by {@link SketchBufferAggregator} and {@link SketchVectorAggregator} + * for aggregation operations on byte buffers. Getting the object from value selectors is outside this class. + */ +final class SketchBufferAggregatorHelper +{ + private final int size; + private final int maxIntermediateSize; + private final IdentityHashMap> unions = new IdentityHashMap<>(); + private final IdentityHashMap memCache = new IdentityHashMap<>(); + + public SketchBufferAggregatorHelper(final int size, final int maxIntermediateSize) + { + this.size = size; + this.maxIntermediateSize = maxIntermediateSize; + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#init}. + */ + public void init(ByteBuffer buf, int position) + { + createNewUnion(buf, position, false); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#get}. + */ + public Object get(ByteBuffer buf, int position) + { + Int2ObjectMap unionMap = unions.get(buf); + Union union = unionMap != null ? unionMap.get(position) : null; + if (union == null) { + return SketchHolder.EMPTY; + } + //in the code below, I am returning SetOp.getResult(true, null) + //"true" returns an ordered sketch but slower to compute than unordered sketch. + //however, advantage of ordered sketch is that they are faster to "union" later + //given that results from the aggregator will be combined further, it is better + //to return the ordered sketch here + return SketchHolder.of(union.getResult(true, null)); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}. + */ + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + createNewUnion(newBuffer, newPosition, true); + Int2ObjectMap unionMap = unions.get(oldBuffer); + if (unionMap != null) { + unionMap.remove(oldPosition); + if (unionMap.isEmpty()) { + unions.remove(oldBuffer); + memCache.remove(oldBuffer); + } + } + } + + /** + * Returns a {@link Union} associated with a particular buffer location. + * + * The Union object will be cached in this helper until {@link #close()} is called. + */ + public Union getOrCreateUnion(ByteBuffer buf, int position) + { + Int2ObjectMap unionMap = unions.get(buf); + Union union = unionMap != null ? unionMap.get(position) : null; + if (union != null) { + return union; + } + return createNewUnion(buf, position, true); + } + + private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) + { + WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize); + Union union = isWrapped + ? (Union) SetOperation.wrap(mem) + : (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem); + Int2ObjectMap unionMap = unions.get(buf); + if (unionMap == null) { + unionMap = new Int2ObjectOpenHashMap<>(); + unions.put(buf, unionMap); + } + unionMap.put(position, union); + return union; + } + + public void close() + { + unions.clear(); + memCache.clear(); + } + + private WritableMemory getMemory(ByteBuffer buffer) + { + WritableMemory mem = memCache.get(buffer); + if (mem == null) { + mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN); + memCache.put(buffer, mem); + } + return mem; + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java new file mode 100644 index 000000000000..1a42be9c084d --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.theta; + +import org.apache.datasketches.theta.Union; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SketchVectorAggregator implements VectorAggregator +{ + private final VectorObjectSelector selector; + private final SketchBufferAggregatorHelper helper; + + public SketchVectorAggregator(VectorObjectSelector selector, int size, int maxIntermediateSize) + { + this.selector = selector; + this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + helper.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final Union union = helper.getOrCreateUnion(buf, position); + final Object[] vector = selector.getObjectVector(); + + for (int i = startRow; i < endRow; i++) { + final Object o = vector[i]; + if (o != null) { + SketchAggregator.updateUnion(union, o); + } + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final Object[] vector = selector.getObjectVector(); + + for (int i = 0; i < numRows; i++) { + final Object o = vector[rows != null ? rows[i] : i]; + + if (o != null) { + final int position = positions[i] + positionOffset; + final Union union = helper.getOrCreateUnion(buf, position); + SketchAggregator.updateUnion(union, o); + } + } + } + @Override + public Object get(ByteBuffer buf, int position) + { + return helper.get(buf, position); + } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); + } + + @Override + public void close() + { + helper.close(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BaseBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BaseBufferAggregator.java deleted file mode 100644 index 59470893dcbf..000000000000 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BaseBufferAggregator.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation; - -import javax.annotation.Nullable; -import java.nio.ByteBuffer; - -/** - * Contains methods common to {@link BufferAggregator} and {@link VectorAggregator}. - */ -public interface BaseBufferAggregator -{ - /** - * Initializes the buffer location - * - * Implementations of this method must initialize the byte buffer at the given position - * - * Implementations must not change the position, limit or mark of the given buffer - * - * This method must not exceed the number of bytes returned by {@link AggregatorFactory#getMaxIntermediateSizeWithNulls} - * in the corresponding {@link AggregatorFactory} - * - * @param buf byte buffer to initialize - * @param position offset within the byte buffer for initialization - */ - void init(ByteBuffer buf, int position); - - /** - * Returns the intermediate object representation of the given aggregate. - * - * Converts the given byte buffer representation into an intermediate aggregate Object - * - * Implementations must not change the position, limit or mark of the given buffer. - * - * - * The object returned must not have any references to the given buffer (i.e., make a copy), since the - * underlying buffer is a shared resource and may be given to another processing thread - * while the objects returned by this aggregator are still in use. - * - * - * If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator - * expects its inputs to be mutable, then the object returned by this method must be mutable. - * - * @param buf byte buffer storing the byte array representation of the aggregate - * @param position offset within the byte buffer at which the aggregate value is stored - * - * @return the Object representation of the aggregate - */ - @Nullable - Object get(ByteBuffer buf, int position); - - /** - * Relocates any cached objects. - * If underlying ByteBuffer used for aggregation buffer relocates to a new ByteBuffer, positional caches(if any) - * built on top of old ByteBuffer can not be used for further {@link BufferAggregator#aggregate(ByteBuffer, int)} - * calls. This method tells the BufferAggregator that the cached objects at a certain location has been relocated to - * a different location. - * - * Only used if there is any positional caches/objects in the BufferAggregator implementation. - * - * If relocate happens to be across multiple new ByteBuffers (say n ByteBuffers), this method should be called - * multiple times(n times) given all the new positions/old positions should exist in newBuffer/OldBuffer. - * - * Implementations must not change the position, limit or mark of the given buffer - * - * @param oldPosition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. - * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. - * @param oldBuffer old aggregation buffer. - * @param newBuffer new aggregation buffer. - */ - default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) - { - } - - /** - * Release any resources used by the aggregator - */ - void close(); -} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index 1790012e1e64..b047f0ee03c5 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -38,14 +38,21 @@ * @see VectorAggregator, the vectorized version */ @ExtensionPoint -public interface BufferAggregator extends BaseBufferAggregator, HotLoopCallee +public interface BufferAggregator extends HotLoopCallee { /** - * {@inheritDoc} + * Initializes the buffer location + * + * Implementations of this method must initialize the byte buffer at the given position + * + * Implementations must not change the position, limit or mark of the given buffer + * + * This method must not exceed the number of bytes returned by {@link AggregatorFactory#getMaxIntermediateSizeWithNulls} + * in the corresponding {@link AggregatorFactory} * - * Overridden because this method is a {@link HotLoopCallee} and the superinterface method is not. + * @param buf byte buffer to initialize + * @param position offset within the byte buffer for initialization */ - @Override @CalledFromHotLoop void init(ByteBuffer buf, int position); @@ -57,20 +64,33 @@ public interface BufferAggregator extends BaseBufferAggregator, HotLoopCallee * * Implementations must not change the position, limit or mark of the given buffer * - * @param buf byte buffer storing the byte array representation of the aggregate + * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the current aggregate value is stored */ @CalledFromHotLoop void aggregate(ByteBuffer buf, int position); /** - * {@inheritDoc} + * Returns the intermediate object representation of the given aggregate. + * + * Converts the given byte buffer representation into an intermediate aggregate Object * - * Overridden because this method is a {@link HotLoopCallee} and the superinterface method is not. + * Implementations must not change the position, limit or mark of the given buffer. + * + * + * The object returned must not have any references to the given buffer (i.e., make a copy), since the + * underlying buffer is a shared resource and may be given to another processing thread + * while the objects returned by this aggregator are still in use. + * + * + * If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator + * expects its inputs to be mutable, then the object returned by this method must be mutable. + * + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the aggregate value is stored + * @return the Object representation of the aggregate */ - @Override @Nullable - @CalledFromHotLoop Object get(ByteBuffer buf, int position); /** @@ -84,9 +104,8 @@ public interface BufferAggregator extends BaseBufferAggregator, HotLoopCallee * have an {@link AggregatorFactory#getType()} ()} of {@link org.apache.druid.segment.column.ValueType#FLOAT}. * If unimplemented, throwing an {@link UnsupportedOperationException} is common and recommended. * - * @param buf byte buffer storing the byte array representation of the aggregate + * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored - * * @return the float representation of the aggregate */ float getFloat(ByteBuffer buf, int position); @@ -102,9 +121,8 @@ public interface BufferAggregator extends BaseBufferAggregator, HotLoopCallee * have an {@link AggregatorFactory#getType()} of of {@link org.apache.druid.segment.column.ValueType#LONG}. * If unimplemented, throwing an {@link UnsupportedOperationException} is common and recommended. * - * @param buf byte buffer storing the byte array representation of the aggregate + * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored - * * @return the long representation of the aggregate */ long getLong(ByteBuffer buf, int position); @@ -124,9 +142,8 @@ public interface BufferAggregator extends BaseBufferAggregator, HotLoopCallee * This default method is added to enable smooth backward compatibility, please re-implement it if your aggregators * work with numeric double columns. * - * @param buf byte buffer storing the byte array representation of the aggregate + * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored - * * @return the double representation of the aggregate */ default double getDouble(ByteBuffer buf, int position) @@ -134,6 +151,11 @@ default double getDouble(ByteBuffer buf, int position) return (double) getFloat(buf, position); } + /** + * Release any resources used by the aggregator + */ + void close(); + /** * {@inheritDoc} * @@ -146,6 +168,29 @@ default void inspectRuntimeShape(RuntimeShapeInspector inspector) // nothing to inspect } + /** + * Relocates any cached objects. + * If underlying ByteBuffer used for aggregation buffer relocates to a new ByteBuffer, positional caches(if any) + * built on top of old ByteBuffer can not be used for further {@link BufferAggregator#aggregate(ByteBuffer, int)} + * calls. This method tells the BufferAggregator that the cached objects at a certain location has been relocated to + * a different location. + * + * Only used if there is any positional caches/objects in the BufferAggregator implementation. + * + * If relocate happens to be across multiple new ByteBuffers (say n ByteBuffers), this method should be called + * multiple times(n times) given all the new positions/old positions should exist in newBuffer/OldBuffer. + * + * Implementations must not change the position, limit or mark of the given buffer + * + * @param oldPosition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. + * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. + * @param oldBuffer old aggregation buffer. + * @param newBuffer new aggregation buffer. + */ + default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + } + /** * returns true if aggregator's output type is primitive long/double/float and aggregated value is null, * but when aggregated output type is Object, this method always returns false, @@ -163,4 +208,5 @@ default boolean isNull(ByteBuffer buf, int position) { return false; } + } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java index 3509c3e19a54..8d0983eeba63 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java @@ -27,14 +27,19 @@ * methods (namely, "aggregate" and "get") do not take the actual input values to aggregate, because it is assumed that * the VectorAggregator was given something that it can use to get at the current batch of data. * - * Unlike {@link BufferAggregator}, none of the methods in this class are annotated with + * None of the methods in this class are annotated with * {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop} because vectorized query engines do not use * monomorphic-processing-style specialization. * - * @see BufferAggregator, the nonvectorized version. + * @see BufferAggregator, the vectorized version. */ -public interface VectorAggregator extends BaseBufferAggregator +public interface VectorAggregator { + /** + * Same as {@link BufferAggregator#init}. + */ + void init(ByteBuffer buf, int position); + /** * Aggregate a range of rows into a single aggregation slot. * @@ -60,4 +65,22 @@ public interface VectorAggregator extends BaseBufferAggregator * @param positionOffset an offset to apply to each value from "positions" */ void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset); + + /** + * Same as {@link BufferAggregator#get}. + */ + @Nullable + Object get(ByteBuffer buf, int position); + + /** + * Same as {@link BufferAggregator#relocate}. + */ + default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + } + + /** + * Release any resources used by the aggregator. + */ + void close(); } From f298cb7c86ad0102aff38a28e6c051dd59625b21 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 19 Jan 2021 23:22:57 -0800 Subject: [PATCH 3/7] Rework things to enable working properly for both complex types and regular types. Involved finally moving makeVectorProcessor from DimensionHandlerUtils into ColumnProcessors and harmonizing the two things. --- .../theta/SketchAggregatorFactory.java | 4 +- .../theta/SketchVectorAggregator.java | 26 ++- .../ToObjectVectorColumnProcessorFactory.java | 140 ++++++++++++ ...bjectVectorColumnProcessorFactoryTest.java | 210 ++++++++++++++++++ .../druid/query/filter/InDimFilter.java | 3 +- .../filter/vector/NilVectorValueMatcher.java | 50 +++++ ...torValueMatcherColumnProcessorFactory.java | 10 + .../GroupByVectorColumnProcessorFactory.java | 10 + .../vector/GroupByVectorColumnSelector.java | 2 +- .../NilGroupByVectorColumnSelector.java | 59 +++++ .../vector/VectorGroupByEngine.java | 4 +- .../druid/segment/ColumnProcessors.java | 197 ++++++++++++++-- .../druid/segment/DimensionHandlerUtils.java | 115 ++-------- .../druid/segment/DimensionSelector.java | 15 +- .../segment/VectorColumnProcessorFactory.java | 35 ++- .../druid/segment/filter/BoundFilter.java | 4 +- .../filter/DimensionPredicateFilter.java | 5 +- .../druid/segment/filter/LikeFilter.java | 4 +- .../druid/segment/filter/SelectorFilter.java | 4 +- .../vector/VectorColumnSelectorFactory.java | 35 ++- .../query/groupby/GroupByQueryRunnerTest.java | 30 +++ .../VectorGroupByEngineIteratorTest.java | 4 +- 22 files changed, 806 insertions(+), 160 deletions(-) create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java create mode 100644 extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java create mode 100644 processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java create mode 100644 processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 12d1e8782501..7a10a169bbbe 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -37,7 +37,6 @@ import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; -import org.apache.druid.segment.vector.VectorObjectSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -84,8 +83,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) @Override public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) { - final VectorObjectSelector selector = selectorFactory.makeObjectSelector(fieldName); - return new SketchVectorAggregator(selector, size, getMaxIntermediateSizeWithNulls()); + return new SketchVectorAggregator(selectorFactory, fieldName, size, getMaxIntermediateSizeWithNulls()); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java index 1a42be9c084d..b5b9ad842cf1 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java @@ -21,20 +21,33 @@ import org.apache.datasketches.theta.Union; import org.apache.druid.query.aggregation.VectorAggregator; -import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.util.function.Supplier; public class SketchVectorAggregator implements VectorAggregator { - private final VectorObjectSelector selector; + private final Supplier toObjectProcessor; private final SketchBufferAggregatorHelper helper; - public SketchVectorAggregator(VectorObjectSelector selector, int size, int maxIntermediateSize) + public SketchVectorAggregator( + VectorColumnSelectorFactory columnSelectorFactory, + String column, + int size, + int maxIntermediateSize + ) { - this.selector = selector; this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); + this.toObjectProcessor = + ColumnProcessors.makeVectorProcessor( + column, + ToObjectVectorColumnProcessorFactory.INSTANCE, + columnSelectorFactory + ); } @Override @@ -47,7 +60,7 @@ public void init(final ByteBuffer buf, final int position) public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) { final Union union = helper.getOrCreateUnion(buf, position); - final Object[] vector = selector.getObjectVector(); + final Object[] vector = toObjectProcessor.get(); for (int i = startRow; i < endRow; i++) { final Object o = vector[i]; @@ -66,7 +79,7 @@ public void aggregate( final int positionOffset ) { - final Object[] vector = selector.getObjectVector(); + final Object[] vector = toObjectProcessor.get(); for (int i = 0; i < numRows; i++) { final Object o = vector[rows != null ? rows[i] : i]; @@ -78,6 +91,7 @@ public void aggregate( } } } + @Override public Object get(ByteBuffer buf, int position) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java new file mode 100644 index 000000000000..bf04a1c2a7e1 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.util; + +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.VectorColumnProcessorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; + +import java.util.function.Supplier; + +/** + * Builds vector processors that return Object arrays. Not a terribly efficient way to write aggregators, since this + * is fighting against the strongly-typed design of the vector processing system. However, it simplifies the aggregator + * code quite a bit, and most of the sketches that use this don't have special handling for primitive types anyway, so + * we hopefully shouldn't lose much performance. + */ +public class ToObjectVectorColumnProcessorFactory implements VectorColumnProcessorFactory> +{ + public static final ToObjectVectorColumnProcessorFactory INSTANCE = new ToObjectVectorColumnProcessorFactory(); + + private ToObjectVectorColumnProcessorFactory() + { + } + + @Override + public Supplier makeSingleValueDimensionProcessor( + ColumnCapabilities capabilities, SingleValueDimensionVectorSelector selector + ) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final int[] values = selector.getRowVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = selector.lookupName(values[i]); + } + + return retVal; + }; + } + + @Override + public Supplier makeMultiValueDimensionProcessor( + ColumnCapabilities capabilities, + MultiValueDimensionVectorSelector selector + ) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final IndexedInts[] values = selector.getRowVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = DimensionSelector.rowToObject(values[i], selector); + } + + return retVal; + }; + } + + @Override + public Supplier makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final float[] values = selector.getFloatVector(); + final boolean[] nulls = selector.getNullVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = nulls == null || !nulls[i] ? values[i] : null; + } + + return retVal; + }; + } + + @Override + public Supplier makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final double[] values = selector.getDoubleVector(); + final boolean[] nulls = selector.getNullVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = nulls == null || !nulls[i] ? values[i] : null; + } + + return retVal; + }; + } + + @Override + public Supplier makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final long[] values = selector.getLongVector(); + final boolean[] nulls = selector.getNullVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = nulls == null || !nulls[i] ? values[i] : null; + } + + return retVal; + }; + } + + @Override + public Supplier makeComplexProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector) + { + return selector::getObjectVector; + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java new file mode 100644 index 000000000000..e147cbc0377f --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.util; + +import com.google.common.collect.Iterables; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.vector.VectorCursor; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; + +public class ToObjectVectorColumnProcessorFactoryTest extends InitializedNullHandlingTest +{ + private StorageAdapter adapter; + + @Before + public void setUp() + { + final QueryableIndex index = TestIndex.getMMappedTestIndex(); + adapter = new QueryableIndexStorageAdapter(index); + } + + @Test + public void testRead() + { + try (final VectorCursor cursor = makeCursor()) { + final Supplier qualitySupplier = ColumnProcessors.makeVectorProcessor( + "quality", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier qualityLongSupplier = ColumnProcessors.makeVectorProcessor( + "qualityLong", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier qualityFloatSupplier = ColumnProcessors.makeVectorProcessor( + "qualityFloat", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier qualityDoubleSupplier = ColumnProcessors.makeVectorProcessor( + "qualityDouble", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier placementSupplier = ColumnProcessors.makeVectorProcessor( + "placement", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier qualityUniquesSupplier = ColumnProcessors.makeVectorProcessor( + "quality_uniques", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + } + } + + @Test + public void testString() + { + Assert.assertEquals( + Arrays.asList( + "automotive", + "business", + "entertainment", + "health", + "mezzanine", + "news", + "premium", + "technology", + "travel", + "mezzanine" + ), + readColumn("quality", 10) + ); + } + + @Test + public void testLong() + { + Assert.assertEquals( + Arrays.asList(1000L, 1100L, 1200L, 1300L, 1400L, 1500L, 1600L, 1700L, 1800L, 1400L), + readColumn("qualityLong", 10) + ); + } + + @Test + public void testFloat() + { + Assert.assertEquals( + Arrays.asList(10000f, 11000f, 12000f, 13000f, 14000f, 15000f, 16000f, 17000f, 18000f, 14000f), + readColumn("qualityFloat", 10) + ); + } + + @Test + public void testDouble() + { + Assert.assertEquals( + Arrays.asList(10000.0, 11000.0, 12000.0, 13000.0, 14000.0, 15000.0, 16000.0, 17000.0, 18000.0, 14000.0), + readColumn("qualityDouble", 10) + ); + } + + @Test + public void testMultiString() + { + Assert.assertEquals( + Arrays.asList( + Arrays.asList("a", "preferred"), + Arrays.asList("b", "preferred"), + Arrays.asList("e", "preferred"), + Arrays.asList("h", "preferred"), + Arrays.asList("m", "preferred"), + Arrays.asList("n", "preferred"), + Arrays.asList("p", "preferred"), + Arrays.asList("preferred", "t"), + Arrays.asList("preferred", "t"), + Arrays.asList("m", "preferred") + ), + readColumn("placementish", 10) + ); + } + + @Test + public void testComplexSketch() + { + final Object sketch = Iterables.getOnlyElement(readColumn("quality_uniques", 1)); + Assert.assertThat(sketch, CoreMatchers.instanceOf(HyperLogLogCollector.class)); + } + + private VectorCursor makeCursor() + { + return adapter.makeVectorCursor( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + false, + 3, /* vector size */ + null + ); + } + + private List readColumn(final String column, final int limit) + { + try (final VectorCursor cursor = makeCursor()) { + final Supplier supplier = ColumnProcessors.makeVectorProcessor( + column, + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final List retVal = new ArrayList<>(); + + while (!cursor.isDone()) { + final Object[] objects = supplier.get(); + + for (int i = 0; i < cursor.getCurrentVectorSize(); i++) { + retVal.add(objects[i]); + + if (retVal.size() >= limit) { + return retVal; + } + } + + cursor.advance(); + } + + return retVal; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java index 063d6d3b428d..80e2c03d8d33 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java +++ b/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java @@ -55,6 +55,7 @@ import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; import org.apache.druid.query.lookup.LookupExtractionFn; import org.apache.druid.query.lookup.LookupExtractor; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.DimensionHandlerUtils; @@ -312,7 +313,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( dimension, VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java b/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java new file mode 100644 index 000000000000..8817cc6c5a91 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.filter.vector; + +import org.apache.druid.query.filter.DruidPredicateFactory; +import org.apache.druid.segment.vector.ReadableVectorInspector; + +import javax.annotation.Nullable; + +/** + * Treats all rows as null. + */ +public class NilVectorValueMatcher implements VectorValueMatcherFactory +{ + private final ReadableVectorInspector vectorInspector; + + public NilVectorValueMatcher(final ReadableVectorInspector vectorInspector) + { + this.vectorInspector = vectorInspector; + } + + @Override + public VectorValueMatcher makeMatcher(@Nullable String value) + { + return BooleanVectorValueMatcher.of(vectorInspector, value == null); + } + + @Override + public VectorValueMatcher makeMatcher(DruidPredicateFactory predicateFactory) + { + return BooleanVectorValueMatcher.of(vectorInspector, predicateFactory.makeStringPredicate().apply(null)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java index b2083cc42a82..87de123781ad 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java @@ -23,6 +23,7 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; public class VectorValueMatcherColumnProcessorFactory implements VectorColumnProcessorFactory @@ -83,4 +84,13 @@ public VectorValueMatcherFactory makeLongProcessor( { return new LongVectorValueMatcher(selector); } + + @Override + public VectorValueMatcherFactory makeComplexProcessor( + final ColumnCapabilities capabilities, + final VectorObjectSelector selector + ) + { + return null; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java index 46dec35495f0..06c90438d086 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java @@ -25,6 +25,7 @@ import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; public class GroupByVectorColumnProcessorFactory implements VectorColumnProcessorFactory @@ -102,4 +103,13 @@ public GroupByVectorColumnSelector makeLongProcessor( } return new NullableLongGroupByVectorColumnSelector(selector); } + + @Override + public GroupByVectorColumnSelector makeComplexProcessor( + final ColumnCapabilities capabilities, + final VectorObjectSelector selector + ) + { + return NilGroupByVectorColumnSelector.INSTANCE; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java index 028c820de8ba..707980f10111 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java @@ -53,7 +53,7 @@ public interface GroupByVectorColumnSelector * Write key parts for this column into a particular result row. * * @param keyMemory key memory - * @param keyOffset starting positionĀ for this key part within keyMemory + * @param keyOffset starting position for this key part within keyMemory * @param resultRow result row to receive key parts * @param resultRowPosition position within the result row for this key part */ diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java new file mode 100644 index 000000000000..cc512877e76b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby.epinephelinae.vector; + +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.ResultRow; + +/** + * Treats all rows as null. + */ +public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelector +{ + public static final NilGroupByVectorColumnSelector INSTANCE = new NilGroupByVectorColumnSelector(); + + private NilGroupByVectorColumnSelector() + { + // Singleton. + } + + @Override + public int getGroupingKeySize() + { + return 0; + } + + @Override + public void writeKeys( + WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow + ) + { + // Nothing to do. + } + + @Override + public void writeKeyToResultRow( + Memory keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition + ) + { + resultRow.set(resultRowPosition, null); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index f516ea5b63ba..374ba664f8f1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -42,7 +42,7 @@ import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper; import org.apache.druid.query.groupby.epinephelinae.VectorGrouper; import org.apache.druid.query.vector.VectorCursorGranularizer; -import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; @@ -175,7 +175,7 @@ public void close() final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); final List dimensions = query.getDimensions().stream().map( dimensionSpec -> - DimensionHandlerUtils.makeVectorProcessor( + ColumnProcessors.makeVectorProcessor( dimensionSpec, GroupByVectorColumnProcessorFactory.instance(), columnSelectorFactory diff --git a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java index 05e85fdd4c24..c38f0763149a 100644 --- a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java +++ b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java @@ -29,6 +29,12 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.NilVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; import org.apache.druid.segment.virtual.ExpressionSelectors; import javax.annotation.Nullable; @@ -38,11 +44,20 @@ * top of it. * * @see DimensionHandlerUtils#createColumnSelectorPlus which this may eventually replace - * @see DimensionHandlerUtils#makeVectorProcessor which creates similar, vectorized processors; may eventually be moved - * into this class. */ public class ColumnProcessors { + /** + * Capabilites that are used when we return a nil selector for a nonexistent column. + */ + public static final ColumnCapabilities NIL_COLUMN_CAPABILITIES = + new ColumnCapabilitiesImpl().setType(ValueType.STRING) + .setDictionaryEncoded(true) + .setDictionaryValuesUnique(true) + .setDictionaryValuesSorted(true) + .setHasBitmapIndexes(false) + .setHasMultipleValues(false); + /** * Make a processor for a particular named column. * @@ -81,25 +96,10 @@ public static T makeProcessor( ) { return makeProcessorInternal( - factory -> { - // Capabilities of the column that the dimensionSpec is reading. We can't return these straight-up, because - // the _result_ of the dimensionSpec might have different capabilities. But what we return will generally be - // based on them. - final ColumnCapabilities dimensionCapabilities = factory.getColumnCapabilities(dimensionSpec.getDimension()); - - if (dimensionSpec.getExtractionFn() != null || dimensionSpec.mustDecorate()) { - // DimensionSpec is doing some sort of transformation. The result is always a string. - - return new ColumnCapabilitiesImpl() - .setType(ValueType.STRING) - .setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering()) - .setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType() == ExtractionFn.ExtractionType.ONE_TO_ONE) - .setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(dimensionCapabilities)); - } else { - // No transformation. Pass through. - return dimensionCapabilities; - } - }, + factory -> computeDimensionSpecCapabilities( + dimensionSpec, + factory.getColumnCapabilities(dimensionSpec.getDimension()) + ), factory -> factory.makeDimensionSelector(dimensionSpec), factory -> factory.makeColumnValueSelector(dimensionSpec.getDimension()), processorFactory, @@ -144,6 +144,94 @@ public static T makeProcessor( } } + /** + * Make a processor for a particular named column. + * + * @param column the column + * @param processorFactory the processor factory + * @param selectorFactory the column selector factory + * @param processor type + */ + public static T makeVectorProcessor( + final String column, + final VectorColumnProcessorFactory processorFactory, + final VectorColumnSelectorFactory selectorFactory + ) + { + return makeVectorProcessorInternal( + factory -> factory.getColumnCapabilities(column), + factory -> factory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(column)), + factory -> factory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(column)), + factory -> factory.makeValueSelector(column), + factory -> factory.makeObjectSelector(column), + processorFactory, + selectorFactory + ); + } + + /** + * Make a processor for a particular {@link DimensionSpec}. + * + * @param dimensionSpec the dimension spec + * @param processorFactory the processor factory + * @param selectorFactory the column selector factory + * @param processor type + */ + public static T makeVectorProcessor( + final DimensionSpec dimensionSpec, + final VectorColumnProcessorFactory processorFactory, + final VectorColumnSelectorFactory selectorFactory + ) + { + return makeVectorProcessorInternal( + factory -> computeDimensionSpecCapabilities( + dimensionSpec, + factory.getColumnCapabilities(dimensionSpec.getDimension()) + ), + factory -> factory.makeSingleValueDimensionSelector(dimensionSpec), + factory -> factory.makeMultiValueDimensionSelector(dimensionSpec), + factory -> factory.makeValueSelector(dimensionSpec.getDimension()), + factory -> factory.makeObjectSelector(dimensionSpec.getDimension()), + processorFactory, + selectorFactory + ); + } + + /** + * Returns the capabilities of selectors derived from a particular {@link DimensionSpec}. + * + * Will only return non-STRING types if the DimensionSpec passes through inputs unchanged. (i.e., it's a + * {@link DefaultDimensionSpec}, or something that behaves like one.) + * + * @param dimensionSpec The dimensionSpec. + * @param columnCapabilities Capabilities of the column that the dimensionSpec is reading, i.e. + * {@link DimensionSpec#getDimension()}. + */ + @Nullable + private static ColumnCapabilities computeDimensionSpecCapabilities( + final DimensionSpec dimensionSpec, + @Nullable final ColumnCapabilities columnCapabilities + ) + { + if (dimensionSpec.mustDecorate()) { + // Decorating DimensionSpecs could do anything. We can't pass along any useful info other than the type. + return new ColumnCapabilitiesImpl().setType(ValueType.STRING); + } else if (dimensionSpec.getExtractionFn() != null) { + // DimensionSpec is applying an extractionFn but *not* decorating. We have some insight into how the + // extractionFn will behave, so let's use it. + + return new ColumnCapabilitiesImpl() + .setType(ValueType.STRING) + .setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering()) + .setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType() + == ExtractionFn.ExtractionType.ONE_TO_ONE) + .setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(columnCapabilities)); + } else { + // No transformation. Pass through underlying types. + return columnCapabilities; + } + } + /** * Creates "column processors", which are objects that wrap a single input column and provide some * functionality on top of it. @@ -158,8 +246,6 @@ public static T makeProcessor( * called if the column type is long, float, double, or complex. * @param processorFactory object that encapsulates the knowledge about how to create processors * @param selectorFactory column selector factory used for creating the vector processor - * - * @see DimensionHandlerUtils#makeVectorProcessor the vectorized version */ private static T makeProcessorInternal( final Function inputCapabilitiesFn, @@ -191,6 +277,71 @@ private static T makeProcessorInternal( } } + /** + * Creates "column processors", which are objects that wrap a single input column and provide some + * functionality on top of it. + * + * @param inputCapabilitiesFn function that returns capabilities of the column being processed. The type provided + * by these capabilities will be used to determine what kind of selector to create. If + * this function returns null, then it is assumed that the column does not exist. + * Note: this is different behavior from the non-vectorized version. + * @param singleValueDimensionSelectorFn function that creates a singly-valued dimension selector for the column being + * processed. Will be called if the column is singly-valued string. + * @param multiValueDimensionSelectorFn function that creates a multi-valued dimension selector for the column being + * processed. Will be called if the column is multi-valued string. + * @param valueSelectorFn function that creates a value selector for the column being processed. Will be + * called if the column type is long, float, or double. + * @param objectSelectorFn function that creates an object selector for the column being processed. Will + * be called if the column type is complex. + * @param processorFactory object that encapsulates the knowledge about how to create processors + * @param selectorFactory column selector factory used for creating the vector processor + */ + private static T makeVectorProcessorInternal( + final Function inputCapabilitiesFn, + final Function singleValueDimensionSelectorFn, + final Function multiValueDimensionSelectorFn, + final Function valueSelectorFn, + final Function objectSelectorFn, + final VectorColumnProcessorFactory processorFactory, + final VectorColumnSelectorFactory selectorFactory + ) + { + final ColumnCapabilities capabilities = inputCapabilitiesFn.apply(selectorFactory); + + if (capabilities == null) { + // Column does not exist. + return processorFactory.makeSingleValueDimensionProcessor( + NIL_COLUMN_CAPABILITIES, + NilVectorSelector.create(selectorFactory.getReadableVectorInspector()) + ); + } + + switch (capabilities.getType()) { + case STRING: + if (mayBeMultiValue(capabilities)) { + return processorFactory.makeMultiValueDimensionProcessor( + capabilities, + multiValueDimensionSelectorFn.apply(selectorFactory) + ); + } else { + return processorFactory.makeSingleValueDimensionProcessor( + capabilities, + singleValueDimensionSelectorFn.apply(selectorFactory) + ); + } + case LONG: + return processorFactory.makeLongProcessor(capabilities, valueSelectorFn.apply(selectorFactory)); + case FLOAT: + return processorFactory.makeFloatProcessor(capabilities, valueSelectorFn.apply(selectorFactory)); + case DOUBLE: + return processorFactory.makeDoubleProcessor(capabilities, valueSelectorFn.apply(selectorFactory)); + case COMPLEX: + return processorFactory.makeComplexProcessor(capabilities, objectSelectorFn.apply(selectorFactory)); + default: + throw new ISE("Unsupported type[%s]", capabilities.getType()); + } + } + /** * Returns true if a given set of capabilities might indicate an underlying multi-value column. Errs on the side * of returning true if unknown; i.e. if this returns false, there are _definitely not_ mul. diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java index 1032530fa4d3..c2d748e5c980 100644 --- a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java @@ -32,13 +32,11 @@ import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.dimension.ColumnSelectorStrategy; import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory; -import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ValueType; -import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.math.BigDecimal; @@ -105,7 +103,12 @@ private DimensionHandlerUtils() if (!capabilities.isDictionaryEncoded().isTrue()) { throw new IAE("String column must have dictionary encoding."); } - return new StringDimensionHandler(dimensionName, multiValueHandling, capabilities.hasBitmapIndexes(), capabilities.hasSpatialIndexes()); + return new StringDimensionHandler( + dimensionName, + multiValueHandling, + capabilities.hasBitmapIndexes(), + capabilities.hasSpatialIndexes() + ); } if (capabilities.getType() == ValueType.LONG) { @@ -146,10 +149,10 @@ public static List getValueTypesFromDimensionSpecs(List The strategy type created by the provided strategy factory. - * @param strategyFactory A factory provided by query engines that generates type-handling strategies - * @param dimensionSpec column to generate a ColumnSelectorPlus object for - * @param cursor Used to create value selectors for columns. + * @param The strategy type created by the provided strategy factory. + * @param strategyFactory A factory provided by query engines that generates type-handling strategies + * @param dimensionSpec column to generate a ColumnSelectorPlus object for + * @param cursor Used to create value selectors for columns. * * @return A ColumnSelectorPlus object * @@ -175,10 +178,10 @@ public static ColumnSelectorPlus The strategy type created by the provided strategy factory. - * @param strategyFactory A factory provided by query engines that generates type-handling strategies - * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for - * @param columnSelectorFactory Used to create value selectors for columns. + * @param The strategy type created by the provided strategy factory. + * @param strategyFactory A factory provided by query engines that generates type-handling strategies + * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for + * @param columnSelectorFactory Used to create value selectors for columns. * * @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs * @@ -287,96 +290,6 @@ private static Strategy makeStrategy( return strategyFactory.makeColumnSelectorStrategy(capabilities, selector); } - /** - * Equivalent to calling makeVectorProcessor(DefaultDimensionSpec.of(column), strategyFactory, selectorFactory). - * - * @see #makeVectorProcessor(DimensionSpec, VectorColumnProcessorFactory, VectorColumnSelectorFactory) - * @see ColumnProcessors#makeProcessor the non-vectorized version - */ - public static T makeVectorProcessor( - final String column, - final VectorColumnProcessorFactory strategyFactory, - final VectorColumnSelectorFactory selectorFactory - ) - { - return makeVectorProcessor(DefaultDimensionSpec.of(column), strategyFactory, selectorFactory); - } - - /** - * Creates "vector processors", which are objects that wrap a single vectorized input column and provide some - * functionality on top of it. Used by things like query engines and filter matchers. - * - * Supports the basic types STRING, LONG, DOUBLE, and FLOAT. - * - * @param dimensionSpec dimensionSpec for the input to the processor - * @param strategyFactory object that encapsulates the knowledge about how to create processors - * @param selectorFactory column selector factory used for creating the vector processor - * - * @see ColumnProcessors#makeProcessor the non-vectorized version - */ - public static T makeVectorProcessor( - final DimensionSpec dimensionSpec, - final VectorColumnProcessorFactory strategyFactory, - final VectorColumnSelectorFactory selectorFactory - ) - { - final ColumnCapabilities originalCapabilities = - selectorFactory.getColumnCapabilities(dimensionSpec.getDimension()); - - final ColumnCapabilities effectiveCapabilites = getEffectiveCapabilities( - dimensionSpec, - originalCapabilities - ); - - final ValueType type = effectiveCapabilites.getType(); - - // vector selectors should never have null column capabilities, these signify a non-existent column, and complex - // columns should never be treated as a multi-value column, so always use single value string processor - final boolean forceSingleValue = - originalCapabilities == null || ValueType.COMPLEX.equals(originalCapabilities.getType()); - - if (type == ValueType.STRING) { - if (!forceSingleValue && effectiveCapabilites.hasMultipleValues().isMaybeTrue()) { - return strategyFactory.makeMultiValueDimensionProcessor( - effectiveCapabilites, - selectorFactory.makeMultiValueDimensionSelector(dimensionSpec) - ); - } else { - return strategyFactory.makeSingleValueDimensionProcessor( - effectiveCapabilites, - selectorFactory.makeSingleValueDimensionSelector(dimensionSpec) - ); - } - } else { - Preconditions.checkState( - dimensionSpec.getExtractionFn() == null && !dimensionSpec.mustDecorate(), - "Uh oh, was about to try to make a value selector for type[%s] with a dimensionSpec of class[%s] that " - + "requires decoration. Possible bug.", - type, - dimensionSpec.getClass().getName() - ); - - if (type == ValueType.LONG) { - return strategyFactory.makeLongProcessor( - effectiveCapabilites, - selectorFactory.makeValueSelector(dimensionSpec.getDimension()) - ); - } else if (type == ValueType.FLOAT) { - return strategyFactory.makeFloatProcessor( - effectiveCapabilites, - selectorFactory.makeValueSelector(dimensionSpec.getDimension()) - ); - } else if (type == ValueType.DOUBLE) { - return strategyFactory.makeDoubleProcessor( - effectiveCapabilites, - selectorFactory.makeValueSelector(dimensionSpec.getDimension()) - ); - } else { - throw new ISE("Unsupported type[%s]", effectiveCapabilites.getType()); - } - } - } - @Nullable public static String convertObjectToString(@Nullable Object valObj) { diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java b/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java index f50ae1e70b94..554210a45dc4 100644 --- a/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java @@ -122,16 +122,25 @@ default boolean isNull() @Nullable default Object defaultGetObject() { - IndexedInts row = getRow(); + return rowToObject(getRow(), this); + } + + /** + * Converts a particular {@link IndexedInts} to an Object in a standard way, assuming each element in the IndexedInts + * is a dictionary ID that can be resolved with the provided selector. + */ + @Nullable + static Object rowToObject(IndexedInts row, DimensionDictionarySelector selector) + { int rowSize = row.size(); if (rowSize == 0) { return null; } else if (rowSize == 1) { - return lookupName(row.get(0)); + return selector.lookupName(row.get(0)); } else { final String[] strings = new String[rowSize]; for (int i = 0; i < rowSize; i++) { - strings[i] = lookupName(row.get(i)); + strings[i] = selector.lookupName(row.get(i)); } return Arrays.asList(strings); } diff --git a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java index 6ae1557ea61a..33ea4f86c908 100644 --- a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java @@ -22,34 +22,59 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; /** * Class that encapsulates knowledge about how to create vector column processors. Used by - * {@link DimensionHandlerUtils#makeVectorProcessor}. + * {@link ColumnProcessors#makeVectorProcessor}. * - * Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method. The default type is - * always implicitly STRING. It also does not have a "makeComplexProcessor" method; instead, complex-typed columns - * are fed into "makeSingleValueDimensionProcessor". This behavior may change in the future to better align - * with {@link ColumnProcessorFactory}. + * Column processors can be any type "T". The idea is that a ColumnProcessorFactory embodies the logic for wrapping + * and processing selectors of various types, and so enables nice code design, where type-dependent code is not + * sprinkled throughout. + * + * Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method, because vector + * column types are always known, so it isn't necessary. * * @see ColumnProcessorFactory the non-vectorized version */ public interface VectorColumnProcessorFactory { + /** + * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column always has a single value + * per row. + */ T makeSingleValueDimensionProcessor( ColumnCapabilities capabilities, SingleValueDimensionVectorSelector selector ); + /** + * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column may have multiple values + * per row. + */ T makeMultiValueDimensionProcessor( ColumnCapabilities capabilities, MultiValueDimensionVectorSelector selector ); + /** + * Called when {@link ColumnCapabilities#getType()} is FLOAT. + */ T makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector); + /** + * Called when {@link ColumnCapabilities#getType()} is DOUBLE. + */ T makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector); + /** + * Called when {@link ColumnCapabilities#getType()} is LONG. + */ T makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector); + + /** + * Called when {@link ColumnCapabilities#getType()} is COMPLEX. + */ + T makeComplexProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector); } diff --git a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java index 58225e300b04..f097e4389f4f 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java @@ -41,9 +41,9 @@ import org.apache.druid.query.filter.vector.VectorValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; import org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.IntListUtils; import org.apache.druid.segment.column.BitmapIndex; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; @@ -129,7 +129,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( boundDimFilter.getDimension(), VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java index 8c6d2b0564f0..bba9d552346f 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java @@ -36,15 +36,16 @@ import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import java.util.Objects; import java.util.Set; /** + * */ public class DimensionPredicateFilter implements Filter { @@ -98,7 +99,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( dimension, VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java index 72332c50b7cc..473ef9527ac9 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java @@ -35,9 +35,9 @@ import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.column.BitmapIndex; import org.apache.druid.segment.data.CloseableIndexed; import org.apache.druid.segment.data.Indexed; @@ -91,7 +91,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( dimension, VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java index fbe80341c2f1..54dcef8306cb 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java @@ -29,9 +29,9 @@ import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; @@ -87,7 +87,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( dimension, VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java index e99d47988920..f33ed7b08384 100644 --- a/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java @@ -26,9 +26,13 @@ import javax.annotation.Nullable; /** + * A class that comes from {@link VectorCursor#getColumnSelectorFactory()} and is used to create vector selectors. * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this class. * - * @see org.apache.druid.segment.ColumnSelectorFactory, the non-vectorized version. + * @see org.apache.druid.segment.ColumnSelectorFactory the non-vectorized version. */ public interface VectorColumnSelectorFactory extends ColumnInspector { @@ -48,22 +52,43 @@ default int getMaxVectorSize() } /** - * Returns a string-typed, single-value-per-row column selector. + * Returns a string-typed, single-value-per-row column selector. Should only be called on columns where + * {@link #getColumnCapabilities} indicates they return STRING, or on nonexistent columns. + * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec); /** - * Returns a string-typed, multi-value-per-row column selector. + * Returns a string-typed, multi-value-per-row column selector. Should only be called on columns where + * {@link #getColumnCapabilities} indicates they return STRING. Unlike {@link #makeSingleValueDimensionSelector}, + * this should not be called on nonexistent columns. + * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec); /** - * Returns a primitive column selector. + * Returns a primitive column selector. Should only be called on columns where {@link #getColumnCapabilities} + * indicates they return DOUBLE, FLOAT, or LONG, or on nonexistent columns. + * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ VectorValueSelector makeValueSelector(String column); /** - * Returns an object selector, useful for complex columns. + * Returns an object selector. Should only be called on columns where {@link #getColumnCapabilities} indicates that + * they return STRING or COMPLEX, or on nonexistent columns. + * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ VectorObjectSelector makeObjectSelector(String column); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index df1251b44b4d..1c640d706422 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -8807,6 +8807,36 @@ public void testGroupByLongColumn() TestHelper.assertExpectedObjects(expectedResults, results, "long"); } + @Test + public void testGroupByComplexColumn() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(new DefaultDimensionSpec("quality_uniques", "quality_uniques")) + .setDimFilter(new SelectorDimFilter("quality_uniques", null, null)) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + Assert.assertEquals(Functions.>identity(), query.getLimitSpec().build(query)); + + List expectedResults = Collections.singletonList( + makeRow( + query, + "2011-04-01", + "quality_uniques", + null, + "rows", + 26L, + "idx", + 12446L + ) + ); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "long"); + } + @Test public void testGroupByLongColumnDescending() { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java index 627c2bdc886a..c4a3e903eaf7 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java @@ -29,7 +29,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.epinephelinae.VectorGrouper; import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator; -import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.TestIndex; @@ -72,7 +72,7 @@ public void testCreateOneGrouperAndCloseItWhenClose() throws IOException ); final List dimensions = query.getDimensions().stream().map( dimensionSpec -> - DimensionHandlerUtils.makeVectorProcessor( + ColumnProcessors.makeVectorProcessor( dimensionSpec, GroupByVectorColumnProcessorFactory.instance(), cursor.getColumnSelectorFactory() From 582c71ce80962fa2e35febae30513cdb68585bcf Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 19 Jan 2021 23:29:18 -0800 Subject: [PATCH 4/7] Add missing method. --- .../druid/query/filter/vector/NilVectorValueMatcher.java | 6 +++--- .../vector/VectorValueMatcherColumnProcessorFactory.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java b/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java index 8817cc6c5a91..badf4b9af0e2 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java +++ b/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java @@ -20,7 +20,7 @@ package org.apache.druid.query.filter.vector; import org.apache.druid.query.filter.DruidPredicateFactory; -import org.apache.druid.segment.vector.ReadableVectorInspector; +import org.apache.druid.segment.vector.VectorSizeInspector; import javax.annotation.Nullable; @@ -29,9 +29,9 @@ */ public class NilVectorValueMatcher implements VectorValueMatcherFactory { - private final ReadableVectorInspector vectorInspector; + private final VectorSizeInspector vectorInspector; - public NilVectorValueMatcher(final ReadableVectorInspector vectorInspector) + public NilVectorValueMatcher(final VectorSizeInspector vectorInspector) { this.vectorInspector = vectorInspector; } diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java index 87de123781ad..5f938fecbd8a 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java @@ -91,6 +91,6 @@ public VectorValueMatcherFactory makeComplexProcessor( final VectorObjectSelector selector ) { - return null; + return new NilVectorValueMatcher(selector); } } From fee14afbd53310f39be8f1d1436070d146b075a7 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 20 Jan 2021 09:42:39 -0800 Subject: [PATCH 5/7] Style and name changes. --- .../util/ToObjectVectorColumnProcessorFactory.java | 2 +- .../vector/VectorValueMatcherColumnProcessorFactory.java | 2 +- .../vector/GroupByVectorColumnProcessorFactory.java | 2 +- .../vector/NilGroupByVectorColumnSelector.java | 8 ++------ .../java/org/apache/druid/segment/ColumnProcessors.java | 2 +- .../druid/segment/VectorColumnProcessorFactory.java | 2 +- 6 files changed, 7 insertions(+), 11 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java index bf04a1c2a7e1..84b556a14987 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java @@ -133,7 +133,7 @@ public Supplier makeLongProcessor(ColumnCapabilities capabilities, Vec } @Override - public Supplier makeComplexProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector) + public Supplier makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector) { return selector::getObjectVector; } diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java index 5f938fecbd8a..4d30c2d6de2e 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java @@ -86,7 +86,7 @@ public VectorValueMatcherFactory makeLongProcessor( } @Override - public VectorValueMatcherFactory makeComplexProcessor( + public VectorValueMatcherFactory makeObjectProcessor( final ColumnCapabilities capabilities, final VectorObjectSelector selector ) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java index 06c90438d086..53748dfcc5df 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java @@ -105,7 +105,7 @@ public GroupByVectorColumnSelector makeLongProcessor( } @Override - public GroupByVectorColumnSelector makeComplexProcessor( + public GroupByVectorColumnSelector makeObjectProcessor( final ColumnCapabilities capabilities, final VectorObjectSelector selector ) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java index cc512877e76b..e70eaa796aa1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java @@ -42,17 +42,13 @@ public int getGroupingKeySize() } @Override - public void writeKeys( - WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow - ) + public void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow) { // Nothing to do. } @Override - public void writeKeyToResultRow( - Memory keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition - ) + public void writeKeyToResultRow(Memory keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition) { resultRow.set(resultRowPosition, null); } diff --git a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java index c38f0763149a..e2857a1a630d 100644 --- a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java +++ b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java @@ -336,7 +336,7 @@ private static T makeVectorProcessorInternal( case DOUBLE: return processorFactory.makeDoubleProcessor(capabilities, valueSelectorFn.apply(selectorFactory)); case COMPLEX: - return processorFactory.makeComplexProcessor(capabilities, objectSelectorFn.apply(selectorFactory)); + return processorFactory.makeObjectProcessor(capabilities, objectSelectorFn.apply(selectorFactory)); default: throw new ISE("Unsupported type[%s]", capabilities.getType()); } diff --git a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java index 33ea4f86c908..1774a3d805cc 100644 --- a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java @@ -76,5 +76,5 @@ T makeMultiValueDimensionProcessor( /** * Called when {@link ColumnCapabilities#getType()} is COMPLEX. */ - T makeComplexProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector); + T makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector); } From ff3ef2fa321e3820d1cc5eb107dc298361bc5157 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 26 Jan 2021 11:00:32 -0800 Subject: [PATCH 6/7] Fix issues from inspections. --- extensions-core/datasketches/pom.xml | 11 +++++++++++ .../druid/segment/VectorColumnProcessorFactory.java | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml index 603fcf92f4f5..8e83ecc6ca88 100644 --- a/extensions-core/datasketches/pom.xml +++ b/extensions-core/datasketches/pom.xml @@ -166,6 +166,11 @@ easymock test + + org.hamcrest + hamcrest-core + test + nl.jqno.equalsverifier equalsverifier @@ -178,6 +183,12 @@ test-jar test + + org.apache.druid + druid-hll + ${project.parent.version} + test + org.apache.druid druid-processing diff --git a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java index 1774a3d805cc..7ebdb81a99c8 100644 --- a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java @@ -76,5 +76,5 @@ T makeMultiValueDimensionProcessor( /** * Called when {@link ColumnCapabilities#getType()} is COMPLEX. */ - T makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector); + T makeObjectProcessor(@SuppressWarnings("unused") ColumnCapabilities capabilities, VectorObjectSelector selector); } From fd93498a13b32d06cf1a57a96b3638a159addff7 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 28 Jan 2021 10:01:26 -0800 Subject: [PATCH 7/7] Fix style issue. --- .../util/ToObjectVectorColumnProcessorFactory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java index 84b556a14987..2915f34a7930 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java @@ -46,7 +46,8 @@ private ToObjectVectorColumnProcessorFactory() @Override public Supplier makeSingleValueDimensionProcessor( - ColumnCapabilities capabilities, SingleValueDimensionVectorSelector selector + ColumnCapabilities capabilities, + SingleValueDimensionVectorSelector selector ) { final Object[] retVal = new Object[selector.getMaxVectorSize()];