diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml index 603fcf92f4f5..8e83ecc6ca88 100644 --- a/extensions-core/datasketches/pom.xml +++ b/extensions-core/datasketches/pom.xml @@ -166,6 +166,11 @@ easymock test + + org.hamcrest + hamcrest-core + test + nl.jqno.equalsverifier equalsverifier @@ -178,6 +183,12 @@ test-jar test + + org.apache.druid + druid-hll + ${project.parent.version} + test + org.apache.druid druid-processing diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 2b15cc09cdc7..7a10a169bbbe 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -31,9 +31,12 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -77,6 +80,18 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls()); } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + return new SketchVectorAggregator(selectorFactory, fieldName, size, getMaxIntermediateSizeWithNulls()); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + @Override public Object deserialize(Object object) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 2c8688a8c74e..34aae3f36e18 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -19,39 +19,29 @@ package org.apache.druid.query.aggregation.datasketches.theta; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.apache.datasketches.Family; -import org.apache.datasketches.memory.WritableMemory; -import org.apache.datasketches.theta.SetOperation; import org.apache.datasketches.theta.Union; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.IdentityHashMap; public class SketchBufferAggregator implements BufferAggregator { private final BaseObjectColumnValueSelector selector; - private final int size; - private final int maxIntermediateSize; - private final IdentityHashMap> unions = new IdentityHashMap<>(); - private final IdentityHashMap memCache = new IdentityHashMap<>(); + private final SketchBufferAggregatorHelper helper; public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize) { this.selector = selector; - this.size = size; - this.maxIntermediateSize = maxIntermediateSize; + this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); } @Override public void init(ByteBuffer buf, int position) { - createNewUnion(buf, position, false); + helper.init(buf, position); } @Override @@ -62,49 +52,16 @@ public void aggregate(ByteBuffer buf, int position) return; } - Union union = getOrCreateUnion(buf, position); + Union union = helper.getOrCreateUnion(buf, position); SketchAggregator.updateUnion(union, update); } + + @Nullable @Override public Object get(ByteBuffer buf, int position) { - Int2ObjectMap unionMap = unions.get(buf); - Union union = unionMap != null ? unionMap.get(position) : null; - if (union == null) { - return SketchHolder.EMPTY; - } - //in the code below, I am returning SetOp.getResult(true, null) - //"true" returns an ordered sketch but slower to compute than unordered sketch. - //however, advantage of ordered sketch is that they are faster to "union" later - //given that results from the aggregator will be combined further, it is better - //to return the ordered sketch here - return SketchHolder.of(union.getResult(true, null)); - } - - private Union getOrCreateUnion(ByteBuffer buf, int position) - { - Int2ObjectMap unionMap = unions.get(buf); - Union union = unionMap != null ? unionMap.get(position) : null; - if (union != null) { - return union; - } - return createNewUnion(buf, position, true); - } - - private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) - { - WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize); - Union union = isWrapped - ? (Union) SetOperation.wrap(mem) - : (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem); - Int2ObjectMap unionMap = unions.get(buf); - if (unionMap == null) { - unionMap = new Int2ObjectOpenHashMap<>(); - unions.put(buf, unionMap); - } - unionMap.put(position, union); - return union; + return helper.get(buf, position); } @Override @@ -128,8 +85,7 @@ public double getDouble(ByteBuffer buf, int position) @Override public void close() { - unions.clear(); - memCache.clear(); + helper.close(); } @Override @@ -141,25 +97,6 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) @Override public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { - createNewUnion(newBuffer, newPosition, true); - Int2ObjectMap unionMap = unions.get(oldBuffer); - if (unionMap != null) { - unionMap.remove(oldPosition); - if (unionMap.isEmpty()) { - unions.remove(oldBuffer); - memCache.remove(oldBuffer); - } - } - } - - private WritableMemory getMemory(ByteBuffer buffer) - { - WritableMemory mem = memCache.get(buffer); - if (mem == null) { - mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN); - memCache.put(buffer, mem); - } - return mem; + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); } - } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java new file mode 100644 index 000000000000..c009c00e5c00 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.theta; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.datasketches.Family; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.datasketches.theta.SetOperation; +import org.apache.datasketches.theta.Union; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.IdentityHashMap; + +/** + * A helper class used by {@link SketchBufferAggregator} and {@link SketchVectorAggregator} + * for aggregation operations on byte buffers. Getting the object from value selectors is outside this class. + */ +final class SketchBufferAggregatorHelper +{ + private final int size; + private final int maxIntermediateSize; + private final IdentityHashMap> unions = new IdentityHashMap<>(); + private final IdentityHashMap memCache = new IdentityHashMap<>(); + + public SketchBufferAggregatorHelper(final int size, final int maxIntermediateSize) + { + this.size = size; + this.maxIntermediateSize = maxIntermediateSize; + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#init}. + */ + public void init(ByteBuffer buf, int position) + { + createNewUnion(buf, position, false); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#get}. + */ + public Object get(ByteBuffer buf, int position) + { + Int2ObjectMap unionMap = unions.get(buf); + Union union = unionMap != null ? unionMap.get(position) : null; + if (union == null) { + return SketchHolder.EMPTY; + } + //in the code below, I am returning SetOp.getResult(true, null) + //"true" returns an ordered sketch but slower to compute than unordered sketch. + //however, advantage of ordered sketch is that they are faster to "union" later + //given that results from the aggregator will be combined further, it is better + //to return the ordered sketch here + return SketchHolder.of(union.getResult(true, null)); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}. + */ + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + createNewUnion(newBuffer, newPosition, true); + Int2ObjectMap unionMap = unions.get(oldBuffer); + if (unionMap != null) { + unionMap.remove(oldPosition); + if (unionMap.isEmpty()) { + unions.remove(oldBuffer); + memCache.remove(oldBuffer); + } + } + } + + /** + * Returns a {@link Union} associated with a particular buffer location. + * + * The Union object will be cached in this helper until {@link #close()} is called. + */ + public Union getOrCreateUnion(ByteBuffer buf, int position) + { + Int2ObjectMap unionMap = unions.get(buf); + Union union = unionMap != null ? unionMap.get(position) : null; + if (union != null) { + return union; + } + return createNewUnion(buf, position, true); + } + + private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) + { + WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize); + Union union = isWrapped + ? (Union) SetOperation.wrap(mem) + : (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem); + Int2ObjectMap unionMap = unions.get(buf); + if (unionMap == null) { + unionMap = new Int2ObjectOpenHashMap<>(); + unions.put(buf, unionMap); + } + unionMap.put(position, union); + return union; + } + + public void close() + { + unions.clear(); + memCache.clear(); + } + + private WritableMemory getMemory(ByteBuffer buffer) + { + WritableMemory mem = memCache.get(buffer); + if (mem == null) { + mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN); + memCache.put(buffer, mem); + } + return mem; + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java new file mode 100644 index 000000000000..b5b9ad842cf1 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.theta; + +import org.apache.datasketches.theta.Union; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.function.Supplier; + +public class SketchVectorAggregator implements VectorAggregator +{ + private final Supplier toObjectProcessor; + private final SketchBufferAggregatorHelper helper; + + public SketchVectorAggregator( + VectorColumnSelectorFactory columnSelectorFactory, + String column, + int size, + int maxIntermediateSize + ) + { + this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); + this.toObjectProcessor = + ColumnProcessors.makeVectorProcessor( + column, + ToObjectVectorColumnProcessorFactory.INSTANCE, + columnSelectorFactory + ); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + helper.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final Union union = helper.getOrCreateUnion(buf, position); + final Object[] vector = toObjectProcessor.get(); + + for (int i = startRow; i < endRow; i++) { + final Object o = vector[i]; + if (o != null) { + SketchAggregator.updateUnion(union, o); + } + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final Object[] vector = toObjectProcessor.get(); + + for (int i = 0; i < numRows; i++) { + final Object o = vector[rows != null ? rows[i] : i]; + + if (o != null) { + final int position = positions[i] + positionOffset; + final Union union = helper.getOrCreateUnion(buf, position); + SketchAggregator.updateUnion(union, o); + } + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return helper.get(buf, position); + } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer); + } + + @Override + public void close() + { + helper.close(); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java new file mode 100644 index 000000000000..2915f34a7930 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.util; + +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.VectorColumnProcessorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; + +import java.util.function.Supplier; + +/** + * Builds vector processors that return Object arrays. Not a terribly efficient way to write aggregators, since this + * is fighting against the strongly-typed design of the vector processing system. However, it simplifies the aggregator + * code quite a bit, and most of the sketches that use this don't have special handling for primitive types anyway, so + * we hopefully shouldn't lose much performance. + */ +public class ToObjectVectorColumnProcessorFactory implements VectorColumnProcessorFactory> +{ + public static final ToObjectVectorColumnProcessorFactory INSTANCE = new ToObjectVectorColumnProcessorFactory(); + + private ToObjectVectorColumnProcessorFactory() + { + } + + @Override + public Supplier makeSingleValueDimensionProcessor( + ColumnCapabilities capabilities, + SingleValueDimensionVectorSelector selector + ) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final int[] values = selector.getRowVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = selector.lookupName(values[i]); + } + + return retVal; + }; + } + + @Override + public Supplier makeMultiValueDimensionProcessor( + ColumnCapabilities capabilities, + MultiValueDimensionVectorSelector selector + ) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final IndexedInts[] values = selector.getRowVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = DimensionSelector.rowToObject(values[i], selector); + } + + return retVal; + }; + } + + @Override + public Supplier makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final float[] values = selector.getFloatVector(); + final boolean[] nulls = selector.getNullVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = nulls == null || !nulls[i] ? values[i] : null; + } + + return retVal; + }; + } + + @Override + public Supplier makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final double[] values = selector.getDoubleVector(); + final boolean[] nulls = selector.getNullVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = nulls == null || !nulls[i] ? values[i] : null; + } + + return retVal; + }; + } + + @Override + public Supplier makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + final Object[] retVal = new Object[selector.getMaxVectorSize()]; + + return () -> { + final long[] values = selector.getLongVector(); + final boolean[] nulls = selector.getNullVector(); + + for (int i = 0; i < selector.getCurrentVectorSize(); i++) { + retVal[i] = nulls == null || !nulls[i] ? values[i] : null; + } + + return retVal; + }; + } + + @Override + public Supplier makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector) + { + return selector::getObjectVector; + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index 6a1ae8137470..b348a67d0e53 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.datasketches.theta; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -34,6 +35,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -71,26 +73,30 @@ public class SketchAggregationTest { private final AggregationTestHelper helper; + private final QueryContexts.Vectorize vectorize; @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - public SketchAggregationTest(final GroupByQueryConfig config) + public SketchAggregationTest(final GroupByQueryConfig config, final String vectorize) { SketchModule.registerSerde(); - helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( new SketchModule().getJacksonModules(), config, tempFolder ); + this.vectorize = QueryContexts.Vectorize.fromString(vectorize); } - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - constructors.add(new Object[]{config}); + for (String vectorize : new String[]{"false", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } } return constructors; } @@ -104,9 +110,8 @@ public void teardown() throws IOException @Test public void testSketchDataIngestAndGpByQuery() throws Exception { - final String groupByQueryString = readFileFromClasspathAsString("sketch_test_data_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = + readQueryFromClasspath("sketch_test_data_group_by_query.json", helper.getObjectMapper(), vectorize); final Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(SketchAggregationTest.class.getClassLoader().getResource("sketch_test_data.tsv").getFile()), @@ -115,7 +120,7 @@ public void testSketchDataIngestAndGpByQuery() throws Exception 0, Granularities.NONE, 1000, - groupByQueryString + groupByQuery ); final String expectedSummary = "\n### HeapCompactOrderedSketch SUMMARY: \n" @@ -164,9 +169,8 @@ public void testSketchDataIngestAndGpByQuery() throws Exception @Test public void testEmptySketchAggregateCombine() throws Exception { - final String groupByQueryString = readFileFromClasspathAsString("empty_sketch_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = + readQueryFromClasspath("empty_sketch_group_by_query.json", helper.getObjectMapper(), vectorize); final Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(SketchAggregationTest.class.getClassLoader().getResource("empty_sketch_data.tsv").getFile()), @@ -175,7 +179,7 @@ public void testEmptySketchAggregateCombine() throws Exception 0, Granularities.NONE, 5, - groupByQueryString + groupByQuery ); List results = seq.toList(); @@ -199,9 +203,8 @@ public void testEmptySketchAggregateCombine() throws Exception @Test public void testThetaCardinalityOnSimpleColumn() throws Exception { - final String groupByQueryString = readFileFromClasspathAsString("simple_test_data_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = + readQueryFromClasspath("simple_test_data_group_by_query.json", helper.getObjectMapper(), vectorize); final Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(SketchAggregationTest.class.getClassLoader().getResource("simple_test_data.tsv").getFile()), @@ -215,7 +218,7 @@ public void testThetaCardinalityOnSimpleColumn() throws Exception 0, Granularities.NONE, 1000, - groupByQueryString + groupByQuery ); List results = seq.toList(); @@ -426,9 +429,8 @@ public void testCacheKey() @Test public void testRetentionDataIngestAndGpByQuery() throws Exception { - final String groupByQueryString = readFileFromClasspathAsString("retention_test_data_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = + readQueryFromClasspath("retention_test_data_group_by_query.json", helper.getObjectMapper(), vectorize); final Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(this.getClass().getClassLoader().getResource("retention_test_data.tsv").getFile()), @@ -437,7 +439,7 @@ public void testRetentionDataIngestAndGpByQuery() throws Exception 0, Granularities.NONE, 5, - groupByQueryString + groupByQuery ); List results = seq.toList(); @@ -538,6 +540,19 @@ private void assertPostAggregatorSerde(PostAggregator agg) throws Exception ); } + public static > Q readQueryFromClasspath( + final String fileName, + final ObjectMapper objectMapper, + final QueryContexts.Vectorize vectorize + ) throws IOException + { + final String queryString = readFileFromClasspathAsString(fileName); + + //noinspection unchecked + return (Q) objectMapper.readValue(queryString, Query.class) + .withOverriddenContext(ImmutableMap.of("vectorize", vectorize.toString())); + } + public static String readFileFromClasspathAsString(String fileName) throws IOException { return Files.asCharSource( diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java index a528a313e1af..ffd166b867fe 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.groupby.GroupByQuery; @@ -63,22 +64,26 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling public final TemporaryFolder tempFolder = new TemporaryFolder(); private final GroupByQueryConfig config; + private final QueryContexts.Vectorize vectorize; private SketchModule sm; private File s1; private File s2; - public SketchAggregationWithSimpleDataTest(GroupByQueryConfig config) + public SketchAggregationWithSimpleDataTest(GroupByQueryConfig config, String vectorize) { this.config = config; + this.vectorize = QueryContexts.Vectorize.fromString(vectorize); } - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - constructors.add(new Object[]{config}); + for (String vectorize : new String[]{"false", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } } return constructors; } @@ -130,14 +135,15 @@ public void testSimpleDataIngestAndGpByQuery() throws Exception tempFolder ) ) { - final String groupByQueryString = readFileFromClasspathAsString("simple_test_data_group_by_query.json"); - final GroupByQuery groupByQuery = (GroupByQuery) gpByQueryAggregationTestHelper - .getObjectMapper() - .readValue(groupByQueryString, Query.class); + final GroupByQuery groupByQuery = SketchAggregationTest.readQueryFromClasspath( + "simple_test_data_group_by_query.json", + gpByQueryAggregationTestHelper.getObjectMapper(), + vectorize + ); Sequence seq = gpByQueryAggregationTestHelper.runQueryOnSegments( ImmutableList.of(s1, s2), - groupByQueryString + groupByQuery ); List results = seq.map(row -> row.toMapBasedRow(groupByQuery)).toList(); @@ -225,7 +231,11 @@ public void testSimpleDataIngestAndTimeseriesQuery() throws Exception Sequence seq = timeseriesQueryAggregationTestHelper.runQueryOnSegments( ImmutableList.of(s1, s2), - readFileFromClasspathAsString("timeseries_query.json") + (Query) SketchAggregationTest.readQueryFromClasspath( + "timeseries_query.json", + timeseriesQueryAggregationTestHelper.getObjectMapper(), + vectorize + ) ); Result result = (Result) Iterables.getOnlyElement(seq.toList()); @@ -251,7 +261,11 @@ public void testSimpleDataIngestAndTopNQuery() throws Exception Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments( ImmutableList.of(s1, s2), - readFileFromClasspathAsString("topn_query.json") + (Query) SketchAggregationTest.readQueryFromClasspath( + "topn_query.json", + topNQueryAggregationTestHelper.getObjectMapper(), + vectorize + ) ); Result result = (Result) Iterables.getOnlyElement(seq.toList()); @@ -278,7 +292,11 @@ public void testTopNQueryWithSketchConstant() throws Exception Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments( ImmutableList.of(s1, s2), - readFileFromClasspathAsString("topn_query_sketch_const.json") + (Query) SketchAggregationTest.readQueryFromClasspath( + "topn_query_sketch_const.json", + topNQueryAggregationTestHelper.getObjectMapper(), + vectorize + ) ); Result result = (Result) Iterables.getOnlyElement(seq.toList()); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java new file mode 100644 index 000000000000..e147cbc0377f --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.util; + +import com.google.common.collect.Iterables; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.vector.VectorCursor; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; + +public class ToObjectVectorColumnProcessorFactoryTest extends InitializedNullHandlingTest +{ + private StorageAdapter adapter; + + @Before + public void setUp() + { + final QueryableIndex index = TestIndex.getMMappedTestIndex(); + adapter = new QueryableIndexStorageAdapter(index); + } + + @Test + public void testRead() + { + try (final VectorCursor cursor = makeCursor()) { + final Supplier qualitySupplier = ColumnProcessors.makeVectorProcessor( + "quality", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier qualityLongSupplier = ColumnProcessors.makeVectorProcessor( + "qualityLong", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier qualityFloatSupplier = ColumnProcessors.makeVectorProcessor( + "qualityFloat", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier qualityDoubleSupplier = ColumnProcessors.makeVectorProcessor( + "qualityDouble", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier placementSupplier = ColumnProcessors.makeVectorProcessor( + "placement", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final Supplier qualityUniquesSupplier = ColumnProcessors.makeVectorProcessor( + "quality_uniques", + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + } + } + + @Test + public void testString() + { + Assert.assertEquals( + Arrays.asList( + "automotive", + "business", + "entertainment", + "health", + "mezzanine", + "news", + "premium", + "technology", + "travel", + "mezzanine" + ), + readColumn("quality", 10) + ); + } + + @Test + public void testLong() + { + Assert.assertEquals( + Arrays.asList(1000L, 1100L, 1200L, 1300L, 1400L, 1500L, 1600L, 1700L, 1800L, 1400L), + readColumn("qualityLong", 10) + ); + } + + @Test + public void testFloat() + { + Assert.assertEquals( + Arrays.asList(10000f, 11000f, 12000f, 13000f, 14000f, 15000f, 16000f, 17000f, 18000f, 14000f), + readColumn("qualityFloat", 10) + ); + } + + @Test + public void testDouble() + { + Assert.assertEquals( + Arrays.asList(10000.0, 11000.0, 12000.0, 13000.0, 14000.0, 15000.0, 16000.0, 17000.0, 18000.0, 14000.0), + readColumn("qualityDouble", 10) + ); + } + + @Test + public void testMultiString() + { + Assert.assertEquals( + Arrays.asList( + Arrays.asList("a", "preferred"), + Arrays.asList("b", "preferred"), + Arrays.asList("e", "preferred"), + Arrays.asList("h", "preferred"), + Arrays.asList("m", "preferred"), + Arrays.asList("n", "preferred"), + Arrays.asList("p", "preferred"), + Arrays.asList("preferred", "t"), + Arrays.asList("preferred", "t"), + Arrays.asList("m", "preferred") + ), + readColumn("placementish", 10) + ); + } + + @Test + public void testComplexSketch() + { + final Object sketch = Iterables.getOnlyElement(readColumn("quality_uniques", 1)); + Assert.assertThat(sketch, CoreMatchers.instanceOf(HyperLogLogCollector.class)); + } + + private VectorCursor makeCursor() + { + return adapter.makeVectorCursor( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + false, + 3, /* vector size */ + null + ); + } + + private List readColumn(final String column, final int limit) + { + try (final VectorCursor cursor = makeCursor()) { + final Supplier supplier = ColumnProcessors.makeVectorProcessor( + column, + ToObjectVectorColumnProcessorFactory.INSTANCE, + cursor.getColumnSelectorFactory() + ); + + final List retVal = new ArrayList<>(); + + while (!cursor.isDone()) { + final Object[] objects = supplier.get(); + + for (int i = 0; i < cursor.getCurrentVectorSize(); i++) { + retVal.add(objects[i]); + + if (retVal.size() >= limit) { + return retVal; + } + } + + cursor.advance(); + } + + return retVal; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java index 063d6d3b428d..80e2c03d8d33 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java +++ b/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java @@ -55,6 +55,7 @@ import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; import org.apache.druid.query.lookup.LookupExtractionFn; import org.apache.druid.query.lookup.LookupExtractor; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.DimensionHandlerUtils; @@ -312,7 +313,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( dimension, VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java b/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java new file mode 100644 index 000000000000..badf4b9af0e2 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.filter.vector; + +import org.apache.druid.query.filter.DruidPredicateFactory; +import org.apache.druid.segment.vector.VectorSizeInspector; + +import javax.annotation.Nullable; + +/** + * Treats all rows as null. + */ +public class NilVectorValueMatcher implements VectorValueMatcherFactory +{ + private final VectorSizeInspector vectorInspector; + + public NilVectorValueMatcher(final VectorSizeInspector vectorInspector) + { + this.vectorInspector = vectorInspector; + } + + @Override + public VectorValueMatcher makeMatcher(@Nullable String value) + { + return BooleanVectorValueMatcher.of(vectorInspector, value == null); + } + + @Override + public VectorValueMatcher makeMatcher(DruidPredicateFactory predicateFactory) + { + return BooleanVectorValueMatcher.of(vectorInspector, predicateFactory.makeStringPredicate().apply(null)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java index b2083cc42a82..4d30c2d6de2e 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java @@ -23,6 +23,7 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; public class VectorValueMatcherColumnProcessorFactory implements VectorColumnProcessorFactory @@ -83,4 +84,13 @@ public VectorValueMatcherFactory makeLongProcessor( { return new LongVectorValueMatcher(selector); } + + @Override + public VectorValueMatcherFactory makeObjectProcessor( + final ColumnCapabilities capabilities, + final VectorObjectSelector selector + ) + { + return new NilVectorValueMatcher(selector); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java index 46dec35495f0..53748dfcc5df 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java @@ -25,6 +25,7 @@ import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; public class GroupByVectorColumnProcessorFactory implements VectorColumnProcessorFactory @@ -102,4 +103,13 @@ public GroupByVectorColumnSelector makeLongProcessor( } return new NullableLongGroupByVectorColumnSelector(selector); } + + @Override + public GroupByVectorColumnSelector makeObjectProcessor( + final ColumnCapabilities capabilities, + final VectorObjectSelector selector + ) + { + return NilGroupByVectorColumnSelector.INSTANCE; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java index 028c820de8ba..707980f10111 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java @@ -53,7 +53,7 @@ public interface GroupByVectorColumnSelector * Write key parts for this column into a particular result row. * * @param keyMemory key memory - * @param keyOffset starting positionĀ for this key part within keyMemory + * @param keyOffset starting position for this key part within keyMemory * @param resultRow result row to receive key parts * @param resultRowPosition position within the result row for this key part */ diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java new file mode 100644 index 000000000000..e70eaa796aa1 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby.epinephelinae.vector; + +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.ResultRow; + +/** + * Treats all rows as null. + */ +public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelector +{ + public static final NilGroupByVectorColumnSelector INSTANCE = new NilGroupByVectorColumnSelector(); + + private NilGroupByVectorColumnSelector() + { + // Singleton. + } + + @Override + public int getGroupingKeySize() + { + return 0; + } + + @Override + public void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow) + { + // Nothing to do. + } + + @Override + public void writeKeyToResultRow(Memory keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition) + { + resultRow.set(resultRowPosition, null); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index f516ea5b63ba..374ba664f8f1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -42,7 +42,7 @@ import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper; import org.apache.druid.query.groupby.epinephelinae.VectorGrouper; import org.apache.druid.query.vector.VectorCursorGranularizer; -import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; @@ -175,7 +175,7 @@ public void close() final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); final List dimensions = query.getDimensions().stream().map( dimensionSpec -> - DimensionHandlerUtils.makeVectorProcessor( + ColumnProcessors.makeVectorProcessor( dimensionSpec, GroupByVectorColumnProcessorFactory.instance(), columnSelectorFactory diff --git a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java index 05e85fdd4c24..e2857a1a630d 100644 --- a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java +++ b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java @@ -29,6 +29,12 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.NilVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; import org.apache.druid.segment.virtual.ExpressionSelectors; import javax.annotation.Nullable; @@ -38,11 +44,20 @@ * top of it. * * @see DimensionHandlerUtils#createColumnSelectorPlus which this may eventually replace - * @see DimensionHandlerUtils#makeVectorProcessor which creates similar, vectorized processors; may eventually be moved - * into this class. */ public class ColumnProcessors { + /** + * Capabilites that are used when we return a nil selector for a nonexistent column. + */ + public static final ColumnCapabilities NIL_COLUMN_CAPABILITIES = + new ColumnCapabilitiesImpl().setType(ValueType.STRING) + .setDictionaryEncoded(true) + .setDictionaryValuesUnique(true) + .setDictionaryValuesSorted(true) + .setHasBitmapIndexes(false) + .setHasMultipleValues(false); + /** * Make a processor for a particular named column. * @@ -81,25 +96,10 @@ public static T makeProcessor( ) { return makeProcessorInternal( - factory -> { - // Capabilities of the column that the dimensionSpec is reading. We can't return these straight-up, because - // the _result_ of the dimensionSpec might have different capabilities. But what we return will generally be - // based on them. - final ColumnCapabilities dimensionCapabilities = factory.getColumnCapabilities(dimensionSpec.getDimension()); - - if (dimensionSpec.getExtractionFn() != null || dimensionSpec.mustDecorate()) { - // DimensionSpec is doing some sort of transformation. The result is always a string. - - return new ColumnCapabilitiesImpl() - .setType(ValueType.STRING) - .setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering()) - .setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType() == ExtractionFn.ExtractionType.ONE_TO_ONE) - .setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(dimensionCapabilities)); - } else { - // No transformation. Pass through. - return dimensionCapabilities; - } - }, + factory -> computeDimensionSpecCapabilities( + dimensionSpec, + factory.getColumnCapabilities(dimensionSpec.getDimension()) + ), factory -> factory.makeDimensionSelector(dimensionSpec), factory -> factory.makeColumnValueSelector(dimensionSpec.getDimension()), processorFactory, @@ -144,6 +144,94 @@ public static T makeProcessor( } } + /** + * Make a processor for a particular named column. + * + * @param column the column + * @param processorFactory the processor factory + * @param selectorFactory the column selector factory + * @param processor type + */ + public static T makeVectorProcessor( + final String column, + final VectorColumnProcessorFactory processorFactory, + final VectorColumnSelectorFactory selectorFactory + ) + { + return makeVectorProcessorInternal( + factory -> factory.getColumnCapabilities(column), + factory -> factory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(column)), + factory -> factory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(column)), + factory -> factory.makeValueSelector(column), + factory -> factory.makeObjectSelector(column), + processorFactory, + selectorFactory + ); + } + + /** + * Make a processor for a particular {@link DimensionSpec}. + * + * @param dimensionSpec the dimension spec + * @param processorFactory the processor factory + * @param selectorFactory the column selector factory + * @param processor type + */ + public static T makeVectorProcessor( + final DimensionSpec dimensionSpec, + final VectorColumnProcessorFactory processorFactory, + final VectorColumnSelectorFactory selectorFactory + ) + { + return makeVectorProcessorInternal( + factory -> computeDimensionSpecCapabilities( + dimensionSpec, + factory.getColumnCapabilities(dimensionSpec.getDimension()) + ), + factory -> factory.makeSingleValueDimensionSelector(dimensionSpec), + factory -> factory.makeMultiValueDimensionSelector(dimensionSpec), + factory -> factory.makeValueSelector(dimensionSpec.getDimension()), + factory -> factory.makeObjectSelector(dimensionSpec.getDimension()), + processorFactory, + selectorFactory + ); + } + + /** + * Returns the capabilities of selectors derived from a particular {@link DimensionSpec}. + * + * Will only return non-STRING types if the DimensionSpec passes through inputs unchanged. (i.e., it's a + * {@link DefaultDimensionSpec}, or something that behaves like one.) + * + * @param dimensionSpec The dimensionSpec. + * @param columnCapabilities Capabilities of the column that the dimensionSpec is reading, i.e. + * {@link DimensionSpec#getDimension()}. + */ + @Nullable + private static ColumnCapabilities computeDimensionSpecCapabilities( + final DimensionSpec dimensionSpec, + @Nullable final ColumnCapabilities columnCapabilities + ) + { + if (dimensionSpec.mustDecorate()) { + // Decorating DimensionSpecs could do anything. We can't pass along any useful info other than the type. + return new ColumnCapabilitiesImpl().setType(ValueType.STRING); + } else if (dimensionSpec.getExtractionFn() != null) { + // DimensionSpec is applying an extractionFn but *not* decorating. We have some insight into how the + // extractionFn will behave, so let's use it. + + return new ColumnCapabilitiesImpl() + .setType(ValueType.STRING) + .setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering()) + .setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType() + == ExtractionFn.ExtractionType.ONE_TO_ONE) + .setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(columnCapabilities)); + } else { + // No transformation. Pass through underlying types. + return columnCapabilities; + } + } + /** * Creates "column processors", which are objects that wrap a single input column and provide some * functionality on top of it. @@ -158,8 +246,6 @@ public static T makeProcessor( * called if the column type is long, float, double, or complex. * @param processorFactory object that encapsulates the knowledge about how to create processors * @param selectorFactory column selector factory used for creating the vector processor - * - * @see DimensionHandlerUtils#makeVectorProcessor the vectorized version */ private static T makeProcessorInternal( final Function inputCapabilitiesFn, @@ -191,6 +277,71 @@ private static T makeProcessorInternal( } } + /** + * Creates "column processors", which are objects that wrap a single input column and provide some + * functionality on top of it. + * + * @param inputCapabilitiesFn function that returns capabilities of the column being processed. The type provided + * by these capabilities will be used to determine what kind of selector to create. If + * this function returns null, then it is assumed that the column does not exist. + * Note: this is different behavior from the non-vectorized version. + * @param singleValueDimensionSelectorFn function that creates a singly-valued dimension selector for the column being + * processed. Will be called if the column is singly-valued string. + * @param multiValueDimensionSelectorFn function that creates a multi-valued dimension selector for the column being + * processed. Will be called if the column is multi-valued string. + * @param valueSelectorFn function that creates a value selector for the column being processed. Will be + * called if the column type is long, float, or double. + * @param objectSelectorFn function that creates an object selector for the column being processed. Will + * be called if the column type is complex. + * @param processorFactory object that encapsulates the knowledge about how to create processors + * @param selectorFactory column selector factory used for creating the vector processor + */ + private static T makeVectorProcessorInternal( + final Function inputCapabilitiesFn, + final Function singleValueDimensionSelectorFn, + final Function multiValueDimensionSelectorFn, + final Function valueSelectorFn, + final Function objectSelectorFn, + final VectorColumnProcessorFactory processorFactory, + final VectorColumnSelectorFactory selectorFactory + ) + { + final ColumnCapabilities capabilities = inputCapabilitiesFn.apply(selectorFactory); + + if (capabilities == null) { + // Column does not exist. + return processorFactory.makeSingleValueDimensionProcessor( + NIL_COLUMN_CAPABILITIES, + NilVectorSelector.create(selectorFactory.getReadableVectorInspector()) + ); + } + + switch (capabilities.getType()) { + case STRING: + if (mayBeMultiValue(capabilities)) { + return processorFactory.makeMultiValueDimensionProcessor( + capabilities, + multiValueDimensionSelectorFn.apply(selectorFactory) + ); + } else { + return processorFactory.makeSingleValueDimensionProcessor( + capabilities, + singleValueDimensionSelectorFn.apply(selectorFactory) + ); + } + case LONG: + return processorFactory.makeLongProcessor(capabilities, valueSelectorFn.apply(selectorFactory)); + case FLOAT: + return processorFactory.makeFloatProcessor(capabilities, valueSelectorFn.apply(selectorFactory)); + case DOUBLE: + return processorFactory.makeDoubleProcessor(capabilities, valueSelectorFn.apply(selectorFactory)); + case COMPLEX: + return processorFactory.makeObjectProcessor(capabilities, objectSelectorFn.apply(selectorFactory)); + default: + throw new ISE("Unsupported type[%s]", capabilities.getType()); + } + } + /** * Returns true if a given set of capabilities might indicate an underlying multi-value column. Errs on the side * of returning true if unknown; i.e. if this returns false, there are _definitely not_ mul. diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java index 1032530fa4d3..c2d748e5c980 100644 --- a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java @@ -32,13 +32,11 @@ import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.dimension.ColumnSelectorStrategy; import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory; -import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ValueType; -import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.math.BigDecimal; @@ -105,7 +103,12 @@ private DimensionHandlerUtils() if (!capabilities.isDictionaryEncoded().isTrue()) { throw new IAE("String column must have dictionary encoding."); } - return new StringDimensionHandler(dimensionName, multiValueHandling, capabilities.hasBitmapIndexes(), capabilities.hasSpatialIndexes()); + return new StringDimensionHandler( + dimensionName, + multiValueHandling, + capabilities.hasBitmapIndexes(), + capabilities.hasSpatialIndexes() + ); } if (capabilities.getType() == ValueType.LONG) { @@ -146,10 +149,10 @@ public static List getValueTypesFromDimensionSpecs(List The strategy type created by the provided strategy factory. - * @param strategyFactory A factory provided by query engines that generates type-handling strategies - * @param dimensionSpec column to generate a ColumnSelectorPlus object for - * @param cursor Used to create value selectors for columns. + * @param The strategy type created by the provided strategy factory. + * @param strategyFactory A factory provided by query engines that generates type-handling strategies + * @param dimensionSpec column to generate a ColumnSelectorPlus object for + * @param cursor Used to create value selectors for columns. * * @return A ColumnSelectorPlus object * @@ -175,10 +178,10 @@ public static ColumnSelectorPlus The strategy type created by the provided strategy factory. - * @param strategyFactory A factory provided by query engines that generates type-handling strategies - * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for - * @param columnSelectorFactory Used to create value selectors for columns. + * @param The strategy type created by the provided strategy factory. + * @param strategyFactory A factory provided by query engines that generates type-handling strategies + * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for + * @param columnSelectorFactory Used to create value selectors for columns. * * @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs * @@ -287,96 +290,6 @@ private static Strategy makeStrategy( return strategyFactory.makeColumnSelectorStrategy(capabilities, selector); } - /** - * Equivalent to calling makeVectorProcessor(DefaultDimensionSpec.of(column), strategyFactory, selectorFactory). - * - * @see #makeVectorProcessor(DimensionSpec, VectorColumnProcessorFactory, VectorColumnSelectorFactory) - * @see ColumnProcessors#makeProcessor the non-vectorized version - */ - public static T makeVectorProcessor( - final String column, - final VectorColumnProcessorFactory strategyFactory, - final VectorColumnSelectorFactory selectorFactory - ) - { - return makeVectorProcessor(DefaultDimensionSpec.of(column), strategyFactory, selectorFactory); - } - - /** - * Creates "vector processors", which are objects that wrap a single vectorized input column and provide some - * functionality on top of it. Used by things like query engines and filter matchers. - * - * Supports the basic types STRING, LONG, DOUBLE, and FLOAT. - * - * @param dimensionSpec dimensionSpec for the input to the processor - * @param strategyFactory object that encapsulates the knowledge about how to create processors - * @param selectorFactory column selector factory used for creating the vector processor - * - * @see ColumnProcessors#makeProcessor the non-vectorized version - */ - public static T makeVectorProcessor( - final DimensionSpec dimensionSpec, - final VectorColumnProcessorFactory strategyFactory, - final VectorColumnSelectorFactory selectorFactory - ) - { - final ColumnCapabilities originalCapabilities = - selectorFactory.getColumnCapabilities(dimensionSpec.getDimension()); - - final ColumnCapabilities effectiveCapabilites = getEffectiveCapabilities( - dimensionSpec, - originalCapabilities - ); - - final ValueType type = effectiveCapabilites.getType(); - - // vector selectors should never have null column capabilities, these signify a non-existent column, and complex - // columns should never be treated as a multi-value column, so always use single value string processor - final boolean forceSingleValue = - originalCapabilities == null || ValueType.COMPLEX.equals(originalCapabilities.getType()); - - if (type == ValueType.STRING) { - if (!forceSingleValue && effectiveCapabilites.hasMultipleValues().isMaybeTrue()) { - return strategyFactory.makeMultiValueDimensionProcessor( - effectiveCapabilites, - selectorFactory.makeMultiValueDimensionSelector(dimensionSpec) - ); - } else { - return strategyFactory.makeSingleValueDimensionProcessor( - effectiveCapabilites, - selectorFactory.makeSingleValueDimensionSelector(dimensionSpec) - ); - } - } else { - Preconditions.checkState( - dimensionSpec.getExtractionFn() == null && !dimensionSpec.mustDecorate(), - "Uh oh, was about to try to make a value selector for type[%s] with a dimensionSpec of class[%s] that " - + "requires decoration. Possible bug.", - type, - dimensionSpec.getClass().getName() - ); - - if (type == ValueType.LONG) { - return strategyFactory.makeLongProcessor( - effectiveCapabilites, - selectorFactory.makeValueSelector(dimensionSpec.getDimension()) - ); - } else if (type == ValueType.FLOAT) { - return strategyFactory.makeFloatProcessor( - effectiveCapabilites, - selectorFactory.makeValueSelector(dimensionSpec.getDimension()) - ); - } else if (type == ValueType.DOUBLE) { - return strategyFactory.makeDoubleProcessor( - effectiveCapabilites, - selectorFactory.makeValueSelector(dimensionSpec.getDimension()) - ); - } else { - throw new ISE("Unsupported type[%s]", effectiveCapabilites.getType()); - } - } - } - @Nullable public static String convertObjectToString(@Nullable Object valObj) { diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java b/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java index f50ae1e70b94..554210a45dc4 100644 --- a/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java @@ -122,16 +122,25 @@ default boolean isNull() @Nullable default Object defaultGetObject() { - IndexedInts row = getRow(); + return rowToObject(getRow(), this); + } + + /** + * Converts a particular {@link IndexedInts} to an Object in a standard way, assuming each element in the IndexedInts + * is a dictionary ID that can be resolved with the provided selector. + */ + @Nullable + static Object rowToObject(IndexedInts row, DimensionDictionarySelector selector) + { int rowSize = row.size(); if (rowSize == 0) { return null; } else if (rowSize == 1) { - return lookupName(row.get(0)); + return selector.lookupName(row.get(0)); } else { final String[] strings = new String[rowSize]; for (int i = 0; i < rowSize; i++) { - strings[i] = lookupName(row.get(i)); + strings[i] = selector.lookupName(row.get(i)); } return Arrays.asList(strings); } diff --git a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java index 6ae1557ea61a..7ebdb81a99c8 100644 --- a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java @@ -22,34 +22,59 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; /** * Class that encapsulates knowledge about how to create vector column processors. Used by - * {@link DimensionHandlerUtils#makeVectorProcessor}. + * {@link ColumnProcessors#makeVectorProcessor}. * - * Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method. The default type is - * always implicitly STRING. It also does not have a "makeComplexProcessor" method; instead, complex-typed columns - * are fed into "makeSingleValueDimensionProcessor". This behavior may change in the future to better align - * with {@link ColumnProcessorFactory}. + * Column processors can be any type "T". The idea is that a ColumnProcessorFactory embodies the logic for wrapping + * and processing selectors of various types, and so enables nice code design, where type-dependent code is not + * sprinkled throughout. + * + * Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method, because vector + * column types are always known, so it isn't necessary. * * @see ColumnProcessorFactory the non-vectorized version */ public interface VectorColumnProcessorFactory { + /** + * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column always has a single value + * per row. + */ T makeSingleValueDimensionProcessor( ColumnCapabilities capabilities, SingleValueDimensionVectorSelector selector ); + /** + * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column may have multiple values + * per row. + */ T makeMultiValueDimensionProcessor( ColumnCapabilities capabilities, MultiValueDimensionVectorSelector selector ); + /** + * Called when {@link ColumnCapabilities#getType()} is FLOAT. + */ T makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector); + /** + * Called when {@link ColumnCapabilities#getType()} is DOUBLE. + */ T makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector); + /** + * Called when {@link ColumnCapabilities#getType()} is LONG. + */ T makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector); + + /** + * Called when {@link ColumnCapabilities#getType()} is COMPLEX. + */ + T makeObjectProcessor(@SuppressWarnings("unused") ColumnCapabilities capabilities, VectorObjectSelector selector); } diff --git a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java index 10a176745ee0..424bfe4e85e9 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java @@ -41,9 +41,9 @@ import org.apache.druid.query.filter.vector.VectorValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; import org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.IntListUtils; import org.apache.druid.segment.column.BitmapIndex; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; @@ -129,7 +129,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( boundDimFilter.getDimension(), VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java index 8c6d2b0564f0..bba9d552346f 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java @@ -36,15 +36,16 @@ import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import java.util.Objects; import java.util.Set; /** + * */ public class DimensionPredicateFilter implements Filter { @@ -98,7 +99,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( dimension, VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java index 72332c50b7cc..473ef9527ac9 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java @@ -35,9 +35,9 @@ import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.column.BitmapIndex; import org.apache.druid.segment.data.CloseableIndexed; import org.apache.druid.segment.data.Indexed; @@ -91,7 +91,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( dimension, VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java index fbe80341c2f1..54dcef8306cb 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java @@ -29,9 +29,9 @@ import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcher; import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; @@ -87,7 +87,7 @@ public ValueMatcher makeMatcher(ColumnSelectorFactory factory) @Override public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory) { - return DimensionHandlerUtils.makeVectorProcessor( + return ColumnProcessors.makeVectorProcessor( dimension, VectorValueMatcherColumnProcessorFactory.instance(), factory diff --git a/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java index e99d47988920..f33ed7b08384 100644 --- a/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java @@ -26,9 +26,13 @@ import javax.annotation.Nullable; /** + * A class that comes from {@link VectorCursor#getColumnSelectorFactory()} and is used to create vector selectors. * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this class. * - * @see org.apache.druid.segment.ColumnSelectorFactory, the non-vectorized version. + * @see org.apache.druid.segment.ColumnSelectorFactory the non-vectorized version. */ public interface VectorColumnSelectorFactory extends ColumnInspector { @@ -48,22 +52,43 @@ default int getMaxVectorSize() } /** - * Returns a string-typed, single-value-per-row column selector. + * Returns a string-typed, single-value-per-row column selector. Should only be called on columns where + * {@link #getColumnCapabilities} indicates they return STRING, or on nonexistent columns. + * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec); /** - * Returns a string-typed, multi-value-per-row column selector. + * Returns a string-typed, multi-value-per-row column selector. Should only be called on columns where + * {@link #getColumnCapabilities} indicates they return STRING. Unlike {@link #makeSingleValueDimensionSelector}, + * this should not be called on nonexistent columns. + * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec); /** - * Returns a primitive column selector. + * Returns a primitive column selector. Should only be called on columns where {@link #getColumnCapabilities} + * indicates they return DOUBLE, FLOAT, or LONG, or on nonexistent columns. + * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ VectorValueSelector makeValueSelector(String column); /** - * Returns an object selector, useful for complex columns. + * Returns an object selector. Should only be called on columns where {@link #getColumnCapabilities} indicates that + * they return STRING or COMPLEX, or on nonexistent columns. + * + * If you need to write code that adapts to different input types, you should write a + * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the + * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method. */ VectorObjectSelector makeObjectSelector(String column); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 4a7df063a821..0d6e8b879ef2 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -326,6 +326,21 @@ public Sequence createIndexAndRunQueryOnSegment( return runQueryOnSegments(Collections.singletonList(segmentDir), queryJson); } + public Sequence createIndexAndRunQueryOnSegment( + File inputDataFile, + String parserJson, + String aggregators, + long minTimestamp, + Granularity gran, + int maxRowCount, + Query query + ) throws Exception + { + File segmentDir = tempFolder.newFolder(); + createIndex(inputDataFile, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount, true); + return runQueryOnSegments(Collections.singletonList(segmentDir), query); + } + public Sequence createIndexAndRunQueryOnSegment( File inputDataFile, String parserJson, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 27b75f37060d..70818e962904 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -8856,6 +8856,36 @@ public void testGroupByLongColumn() TestHelper.assertExpectedObjects(expectedResults, results, "long"); } + @Test + public void testGroupByComplexColumn() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(new DefaultDimensionSpec("quality_uniques", "quality_uniques")) + .setDimFilter(new SelectorDimFilter("quality_uniques", null, null)) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + Assert.assertEquals(Functions.>identity(), query.getLimitSpec().build(query)); + + List expectedResults = Collections.singletonList( + makeRow( + query, + "2011-04-01", + "quality_uniques", + null, + "rows", + 26L, + "idx", + 12446L + ) + ); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "long"); + } + @Test public void testGroupByLongColumnDescending() { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java index 627c2bdc886a..c4a3e903eaf7 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java @@ -29,7 +29,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.epinephelinae.VectorGrouper; import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator; -import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.TestIndex; @@ -72,7 +72,7 @@ public void testCreateOneGrouperAndCloseItWhenClose() throws IOException ); final List dimensions = query.getDimensions().stream().map( dimensionSpec -> - DimensionHandlerUtils.makeVectorProcessor( + ColumnProcessors.makeVectorProcessor( dimensionSpec, GroupByVectorColumnProcessorFactory.instance(), cursor.getColumnSelectorFactory()