From 57e8512235c32165bd2fe3e38389c547c4746a5d Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 17 Jun 2024 23:08:31 +0530 Subject: [PATCH 01/17] init --- .../query/groupby/epinephelinae/Grouper.java | 6 + .../epinephelinae/RowBasedGrouperHelper.java | 151 +++++++++++++-- .../epinephelinae/RowBasedKeySerdeHelper.java | 4 + .../epinephelinae/SpillingGrouper.java | 2 +- .../aggregation/AggregationTestHelper.java | 8 +- .../query/groupby/ComplexGroupByTest.java | 172 ++++++++++++++++++ 6 files changed, 320 insertions(+), 23 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/groupby/ComplexGroupByTest.java diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java index 591624f1ab80..ad2eb1ffdc41 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java @@ -19,6 +19,7 @@ package org.apache.druid.query.groupby.epinephelinae; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -232,6 +233,11 @@ interface KeySerde */ BufferComparator bufferComparatorWithAggregators(AggregatorFactory[] aggregatorFactories, int[] aggregatorOffsets); + default ObjectMapper decorateObjectMapper(ObjectMapper spillMapper) + { + return spillMapper; + } + /** * Reset the keySerde to its initial state. After this method is called, {@link #readFromByteBuffer} * and {@link #bufferComparator()} may no longer work properly on previously-serialized keys. 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 491c28d41427..355cdd25a113 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -19,9 +19,14 @@ package org.apache.druid.query.groupby.epinephelinae; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.ObjectCodec; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.primitives.Ints; @@ -84,6 +89,7 @@ import javax.annotation.Nullable; import java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -309,7 +315,8 @@ public static Pair, Accumulator combining, includeTimestamp, columnSelectorFactory, - valueTypes + valueTypes, + spillMapper ); final Predicate rowPredicate; @@ -509,7 +516,8 @@ private static ValueExtractFunction makeValueExtractFunction( final boolean combining, final boolean includeTimestamp, final ColumnSelectorFactory columnSelectorFactory, - final List valueTypes + final List valueTypes, + final ObjectMapper spillMapper ) { final TimestampExtractFunction timestampExtractFn = includeTimestamp ? 
@@ -666,22 +674,6 @@ public static class RowBasedKey this.key = key; } - @JsonCreator - public static RowBasedKey fromJsonArray(final Object[] key) - { - // Type info is lost during serde: - // Floats may be deserialized as doubles, Longs may be deserialized as integers, convert them back - for (int i = 0; i < key.length; i++) { - if (key[i] instanceof Integer) { - key[i] = ((Integer) key[i]).longValue(); - } else if (key[i] instanceof Double) { - key[i] = ((Double) key[i]).floatValue(); - } - } - - return new RowBasedKey(key); - } - @JsonValue public Object[] getKey() { @@ -1371,6 +1363,64 @@ public Grouper.BufferComparator bufferComparatorWithAggregators( ); } + @Override + public ObjectMapper decorateObjectMapper(ObjectMapper spillMapper) + { + + final JsonDeserializer deserializer = new JsonDeserializer() + { + @Override + public RowBasedKey deserialize( + JsonParser jp, + DeserializationContext deserializationContext + ) throws IOException + { + if (!jp.isExpectedStartArrayToken()) { + throw DruidException.defensive("expected start token"); + } + jp.nextToken(); + ObjectCodec codec = jp.getCodec(); + + int i = 0; + Object[] objects = new Object[serdeHelpers.length]; + while (jp.currentToken() != JsonToken.END_ARRAY) { + if (i > serdeHelpers.length) { + throw DruidException.defensive("not enough serde helpers"); + } + + if (serdeHelpers[i].getComplexClazz() == null) { + objects[i] = codec.readValue(jp, Object.class); + if (objects[i] instanceof Integer) { + objects[i] = ((Integer) objects[i]).longValue(); + } else if (objects[i] instanceof Double) { + objects[i] = ((Double) objects[i]).floatValue(); + } + + } else { + objects[i] = codec.readValue(jp, serdeHelpers[i].getComplexClazz()); + } + + ++i; + jp.nextToken(); + } + + return new RowBasedKey(objects); + } + }; + + class SpillModule extends SimpleModule + { + public SpillModule() + { + addDeserializer(RowBasedKey.class, deserializer); + } + } + + final ObjectMapper newObjectMapper = spillMapper.copy(); + 
newObjectMapper.registerModule(new SpillModule()); + return newObjectMapper; + } + @Override public void reset() { @@ -1588,6 +1638,8 @@ private class GenericRowBasedKeySerdeHelper extends DictionaryBuildingSingleValu { final BufferComparator bufferComparator; final String columnTypeName; + @Nullable + final Class clazz; final List dictionary; final Object2IntMap reverseDictionary; @@ -1613,6 +1665,11 @@ public GenericRowBasedKeySerdeHelper( dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)), dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition)) ); + if (columnType.is(ValueType.COMPLEX)) { + clazz = columnType.getNullableStrategy().getClazz(); + } else { + clazz = null; + } } // Asserts that we don't entertain any complex types without a typename, to prevent intermixing dictionaries of @@ -1645,6 +1702,13 @@ public Object2IntMap getReverseDictionary() { return reverseDictionary; } + + @Nullable + @Override + public Class getComplexClazz() + { + return clazz; + } } @@ -1726,6 +1790,13 @@ public Object2IntMap getReverseDictionary() { return reverseDictionary; } + + @Nullable + @Override + public Class getComplexClazz() + { + return null; + } } private class ArrayStringRowBasedKeySerdeHelper extends DictionaryBuildingSingleValuedRowBasedKeySerdeHelper @@ -1770,6 +1841,13 @@ public Object2IntMap getReverseDictionary() { return reverseStringArrayDictionary; } + + @Nullable + @Override + public Class getComplexClazz() + { + return null; + } } private abstract class AbstractStringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper @@ -1819,6 +1897,13 @@ public BufferComparator getBufferComparator() { return bufferComparator; } + + @Nullable + @Override + public Class getComplexClazz() + { + return null; + } } private class DynamicDictionaryStringRowBasedKeySerdeHelper extends AbstractStringRowBasedKeySerdeHelper @@ -1937,6 +2022,13 @@ public BufferComparator getBufferComparator() { return bufferComparator; } + + @Nullable + @Override + 
public Class getComplexClazz() + { + return null; + } } private class FloatRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper @@ -1982,6 +2074,13 @@ public BufferComparator getBufferComparator() { return bufferComparator; } + + @Nullable + @Override + public Class getComplexClazz() + { + return null; + } } private class DoubleRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper @@ -2027,6 +2126,13 @@ public BufferComparator getBufferComparator() { return bufferComparator; } + + @Nullable + @Override + public Class getComplexClazz() + { + return null; + } } // This class is only used when SQL compatible null handling is enabled. @@ -2082,6 +2188,13 @@ public BufferComparator getBufferComparator() { return comparator; } + + @Nullable + @Override + public Class getComplexClazz() + { + return delegate.getComplexClazz(); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java index 1cb29d23bc06..a37d5b17df23 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java @@ -23,6 +23,7 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; import java.nio.ByteBuffer; interface RowBasedKeySerdeHelper @@ -65,4 +66,7 @@ interface RowBasedKeySerdeHelper * Return a {@link BufferComparator} to compare keys stored in ByteBuffer. 
*/ BufferComparator getBufferComparator(); + + @Nullable + Class getComplexClazz(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 4e9b96102a16..d8a7760c11de 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -152,7 +152,7 @@ public SpillingGrouper( } this.aggregatorFactories = aggregatorFactories; this.temporaryStorage = temporaryStorage; - this.spillMapper = spillMapper; + this.spillMapper = keySerde.decorateObjectMapper(spillMapper); this.spillingAllowed = spillingAllowed; this.sortHasNonGroupingFields = sortHasNonGroupingFields; } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 526a62c813fb..2ad9f90148a8 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -766,7 +766,7 @@ public Object accumulate(Object accumulated, Object in) String resultStr = mapper.writer().writeValueAsString(yielder); List resultRows = Lists.transform( - readQueryResultArrayFromString(resultStr), + readQueryResultArrayFromString(resultStr, queryPlus.getQuery()), toolChest.makePreComputeManipulatorFn( queryPlus.getQuery(), MetricManipulatorFns.deserializing() @@ -798,11 +798,13 @@ public Object accumulate(Object accumulated, Object in) }; } - private List readQueryResultArrayFromString(String str) throws Exception + private List readQueryResultArrayFromString(String str, Query query) throws Exception { List result = new ArrayList(); - JsonParser jp = mapper.getFactory().createParser(str); + 
ObjectMapper decoratedMapper = toolChest.decorateObjectMapper(mapper, query); + + JsonParser jp = decoratedMapper.getFactory().createParser(str); if (jp.nextToken() != JsonToken.START_ARRAY) { throw new IAE("not an array [%s]", str); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/ComplexGroupByTest.java b/processing/src/test/java/org/apache/druid/query/groupby/ComplexGroupByTest.java new file mode 100644 index 000000000000..da990fda2057 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/ComplexGroupByTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.groupby; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.segment.RowBasedSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.timeline.SegmentId; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class ComplexGroupByTest +{ + + private final QueryContexts.Vectorize vectorize; + private final AggregationTestHelper helper; + private final List segments; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + + public ComplexGroupByTest(GroupByQueryConfig config, String vectorize) + { + this.vectorize = QueryContexts.Vectorize.fromString(vectorize); + this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + 
Collections.emptyList(), + config, + tempFolder + ); + Sequence rows = Sequences.simple( + ImmutableList.of( + new Object[]{new SerializablePairLongString(1L, "abc")}, + new Object[]{new SerializablePairLongString(1L, "abc")}, + new Object[]{new SerializablePairLongString(1L, "def")}, + new Object[]{new SerializablePairLongString(1L, "abc")}, + new Object[]{new SerializablePairLongString(1L, "ghi")}, + new Object[]{new SerializablePairLongString(1L, "def")}, + new Object[]{new SerializablePairLongString(1L, "abc")}, + new Object[]{new SerializablePairLongString(1L, "pqr")}, + new Object[]{new SerializablePairLongString(1L, "xyz")}, + new Object[]{new SerializablePairLongString(1L, "foo")}, + new Object[]{new SerializablePairLongString(1L, "bar")} + ) + ); + RowSignature rowSignature = RowSignature.builder() + .add( + "pair", + ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME) + ) + .build(); + + this.segments = Collections.singletonList( + new RowBasedSegment<>( + SegmentId.dummy("dummy"), + rows, + columnName -> { + final int columnNumber = rowSignature.indexOf(columnName); + return row -> columnNumber >= 0 ? row[columnNumber] : null; + }, + rowSignature + ) + ); + } + + @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") + public static Collection constructorFeeder() + { + final List constructors = new ArrayList<>(); + for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { + for (String vectorize : new String[]{"false", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } + } + return constructors; + } + + public Map getContext() + { + return ImmutableMap.of( + QueryContexts.VECTORIZE_KEY, vectorize.toString(), + QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, "true" + ); + } + + @Test + public void testGroupByOnPairClass() + { + if (vectorize == QueryContexts.Vectorize.FORCE) { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "Cannot vectorize!" 
+ ); + } + + GroupByQuery groupQuery = GroupByQuery.builder() + .setDataSource("test_datasource") + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ETERNITY) + .setDimensions(new DefaultDimensionSpec( + "pair", + "pair", + ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME) + )) + .setAggregatorSpecs(new CountAggregatorFactory("count")) + .setContext(getContext()) + .build(); + + List resultRows = helper.runQueryOnSegmentsObjs(segments, groupQuery).toList(); + + Assert.assertArrayEquals( + new ResultRow[]{ + ResultRow.of(new SerializablePairLongString(1L, "abc"), 4L), + ResultRow.of(new SerializablePairLongString(1L, "bar"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "def"), 2L), + ResultRow.of(new SerializablePairLongString(1L, "foo"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "ghi"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "pqr"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "xyz"), 1L) + }, + resultRows.toArray() + ); + + + } + +} From 12f1d08ca9ea99cef6df380d800f639bcb7f7c1b Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 17 Jun 2024 23:49:08 +0530 Subject: [PATCH 02/17] javadocs --- .../druid/query/groupby/epinephelinae/Grouper.java | 5 +++++ .../epinephelinae/RowBasedGrouperHelper.java | 14 ++++++-------- .../epinephelinae/RowBasedKeySerdeHelper.java | 6 ++++++ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java index ad2eb1ffdc41..0f3faedb707c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java @@ -233,6 +233,11 @@ interface KeySerde */ BufferComparator bufferComparatorWithAggregators(AggregatorFactory[] aggregatorFactories, int[] aggregatorOffsets); + /** + * 
Decorates the object mapper enabling it to read and write query results' grouping keys. It is used by the + * {@link SpillingGrouper} to preserve the types of the dimensions after serializing and deserializing them on the + * spilled files. + */ default ObjectMapper decorateObjectMapper(ObjectMapper spillMapper) { return spillMapper; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 355cdd25a113..a07cde324901 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -315,8 +315,7 @@ public static Pair, Accumulator combining, includeTimestamp, columnSelectorFactory, - valueTypes, - spillMapper + valueTypes ); final Predicate rowPredicate; @@ -516,8 +515,7 @@ private static ValueExtractFunction makeValueExtractFunction( final boolean combining, final boolean includeTimestamp, final ColumnSelectorFactory columnSelectorFactory, - final List valueTypes, - final ObjectMapper spillMapper + final List valueTypes ) { final TimestampExtractFunction timestampExtractFn = includeTimestamp ? 
@@ -1639,7 +1637,7 @@ private class GenericRowBasedKeySerdeHelper extends DictionaryBuildingSingleValu final BufferComparator bufferComparator; final String columnTypeName; @Nullable - final Class clazz; + final Class complexClazz; final List dictionary; final Object2IntMap reverseDictionary; @@ -1666,9 +1664,9 @@ public GenericRowBasedKeySerdeHelper( dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition)) ); if (columnType.is(ValueType.COMPLEX)) { - clazz = columnType.getNullableStrategy().getClazz(); + complexClazz = columnType.getNullableStrategy().getClazz(); } else { - clazz = null; + complexClazz = null; } } @@ -1707,7 +1705,7 @@ public Object2IntMap getReverseDictionary() @Override public Class getComplexClazz() { - return clazz; + return complexClazz; } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java index a37d5b17df23..46395dc798c9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java @@ -67,6 +67,12 @@ interface RowBasedKeySerdeHelper */ BufferComparator getBufferComparator(); + /** + * If the key(dimension) is complex, returns the expected class of the objects. The class is used to deserialize the + * objects correctly from the spilled files. Returns null if the dimension is not complex because all other dimensions + * work correctly when deserialized as generic java objects without type information. 
+ */ + @SuppressWarnings("rawtypes") @Nullable Class getComplexClazz(); } From 7fd9fa8bcd148632cf5beccd057f8e9ea0ab7a2a Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 17 Jun 2024 23:51:16 +0530 Subject: [PATCH 03/17] codeql --- .../query/groupby/epinephelinae/RowBasedGrouperHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index a07cde324901..955930a6a0f5 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1382,7 +1382,7 @@ public RowBasedKey deserialize( int i = 0; Object[] objects = new Object[serdeHelpers.length]; while (jp.currentToken() != JsonToken.END_ARRAY) { - if (i > serdeHelpers.length) { + if (i >= serdeHelpers.length) { throw DruidException.defensive("not enough serde helpers"); } From f1fc86125bef6508143e27ee803c830678809ca5 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 17 Jun 2024 23:55:31 +0530 Subject: [PATCH 04/17] test rename, codeql --- ... 
=> ComplexDimensionGroupByQueryTest.java} | 56 ++++++++----------- 1 file changed, 24 insertions(+), 32 deletions(-) rename processing/src/test/java/org/apache/druid/query/groupby/{ComplexGroupByTest.java => ComplexDimensionGroupByQueryTest.java} (84%) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/ComplexGroupByTest.java b/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java similarity index 84% rename from processing/src/test/java/org/apache/druid/query/groupby/ComplexGroupByTest.java rename to processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java index da990fda2057..bc1ecbb0ddc8 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/ComplexGroupByTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java @@ -39,7 +39,6 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -51,21 +50,16 @@ import java.util.Map; @RunWith(Parameterized.class) -public class ComplexGroupByTest +public class ComplexDimensionGroupByQueryTest { - private final QueryContexts.Vectorize vectorize; private final AggregationTestHelper helper; private final List segments; - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - - public ComplexGroupByTest(GroupByQueryConfig config, String vectorize) + public ComplexDimensionGroupByQueryTest(GroupByQueryConfig config, String vectorize) { this.vectorize = QueryContexts.Vectorize.fromString(vectorize); this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( @@ -131,13 +125,6 @@ public Map getContext() @Test public void testGroupByOnPairClass() { - if (vectorize == 
QueryContexts.Vectorize.FORCE) { - expectedException.expect(RuntimeException.class); - expectedException.expectMessage( - "Cannot vectorize!" - ); - } - GroupByQuery groupQuery = GroupByQuery.builder() .setDataSource("test_datasource") .setGranularity(Granularities.ALL) @@ -151,22 +138,27 @@ public void testGroupByOnPairClass() .setContext(getContext()) .build(); - List resultRows = helper.runQueryOnSegmentsObjs(segments, groupQuery).toList(); - - Assert.assertArrayEquals( - new ResultRow[]{ - ResultRow.of(new SerializablePairLongString(1L, "abc"), 4L), - ResultRow.of(new SerializablePairLongString(1L, "bar"), 1L), - ResultRow.of(new SerializablePairLongString(1L, "def"), 2L), - ResultRow.of(new SerializablePairLongString(1L, "foo"), 1L), - ResultRow.of(new SerializablePairLongString(1L, "ghi"), 1L), - ResultRow.of(new SerializablePairLongString(1L, "pqr"), 1L), - ResultRow.of(new SerializablePairLongString(1L, "xyz"), 1L) - }, - resultRows.toArray() - ); - - + if (vectorize == QueryContexts.Vectorize.FORCE) { + // Cannot vectorize group by on complex dimension + Assert.assertThrows( + RuntimeException.class, + () -> helper.runQueryOnSegmentsObjs(segments, groupQuery).toList() + ); + } else { + List resultRows = helper.runQueryOnSegmentsObjs(segments, groupQuery).toList(); + + Assert.assertArrayEquals( + new ResultRow[]{ + ResultRow.of(new SerializablePairLongString(1L, "abc"), 4L), + ResultRow.of(new SerializablePairLongString(1L, "bar"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "def"), 2L), + ResultRow.of(new SerializablePairLongString(1L, "foo"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "ghi"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "pqr"), 1L), + ResultRow.of(new SerializablePairLongString(1L, "xyz"), 1L) + }, + resultRows.toArray() + ); + } } - } From 5590ad67687abf9f4e13c3595263bbc2cd074f59 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 18 Jun 2024 11:01:33 +0530 Subject: [PATCH 05/17] take timestamps into 
account --- .../epinephelinae/RowBasedGrouperHelper.java | 39 ++++++++++++------- .../ObjectStrategyComplexTypeStrategy.java | 6 +-- .../query/groupby/GroupByQueryRunnerTest.java | 5 ++- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 955930a6a0f5..dab6b583aa95 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1374,31 +1374,40 @@ public RowBasedKey deserialize( ) throws IOException { if (!jp.isExpectedStartArrayToken()) { - throw DruidException.defensive("expected start token"); + throw DruidException.defensive("Expected array start token, received [%s]", jp.getCurrentToken()); } + jp.nextToken(); ObjectCodec codec = jp.getCodec(); - int i = 0; - Object[] objects = new Object[serdeHelpers.length]; + int timestampAdjustment = includeTimestamp ? 
1 : 0; + int dimsToRead = timestampAdjustment + serdeHelpers.length; + int dimsReadSoFar = 0; + Object[] objects = new Object[dimsToRead]; while (jp.currentToken() != JsonToken.END_ARRAY) { - if (i >= serdeHelpers.length) { - throw DruidException.defensive("not enough serde helpers"); + if (dimsReadSoFar >= dimsToRead) { + throw DruidException.defensive("More dimensions encountered than expected [%d]", dimsToRead); } - if (serdeHelpers[i].getComplexClazz() == null) { - objects[i] = codec.readValue(jp, Object.class); - if (objects[i] instanceof Integer) { - objects[i] = ((Integer) objects[i]).longValue(); - } else if (objects[i] instanceof Double) { - objects[i] = ((Double) objects[i]).floatValue(); - } - + if (includeTimestamp && dimsReadSoFar == 0) { + // Read the timestamp + objects[dimsReadSoFar] = codec.readValue(jp, Long.class); } else { - objects[i] = codec.readValue(jp, serdeHelpers[i].getComplexClazz()); + // Read the dimension + if (serdeHelpers[dimsReadSoFar - timestampAdjustment].getComplexClazz() == null) { + objects[dimsReadSoFar] = codec.readValue(jp, Object.class); + if (objects[dimsReadSoFar] instanceof Integer) { + objects[dimsReadSoFar] = ((Integer) objects[dimsReadSoFar]).longValue(); + } else if (objects[dimsReadSoFar] instanceof Double) { + objects[dimsReadSoFar] = ((Double) objects[dimsReadSoFar]).floatValue(); + } + } else { + objects[dimsReadSoFar] = + codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getComplexClazz()); + } } - ++i; + ++dimsReadSoFar; jp.nextToken(); } diff --git a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java index b274e55282ea..f80a1cdcf8dc 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java @@ -123,7 
+123,7 @@ public boolean groupable() public int hashCode(T o) { if (hashStrategy == null) { - throw DruidException.defensive("hashStrategy not provided"); + throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString()); } return hashStrategy.hashCode(o); } @@ -132,7 +132,7 @@ public int hashCode(T o) public boolean equals(T a, T b) { if (hashStrategy == null) { - throw DruidException.defensive("hashStrategy not provided"); + throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString()); } return hashStrategy.equals(a, b); } @@ -141,7 +141,7 @@ public boolean equals(T a, T b) public Class getClazz() { if (clazz == null) { - throw DruidException.defensive("hashStrategy not provided"); + throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString()); } return clazz; } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 3613246fef65..09bd75287c42 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -33,6 +33,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.Rows; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.IAE; @@ -9931,7 +9932,6 @@ public void testGroupByLongColumn() @Test public void testGroupByComplexColumn() { - cannotVectorize(); GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) @@ -9945,7 +9945,8 @@ public void testGroupByComplexColumn() 
.setGranularity(QueryRunnerTestHelper.ALL_GRAN) .build(); - expectedException.expect(RuntimeException.class); + expectedException.expect(DruidException.class); + expectedException.expectMessage("Type [COMPLEX] is not groupable"); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } From 94b38a83d2a447162045bc85c6566d704137bde1 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 18 Jun 2024 13:59:39 +0530 Subject: [PATCH 06/17] generic --- .../epinephelinae/RowBasedGrouperHelper.java | 16 ++++++++-------- .../epinephelinae/RowBasedKeySerdeHelper.java | 3 +-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index dab6b583aa95..6e364caf065c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1712,7 +1712,7 @@ public Object2IntMap getReverseDictionary() @Nullable @Override - public Class getComplexClazz() + public Class getComplexClazz() { return complexClazz; } @@ -1800,7 +1800,7 @@ public Object2IntMap getReverseDictionary() @Nullable @Override - public Class getComplexClazz() + public Class getComplexClazz() { return null; } @@ -1851,7 +1851,7 @@ public Object2IntMap getReverseDictionary() @Nullable @Override - public Class getComplexClazz() + public Class getComplexClazz() { return null; } @@ -1907,7 +1907,7 @@ public BufferComparator getBufferComparator() @Nullable @Override - public Class getComplexClazz() + public Class getComplexClazz() { return null; } @@ -2032,7 +2032,7 @@ public BufferComparator getBufferComparator() @Nullable @Override - public Class getComplexClazz() + public Class getComplexClazz() { return null; } @@ -2084,7 +2084,7 @@ public BufferComparator 
getBufferComparator() @Nullable @Override - public Class getComplexClazz() + public Class getComplexClazz() { return null; } @@ -2136,7 +2136,7 @@ public BufferComparator getBufferComparator() @Nullable @Override - public Class getComplexClazz() + public Class getComplexClazz() { return null; } @@ -2198,7 +2198,7 @@ public BufferComparator getBufferComparator() @Nullable @Override - public Class getComplexClazz() + public Class getComplexClazz() { return delegate.getComplexClazz(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java index 46395dc798c9..c0faa71aab34 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java @@ -72,7 +72,6 @@ interface RowBasedKeySerdeHelper * objects correctly from the spilled files. Returns null if the dimension is not complex because all other dimensions * work correctly when deserialized as generic java objects without type information. 
*/ - @SuppressWarnings("rawtypes") @Nullable - Class getComplexClazz(); + Class getComplexClazz(); } From f647c3f466b5367555b58bab178a3bfdbf7c6498 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 18 Jun 2024 14:08:28 +0530 Subject: [PATCH 07/17] codeql, add defensive check --- .../epinephelinae/RowBasedGrouperHelper.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 6e364caf065c..8ed6b0385243 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1376,14 +1376,14 @@ public RowBasedKey deserialize( if (!jp.isExpectedStartArrayToken()) { throw DruidException.defensive("Expected array start token, received [%s]", jp.getCurrentToken()); } - jp.nextToken(); - ObjectCodec codec = jp.getCodec(); - int timestampAdjustment = includeTimestamp ? 1 : 0; - int dimsToRead = timestampAdjustment + serdeHelpers.length; + final ObjectCodec codec = jp.getCodec(); + final int timestampAdjustment = includeTimestamp ? 
1 : 0; + final int dimsToRead = timestampAdjustment + serdeHelpers.length; int dimsReadSoFar = 0; - Object[] objects = new Object[dimsToRead]; + final Object[] objects = new Object[dimsToRead]; + while (jp.currentToken() != JsonToken.END_ARRAY) { if (dimsReadSoFar >= dimsToRead) { throw DruidException.defensive("More dimensions encountered than expected [%d]", dimsToRead); @@ -1393,6 +1393,10 @@ public RowBasedKey deserialize( // Read the timestamp objects[dimsReadSoFar] = codec.readValue(jp, Long.class); } else { + DruidException.conditionalDefensive( + dimsReadSoFar - timestampAdjustment < serdeHelpers.length, + "Insufficient serde helpers present" + ); // Read the dimension if (serdeHelpers[dimsReadSoFar - timestampAdjustment].getComplexClazz() == null) { objects[dimsReadSoFar] = codec.readValue(jp, Object.class); From 1c8561c3ea00936ec226137c66355a9499bea6d4 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 24 Jun 2024 11:29:49 +0530 Subject: [PATCH 08/17] review --- .../epinephelinae/RowBasedGrouperHelper.java | 59 +++++++------------ .../epinephelinae/RowBasedKeySerdeHelper.java | 8 +-- 2 files changed, 23 insertions(+), 44 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 8ed6b0385243..d5ddff83ef27 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1398,17 +1398,9 @@ public RowBasedKey deserialize( "Insufficient serde helpers present" ); // Read the dimension - if (serdeHelpers[dimsReadSoFar - timestampAdjustment].getComplexClazz() == null) { - objects[dimsReadSoFar] = codec.readValue(jp, Object.class); - if (objects[dimsReadSoFar] instanceof Integer) { - objects[dimsReadSoFar] = ((Integer) 
objects[dimsReadSoFar]).longValue(); - } else if (objects[dimsReadSoFar] instanceof Double) { - objects[dimsReadSoFar] = ((Double) objects[dimsReadSoFar]).floatValue(); - } - } else { - objects[dimsReadSoFar] = - codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getComplexClazz()); - } + serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz(); + objects[dimsReadSoFar] = + codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz()); } ++dimsReadSoFar; @@ -1649,8 +1641,7 @@ private class GenericRowBasedKeySerdeHelper extends DictionaryBuildingSingleValu { final BufferComparator bufferComparator; final String columnTypeName; - @Nullable - final Class complexClazz; + final Class clazz; final List dictionary; final Object2IntMap reverseDictionary; @@ -1677,9 +1668,9 @@ public GenericRowBasedKeySerdeHelper( dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition)) ); if (columnType.is(ValueType.COMPLEX)) { - complexClazz = columnType.getNullableStrategy().getClazz(); + clazz = columnType.getNullableStrategy().getClazz(); } else { - complexClazz = null; + clazz = Object.class; } } @@ -1714,11 +1705,10 @@ public Object2IntMap getReverseDictionary() return reverseDictionary; } - @Nullable @Override - public Class getComplexClazz() + public Class getClazz() { - return complexClazz; + return clazz; } } @@ -1802,9 +1792,8 @@ public Object2IntMap getReverseDictionary() return reverseDictionary; } - @Nullable @Override - public Class getComplexClazz() + public Class getClazz() { return null; } @@ -1853,11 +1842,10 @@ public Object2IntMap getReverseDictionary() return reverseStringArrayDictionary; } - @Nullable @Override - public Class getComplexClazz() + public Class getClazz() { - return null; + return Object.class; } } @@ -1909,11 +1897,10 @@ public BufferComparator getBufferComparator() return bufferComparator; } - @Nullable @Override - public Class getComplexClazz() + public Class getClazz() { - return null; + return 
Object.class; } } @@ -2034,11 +2021,10 @@ public BufferComparator getBufferComparator() return bufferComparator; } - @Nullable @Override - public Class getComplexClazz() + public Class getClazz() { - return null; + return Long.class; } } @@ -2086,11 +2072,10 @@ public BufferComparator getBufferComparator() return bufferComparator; } - @Nullable @Override - public Class getComplexClazz() + public Class getClazz() { - return null; + return Float.class; } } @@ -2138,11 +2123,10 @@ public BufferComparator getBufferComparator() return bufferComparator; } - @Nullable @Override - public Class getComplexClazz() + public Class getClazz() { - return null; + return Double.class; } } @@ -2200,11 +2184,10 @@ public BufferComparator getBufferComparator() return comparator; } - @Nullable @Override - public Class getComplexClazz() + public Class getClazz() { - return delegate.getComplexClazz(); + return delegate.getClazz(); } } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java index c0faa71aab34..71372ca238ba 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java @@ -23,7 +23,6 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; import javax.annotation.CheckReturnValue; -import javax.annotation.Nullable; import java.nio.ByteBuffer; interface RowBasedKeySerdeHelper @@ -68,10 +67,7 @@ interface RowBasedKeySerdeHelper BufferComparator getBufferComparator(); /** - * If the key(dimension) is complex, returns the expected class of the objects. The class is used to deserialize the - * objects correctly from the spilled files. 
Returns null if the dimension is not complex because all other dimensions - work correctly when deserialized as generic java objects without type information. + * Returns the expected class of the key which is used to deserialize the objects correctly from the spilled files. */ - @Nullable - Class getComplexClazz(); + Class getClazz(); } From 3dc54d3df438cecca11b2b9748ffa66b0fe36a8e Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 24 Jun 2024 22:31:48 +0530 Subject: [PATCH 09/17] array numeric left --- .../query/groupby/epinephelinae/RowBasedGrouperHelper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index d5ddff83ef27..a81c520e4f0b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1641,7 +1641,7 @@ private class GenericRowBasedKeySerdeHelper extends DictionaryBuildingSingleValu { final BufferComparator bufferComparator; final String columnTypeName; - final Class clazz; + final Class clazz; final List dictionary; final Object2IntMap reverseDictionary; @@ -1795,7 +1795,7 @@ public Object2IntMap getReverseDictionary() @Override public Class getClazz() { - return null; + return Object.class; } } From 33d55adceb19896b3d60880e947bac1dd5020f85 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 25 Jun 2024 10:12:08 +0530 Subject: [PATCH 10/17] test changes for caching --- .../apache/druid/query/QueryToolChest.java | 6 ++ .../groupby/GroupByQueryQueryToolChest.java | 78 ++++++++++++------- .../GroupByQueryQueryToolChestTest.java | 46 +++++++++++ 3 files changed, 101 insertions(+), 29 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index b0678f247c9d..6eaf497b23dd 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -266,6 +266,12 @@ public CacheStrategy getCacheStrategy(QueryType qu return null; } + @Nullable + public CacheStrategy getCacheStrategy(QueryType query, ObjectMapper mapper) + { + return getCacheStrategy(query); + } + /** * Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists *before* being passed to * mergeResults(). diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 47064fefbe60..1321bbf28d2b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -80,6 +80,7 @@ import org.apache.druid.segment.column.ValueType; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -472,7 +473,7 @@ public void serialize( // Deserializer that can deserialize either array- or map-based rows. 
final JsonDeserializer deserializer = new JsonDeserializer() { - final Class[] dimensionClasses = createDimensionClasses(); + final Class[] dimensionClasses = createDimensionClasses(query); boolean containsComplexDimensions = query.getDimensions() .stream() .anyMatch( @@ -526,28 +527,6 @@ public ResultRow deserialize(final JsonParser jp, final DeserializationContext c } } - private Class[] createDimensionClasses() - { - final List queryDimensions = query.getDimensions(); - final Class[] classes = new Class[queryDimensions.size()]; - for (int i = 0; i < queryDimensions.size(); ++i) { - final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType(); - if (dimensionOutputType.is(ValueType.COMPLEX)) { - NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy(); - if (!nullableTypeStrategy.groupable()) { - throw DruidException.defensive( - "Ungroupable dimension [%s] with type [%s] found in the query.", - queryDimensions.get(i).getDimension(), - dimensionOutputType - ); - } - classes[i] = nullableTypeStrategy.getClazz(); - } else { - classes[i] = Object.class; - } - } - return classes; - } }; @@ -598,14 +577,25 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r ); } + @Nullable @Override - public CacheStrategy getCacheStrategy(final GroupByQuery query) + public CacheStrategy getCacheStrategy(GroupByQuery query) + { + return getCacheStrategy(query, null); + } + + @Override + public CacheStrategy getCacheStrategy( + final GroupByQuery query, + @Nullable final ObjectMapper mapper + ) { return new CacheStrategy() { private static final byte CACHE_STRATEGY_VERSION = 0x1; private final List aggs = query.getAggregatorSpecs(); private final List dims = query.getDimensions(); + private final Class[] dimensionClasses = createDimensionClasses(query); @Override public boolean isCacheable(GroupByQuery query, boolean willMergeRunners, boolean bySegment) @@ -727,13 +717,20 @@ public ResultRow apply(Object input) int dimPos = 0; 
while (dimsIter.hasNext() && results.hasNext()) { final DimensionSpec dimensionSpec = dimsIter.next(); + final Object dimensionObject = results.next(); + final Object dimensionObjectCasted; // Must convert generic Jackson-deserialized type into the proper type. - resultRow.set( - dimensionStart + dimPos, - DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType()) - ); - + if (dimensionSpec.getOutputType().is(ValueType.COMPLEX)) { + DruidException.conditionalDefensive( + mapper != null, + "Cannot deserialize complex dimension from cache if object mapper is not provided" + ); + dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]); + } else { + dimensionObjectCasted = DimensionHandlerUtils.convertObjectToType(dimensionObject, dimensionSpec.getOutputType()); + } + resultRow.set(dimensionStart + dimPos, dimensionObjectCasted); dimPos++; } @@ -861,4 +858,27 @@ private static BitSet extractionsToRewrite(GroupByQuery query) return retVal; } + + private static Class[] createDimensionClasses(final GroupByQuery query) + { + final List queryDimensions = query.getDimensions(); + final Class[] classes = new Class[queryDimensions.size()]; + for (int i = 0; i < queryDimensions.size(); ++i) { + final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType(); + if (dimensionOutputType.is(ValueType.COMPLEX)) { + NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy(); + if (!nullableTypeStrategy.groupable()) { + throw DruidException.defensive( + "Ungroupable dimension [%s] with type [%s] found in the query.", + queryDimensions.get(i).getDimension(), + dimensionOutputType + ); + } + classes[i] = nullableTypeStrategy.getClazz(); + } else { + classes[i] = Object.class; + } + } + return classes; + } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index f43bbce9d978..7786d9d77474 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.groupby; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Supplier; @@ -33,6 +34,7 @@ import org.apache.druid.collections.StupidPool; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Row; +import org.apache.druid.jackson.AggregatorsModule; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -97,6 +99,8 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest public static void setUpClass() { NullHandling.initializeForTests(); + //noinspection ResultOfObjectAllocationIgnored + new AggregatorsModule(); } @Test @@ -512,6 +516,48 @@ public void testCacheStrategy() throws Exception doTestCacheStrategy(ColumnType.LONG, 2L); } + @Test + public void testComplexDimensionCacheStrategy() throws IOException + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(ImmutableList.of( + new DefaultDimensionSpec( + "test", + "test", + ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME) + ) + )) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + + CacheStrategy strategy = + new 
GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, objectMapper); + + // test timestamps that result in integer size millis + final ResultRow result1 = ResultRow.of( + 123L, + new SerializablePairLongString(123L, "abc"), + 1 + ); + + Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1); + + Object fromCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedValue), + strategy.getCacheObjectClazz() + ); + + ResultRow fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + } + @Test public void testMultiColumnCacheStrategy() throws Exception { From c57c5ffd5a8ff9d7bf7c387f3b5d6e716f319437 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 26 Jun 2024 17:54:33 +0530 Subject: [PATCH 11/17] modifications --- .../apache/druid/query/QueryToolChest.java | 19 ++++--- .../DataSourceQueryQueryToolChest.java | 8 +++ .../groupby/GroupByQueryQueryToolChest.java | 2 - .../SegmentMetadataQueryQueryToolChest.java | 11 ++++ .../search/SearchQueryQueryToolChest.java | 10 ++++ .../TimeBoundaryQueryQueryToolChest.java | 12 +++++ .../TimeseriesQueryQueryToolChest.java | 12 +++++ .../query/topn/TopNQueryQueryToolChest.java | 13 ++++- .../GroupByQueryQueryToolChestTest.java | 51 +++++++++++-------- .../druid/client/CachingClusteredClient.java | 2 +- .../druid/client/CachingQueryRunner.java | 2 +- .../query/ResultLevelCachingQueryRunner.java | 2 +- .../druid/client/CachingQueryRunnerTest.java | 15 +++--- 13 files changed, 119 insertions(+), 40 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index 6eaf497b23dd..aad83275914a 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -252,13 +252,7 @@ public Function 
makePostComputeManipulatorFn(QueryType q public abstract TypeReference getResultTypeReference(); /** - * Returns a CacheStrategy to be used to load data into the cache and remove it from the cache. - *

- * This is optional. If it returns null, caching is effectively disabled for the query. * - * @param query The query whose results might be cached - * @param The type of object that will be stored in the cache - * @return A CacheStrategy that can be used to populate and read from the Cache */ @Nullable public CacheStrategy getCacheStrategy(QueryType query) @@ -266,8 +260,19 @@ public CacheStrategy getCacheStrategy(QueryType qu return null; } + /** + * Returns a CacheStrategy to be used to load data into the cache and remove it from the cache. + *

+ * This is optional. If it returns null, caching is effectively disabled for the query. + * + * @param query The query whose results might be cached + * @param mapper Object mapper to convert the deserialized generic java objects to desired types. It can be nullable + * to preserve backward compatibility. + * @param The type of object that will be stored in the cache + * @return A CacheStrategy that can be used to populate and read from the Cache + */ @Nullable - public CacheStrategy getCacheStrategy(QueryType query, ObjectMapper mapper) + public CacheStrategy getCacheStrategy(QueryType query, @Nullable ObjectMapper mapper) { return getCacheStrategy(query); } diff --git a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java index dbe8922f2e9b..21fb5c53afcc 100644 --- a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.datasourcemetadata; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.inject.Inject; @@ -38,6 +39,7 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.timeline.LogicalSegment; +import javax.annotation.Nullable; import java.util.List; import java.util.stream.Collectors; @@ -119,4 +121,10 @@ public CacheStrategy getCacheStrategy(DataSourceMetadataQuery query) { return null; } + + @Override + public CacheStrategy getCacheStrategy(DataSourceMetadataQuery query, @Nullable ObjectMapper mapper) + { + return null; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 1321bbf28d2b..bb0b9ff43ca0 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -526,8 +526,6 @@ public ResultRow deserialize(final JsonParser jp, final DeserializationContext c return ResultRow.of(objectArray); } } - - }; class GroupByResultRowModule extends SimpleModule diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 912ecb1ac322..fd8d7e7009c9 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.metadata; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -62,6 +63,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -184,6 +186,15 @@ public TypeReference getResultTypeReference() @Override public CacheStrategy getCacheStrategy(final SegmentMetadataQuery query) + { + return getCacheStrategy(query, null); + } + + @Override + public CacheStrategy getCacheStrategy( + final SegmentMetadataQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy() { diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java 
b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java index b390cd83a58d..c15e1d0d99c4 100644 --- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.search; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -124,6 +125,15 @@ public TypeReference> getResultTypeReference() @Override public CacheStrategy, Object, SearchQuery> getCacheStrategy(final SearchQuery query) + { + return getCacheStrategy(query, null); + } + + @Override + public CacheStrategy, Object, SearchQuery> getCacheStrategy( + final SearchQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy, Object, SearchQuery>() diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 9087dd26a885..eab5e0f5abcf 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.timeboundary; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -47,6 +48,7 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.LogicalSegment; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import 
java.util.Comparator; import java.util.List; @@ -163,6 +165,16 @@ public TypeReference> getResultTypeReference() @Override public CacheStrategy, Object, TimeBoundaryQuery> getCacheStrategy(final TimeBoundaryQuery query) + { + return getCacheStrategy(query, null); + } + + + @Override + public CacheStrategy, Object, TimeBoundaryQuery> getCacheStrategy( + final TimeBoundaryQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy, Object, TimeBoundaryQuery>() { diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 71d36bb9bbed..a449d0d59f1e 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.timeseries; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -66,6 +67,7 @@ import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; @@ -277,6 +279,16 @@ public TypeReference> getResultTypeReference() @Override public CacheStrategy, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query) + { + return getCacheStrategy(query, null); + } + + + @Override + public CacheStrategy, Object, TimeseriesQuery> getCacheStrategy( + final TimeseriesQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy, Object, TimeseriesQuery>() { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index b850114f3bcf..87398bbca353 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.topn; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.Iterables; @@ -65,6 +66,7 @@ import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; @@ -269,9 +271,18 @@ public TypeReference> getResultTypeReference() return TYPE_REFERENCE; } + @Nullable + @Override + public CacheStrategy, Object, TopNQuery> getCacheStrategy(TopNQuery query) + { + return getCacheStrategy(query, null); + } @Override - public CacheStrategy, Object, TopNQuery> getCacheStrategy(final TopNQuery query) + public CacheStrategy, Object, TopNQuery> getCacheStrategy( + final TopNQuery query, + @Nullable final ObjectMapper objectMapper + ) { return new CacheStrategy, Object, TopNQuery>() { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 7786d9d77474..0c196d95716e 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -19,7 +19,6 @@ package org.apache.druid.query.groupby; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import 
com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Supplier; @@ -134,11 +133,13 @@ public void testResultLevelCacheKeyWithPostAggregate() .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); + final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -194,11 +195,12 @@ public void testResultLevelCacheKeyWithLimitSpec() ) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -256,11 +258,12 @@ public void testResultLevelCacheKeyWithHavingSpec() .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.UNIQUE_METRIC, 10)) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, 
null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -340,11 +343,12 @@ public void testResultLevelCacheKeyWithAndHavingSpec() .setHavingSpec(andHavingSpec2) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -431,11 +435,12 @@ public void testResultLevelCacheKeyWithHavingDimFilterHavingSpec() .setHavingSpec(havingSpec2) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -494,11 +499,12 @@ public void testResultLevelCacheKeyWithSubTotalsSpec() )) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new 
GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -584,8 +590,9 @@ public void testMultiColumnCacheStrategy() throws Exception .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); CacheStrategy strategy = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); // test timestamps that result in integer size millis final ResultRow result1 = ResultRow.of( @@ -1100,8 +1107,9 @@ private void doTestCacheStrategy(final ColumnType valueType, final Object dimVal .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); CacheStrategy strategy = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); // test timestamps that result in integer size millis final ResultRow result1 = ResultRow.of( @@ -1193,11 +1201,12 @@ public void testQueryCacheKeyWithLimitSpec() .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals( @@ -1229,11 +1238,12 @@ public void testQueryCacheKeyWithLimitSpecPushDownUsingContext() 
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, "false")) .build(); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); final CacheStrategy strategy1 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper); final CacheStrategy strategy2 = - new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2); + new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue( @@ -1291,7 +1301,8 @@ public String getFormatString() QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); - CacheStrategy cacheStrategy = queryToolChest.getCacheStrategy(query); + final ObjectMapper mapper = TestHelper.makeJsonMapper(); + CacheStrategy cacheStrategy = queryToolChest.getCacheStrategy(query, mapper); Assert.assertTrue( "result level cache on broker server for GroupByStrategyV2 should be enabled", cacheStrategy.isCacheable(query, false, false) diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 7bcb4c2ce038..5fa34d6699d8 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -275,7 +275,7 @@ private class SpecificQueryRunnable this.responseContext = responseContext; this.query = queryPlus.getQuery(); this.toolChest = warehouse.getToolChest(query); - this.strategy = toolChest.getCacheStrategy(query); + this.strategy = toolChest.getCacheStrategy(query, objectMapper); this.dataSourceAnalysis = query.getDataSource().getAnalysis(); this.useCache = 
CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); diff --git a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java index 9bb9f474dd9a..41d4bb4ea639 100644 --- a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java @@ -86,7 +86,7 @@ public CachingQueryRunner( public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { Query query = queryPlus.getQuery(); - final CacheStrategy strategy = toolChest.getCacheStrategy(query); + final CacheStrategy strategy = toolChest.getCacheStrategy(query, mapper); final boolean populateCache = canPopulateCache(query, strategy); final boolean useCache = canUseCache(query, strategy); diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 182faba7a09c..0af6ebca3ede 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -73,7 +73,7 @@ public ResultLevelCachingQueryRunner( this.cache = cache; this.cacheConfig = cacheConfig; this.query = query; - this.strategy = queryToolChest.getCacheStrategy(query); + this.strategy = queryToolChest.getCacheStrategy(query, objectMapper); this.populateResultCache = CacheUtil.isPopulateResultCache( query, strategy, diff --git a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java index a4375a61900a..7208ab2fc4ba 100644 --- a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java @@ -68,6 +68,7 @@ import org.apache.druid.query.topn.TopNQueryConfig; 
import org.apache.druid.query.topn.TopNQueryQueryToolChest; import org.apache.druid.query.topn.TopNResultValue; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.junit.Assert; @@ -90,7 +91,7 @@ import java.util.concurrent.atomic.AtomicBoolean; @RunWith(Parameterized.class) -public class CachingQueryRunnerTest +public class CachingQueryRunnerTest extends InitializedNullHandlingTest { @Parameterized.Parameters(name = "numBackgroundThreads={0}") public static Iterable constructorFeeder() @@ -222,8 +223,8 @@ public void testNullCacheKeyPrefix() Cache cache = EasyMock.mock(Cache.class); EasyMock.replay(cache); CachingQueryRunner queryRunner = makeCachingQueryRunner(null, cache, toolchest, Sequences.empty()); - Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query))); - Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query))); + Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query, null))); + Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query, null))); queryRunner.run(QueryPlus.wrap(query)); EasyMock.verifyUnexpectedCalls(cache); } @@ -243,7 +244,7 @@ public void testNullStrategy() QueryToolChest toolchest = EasyMock.mock(QueryToolChest.class); Cache cache = EasyMock.mock(Cache.class); - EasyMock.expect(toolchest.getCacheStrategy(query)).andReturn(null); + EasyMock.expect(toolchest.getCacheStrategy(EasyMock.eq(query), EasyMock.anyObject())).andReturn(null); EasyMock.replay(cache, toolchest); CachingQueryRunner queryRunner = makeCachingQueryRunner(new byte[0], cache, toolchest, Sequences.empty()); Assert.assertFalse(queryRunner.canPopulateCache(query, null)); @@ -339,7 +340,7 @@ public void doMonitor(ServiceEmitter emitter) resultSeq ); - CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query); + CacheStrategy cacheStrategy = 
toolchest.getCacheStrategy(query, null); Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey( CACHE_ID, SEGMENT_DESCRIPTOR, @@ -383,7 +384,7 @@ private void testUseCache( byte[] cacheKeyPrefix = RandomUtils.nextBytes(10); - CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query); + CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null); Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey( CACHE_ID, SEGMENT_DESCRIPTOR, @@ -399,7 +400,7 @@ private void testUseCache( toolchest, Sequences.empty() ); - Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query))); + Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query, null))); List results = runner.run(QueryPlus.wrap(query)).toList(); Assert.assertEquals(expectedResults.toString(), results.toString()); } From 1f3ff61ffc3595f89f93efcc65f8c24e7a6d2dfb Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 8 Jul 2024 10:13:17 +0530 Subject: [PATCH 12/17] review --- .../apache/druid/query/QueryToolChest.java | 10 +++- .../groupby/GroupByQueryQueryToolChest.java | 30 ++++++++--- .../epinephelinae/RowBasedGrouperHelper.java | 54 +++++++++++-------- .../druid/segment/column/TypeStrategies.java | 30 +++++++++++ .../druid/segment/column/TypeStrategy.java | 2 +- 5 files changed, 92 insertions(+), 34 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index aad83275914a..fa394beec43a 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -252,8 +252,14 @@ public Function makePostComputeManipulatorFn(QueryType q public abstract TypeReference getResultTypeReference(); /** - * + * Like {@link #getCacheStrategy(Query, ObjectMapper)} but the caller doesn't supply the object mapper for deserializing + * and converting the cached data to desired 
type. It's upto the individual implementations to decide the appropriate action in that case. + * It can either throw an exception outright or decide if the query requires the object mapper for proper downstream processing and + * work with the generic java types if not. + *

+ * @deprecated Use {@link #getCacheStrategy(Query, ObjectMapper)} instead */ + @Deprecated @Nullable public CacheStrategy getCacheStrategy(QueryType query) { @@ -266,7 +272,7 @@ public CacheStrategy getCacheStrategy(QueryType qu * This is optional. If it returns null, caching is effectively disabled for the query. * * @param query The query whose results might be cached - * @param mapper Object mapper to convert the deserialized generic java objectsto desired types. It can be nullable + * @param mapper Object mapper to convert the deserialized generic java objects to desired types. It can be nullable * to preserve backward compatibility. * @param The type of object that will be stored in the cache * @return A CacheStrategy that can be used to populate and read from the Cache diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index bb0b9ff43ca0..a14dfecbc369 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -78,6 +78,7 @@ import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.nested.StructuredData; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -718,15 +719,28 @@ public ResultRow apply(Object input) final Object dimensionObject = results.next(); final Object dimensionObjectCasted; - // Must convert generic Jackson-deserialized type into the proper type. 
- if (dimensionSpec.getOutputType().is(ValueType.COMPLEX)) { - DruidException.conditionalDefensive( - mapper != null, - "Cannot deserialize complex dimension from if object mapper is not provided" - ); - dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]); + final ColumnType outputType = dimensionSpec.getOutputType(); + + // Must convert generic Jackson-deserialized type into the proper type. The downstream functions expect the + // dimensions to be of appropriate types for further processing like merging and comparing. + if (outputType.is(ValueType.COMPLEX)) { + // Json columns can interpret generic data objects appropriately, hence they are wrapped as is in StructuredData. + // They don't need to converted them from Object.class to StructuredData.class using object mapper as that is an + // expensive operation that will be wasteful. + if (outputType.equals(ColumnType.NESTED_DATA)) { + dimensionObjectCasted = StructuredData.wrap(dimensionObject); + } else { + DruidException.conditionalDefensive( + mapper != null, + "Cannot deserialize complex dimension from if object mapper is not provided" + ); + dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]); + } } else { - dimensionObjectCasted = DimensionHandlerUtils.convertObjectToType(dimensionObject, dimensionSpec.getOutputType()); + dimensionObjectCasted = DimensionHandlerUtils.convertObjectToType( + dimensionObject, + dimensionSpec.getOutputType() + ); } resultRow.set(dimensionStart + dimPos, dimensionObjectCasted); dimPos++; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index a81c520e4f0b..741fc75ae0de 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1384,24 +1384,34 @@ public RowBasedKey deserialize( int dimsReadSoFar = 0; final Object[] objects = new Object[dimsToRead]; + if (includeTimestamp) { + DruidException.conditionalDefensive( + jp.currentToken() != JsonToken.END_ARRAY, + "Unexpected end of array when deserializing timestamp from the spilled files" + ); + objects[dimsReadSoFar] = codec.readValue(jp, Long.class); + + ++dimsReadSoFar; + jp.nextToken(); + } + while (jp.currentToken() != JsonToken.END_ARRAY) { - if (dimsReadSoFar >= dimsToRead) { - throw DruidException.defensive("More dimensions encountered than expected [%d]", dimsToRead); - } - if (includeTimestamp && dimsReadSoFar == 0) { - // Read the timestamp - objects[dimsReadSoFar] = codec.readValue(jp, Long.class); - } else { - DruidException.conditionalDefensive( - dimsReadSoFar - timestampAdjustment < serdeHelpers.length, - "Insufficient serde helpers present" - ); - // Read the dimension - serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz(); - objects[dimsReadSoFar] = - codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz()); - } + DruidException.conditionalDefensive( + dimsReadSoFar < dimsToRead, + "More dimensions encountered than expected [%d]", + dimsToRead + ); + + DruidException.conditionalDefensive( + dimsReadSoFar - timestampAdjustment < serdeHelpers.length, + "Insufficient serde helpers present" + ); + + // Read the dimension + serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz(); + objects[dimsReadSoFar] = + codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz()); ++dimsReadSoFar; jp.nextToken(); @@ -1667,11 +1677,7 @@ public GenericRowBasedKeySerdeHelper( dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)), dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition)) ); - if (columnType.is(ValueType.COMPLEX)) { - clazz = 
columnType.getNullableStrategy().getClazz(); - } else { - clazz = Object.class; - } + clazz = columnType.getNullableStrategy().getClazz(); } // Asserts that we don't entertain any complex types without a typename, to prevent intermixing dictionaries of @@ -1795,6 +1801,8 @@ public Object2IntMap getReverseDictionary() @Override public Class getClazz() { + // Jackson deserializes Object[] containing longs to Object[] containing string if Object[].class is returned + // Therefore we are using Object.class return Object.class; } } @@ -1845,7 +1853,7 @@ public Object2IntMap getReverseDictionary() @Override public Class getClazz() { - return Object.class; + return Object[].class; } } @@ -1900,7 +1908,7 @@ public BufferComparator getBufferComparator() @Override public Class getClazz() { - return Object.class; + return String.class; } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java index bae29179b4d5..7ac8def99ec1 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java +++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java @@ -299,6 +299,12 @@ public boolean equals(Long a, Long b) { return a.equals(b); } + + @Override + public Class getClazz() + { + return Long.class; + } } /** @@ -368,6 +374,12 @@ public boolean equals(Float a, Float b) { return a.equals(b); } + + @Override + public Class getClazz() + { + return Float.class; + } } /** @@ -438,6 +450,12 @@ public boolean equals(Double a, Double b) { return a.equals(b); } + + @Override + public Class getClazz() + { + return Double.class; + } } /** @@ -519,6 +537,12 @@ public boolean equals(String a, String b) { return a.equals(b); } + + @Override + public Class getClazz() + { + return String.class; + } } /** @@ -664,5 +688,11 @@ public boolean equals(@Nullable Object[] a, @Nullable Object[] b) return false; } } + + @Override + public Class 
getClazz() + { + return Object[].class; + } } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java index c5cff1a0b2f2..075fceca473f 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java @@ -225,6 +225,6 @@ default boolean equals(T a, T b) */ default Class getClazz() { - throw DruidException.defensive("Not implemented. It is only implemented for complex dimensions which are groupable()"); + throw DruidException.defensive("Not implemented. Check groupable() first"); } } From c2724b9064844a236946507bd087c76cf8423660 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 8 Jul 2024 10:52:11 +0530 Subject: [PATCH 13/17] maybe codeql --- .../query/groupby/epinephelinae/RowBasedGrouperHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 741fc75ae0de..d1e4ffc343b3 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1398,7 +1398,7 @@ public RowBasedKey deserialize( while (jp.currentToken() != JsonToken.END_ARRAY) { DruidException.conditionalDefensive( - dimsReadSoFar < dimsToRead, + dimsReadSoFar >= dimsToRead, "More dimensions encountered than expected [%d]", dimsToRead ); From e9eec6add17ba6bb81710cd6416fae23dc6301db Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 9 Jul 2024 00:46:44 +0530 Subject: [PATCH 14/17] Revert "maybe codeql" This reverts commit c2724b9064844a236946507bd087c76cf8423660. 
--- .../query/groupby/epinephelinae/RowBasedGrouperHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index d1e4ffc343b3..741fc75ae0de 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1398,7 +1398,7 @@ public RowBasedKey deserialize( while (jp.currentToken() != JsonToken.END_ARRAY) { DruidException.conditionalDefensive( - dimsReadSoFar >= dimsToRead, + dimsReadSoFar < dimsToRead, "More dimensions encountered than expected [%d]", dimsToRead ); From 6f868d104a58bb3f8c337ff0794c5b528383b4ed Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 12 Jul 2024 10:23:11 +0530 Subject: [PATCH 15/17] reviews --- .../druid/jackson/AggregatorsModule.java | 15 ++++++++++----- .../groupby/GroupByQueryQueryToolChest.java | 18 +++++++++++++----- .../epinephelinae/RowBasedGrouperHelper.java | 17 ++++++++--------- .../GroupByQueryQueryToolChestTest.java | 3 +-- 4 files changed, 32 insertions(+), 21 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java index f7aca511e17d..200e6fcb1394 100644 --- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java @@ -83,6 +83,16 @@ public AggregatorsModule() { super("AggregatorFactories"); + registerComplexMetricsAndSerde(); + + setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class); + setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class); + + addSerializer(DoubleMeanHolder.class, 
DoubleMeanHolder.Serializer.INSTANCE); + } + + public static void registerComplexMetricsAndSerde() + { ComplexMetrics.registerSerde(HyperUniquesSerde.TYPE_NAME, new HyperUniquesSerde()); ComplexMetrics.registerSerde(PreComputedHyperUniquesSerde.TYPE_NAME, new PreComputedHyperUniquesSerde()); ComplexMetrics.registerSerde( @@ -102,11 +112,6 @@ public AggregatorsModule() SerializablePairLongLongComplexMetricSerde.TYPE_NAME, new SerializablePairLongLongComplexMetricSerde() ); - - setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class); - setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class); - - addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE); } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index a14dfecbc369..088d1160fb54 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -589,12 +589,24 @@ public CacheStrategy getCacheStrategy( @Nullable final ObjectMapper mapper ) { + + for (DimensionSpec dimension : query.getDimensions()) { + if (dimension.getOutputType().is(ValueType.COMPLEX) && !dimension.getOutputType().equals(ColumnType.NESTED_DATA)) { + if (mapper == null) { + throw DruidException.defensive( + "Cannot deserialize complex dimension of type[%s] from result cache if object mapper is not provided", + dimension.getOutputType().getComplexTypeName() + ); + } + } + } + final Class[] dimensionClasses = createDimensionClasses(query); + return new CacheStrategy() { private static final byte CACHE_STRATEGY_VERSION = 0x1; private final List aggs = query.getAggregatorSpecs(); private final List dims = query.getDimensions(); - private final Class[] dimensionClasses = 
createDimensionClasses(query); @Override public boolean isCacheable(GroupByQuery query, boolean willMergeRunners, boolean bySegment) @@ -730,10 +742,6 @@ public ResultRow apply(Object input) if (outputType.equals(ColumnType.NESTED_DATA)) { dimensionObjectCasted = StructuredData.wrap(dimensionObject); } else { - DruidException.conditionalDefensive( - mapper != null, - "Cannot deserialize complex dimension from if object mapper is not provided" - ); dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]); } } else { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 741fc75ae0de..163977fa3dd8 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1397,16 +1397,15 @@ public RowBasedKey deserialize( while (jp.currentToken() != JsonToken.END_ARRAY) { - DruidException.conditionalDefensive( - dimsReadSoFar < dimsToRead, - "More dimensions encountered than expected [%d]", - dimsToRead - ); + // TODO: Remove this + if (dimsReadSoFar >= dimsToRead) { + throw DruidException.defensive("For CodeQL"); + } - DruidException.conditionalDefensive( - dimsReadSoFar - timestampAdjustment < serdeHelpers.length, - "Insufficient serde helpers present" - ); + // TODO: Remove this + if (dimsReadSoFar - timestampAdjustment >= serdeHelpers.length) { + throw DruidException.defensive("For CodeQL"); + } // Read the dimension serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz(); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 0c196d95716e..7279ca938bd8 100644 --- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -98,8 +98,7 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest public static void setUpClass() { NullHandling.initializeForTests(); - //noinspection ResultOfObjectAllocationIgnored - new AggregatorsModule(); + AggregatorsModule.registerComplexMetricsAndSerde(); } @Test From abd8274f573e17836ac470f5b0a8992e6bfc7d35 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 15 Jul 2024 09:34:52 +0530 Subject: [PATCH 16/17] codeql --- .../groupby/epinephelinae/RowBasedGrouperHelper.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 163977fa3dd8..e340052901af 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1397,20 +1397,16 @@ public RowBasedKey deserialize( while (jp.currentToken() != JsonToken.END_ARRAY) { + int serdeHelpersIndex = dimsReadSoFar - timestampAdjustment; // TODO: Remove this - if (dimsReadSoFar >= dimsToRead) { - throw DruidException.defensive("For CodeQL"); - } - - // TODO: Remove this - if (dimsReadSoFar - timestampAdjustment >= serdeHelpers.length) { + if (serdeHelpersIndex >= serdeHelpers.length) { throw DruidException.defensive("For CodeQL"); } // Read the dimension - serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz(); + serdeHelpers[serdeHelpersIndex].getClazz(); objects[dimsReadSoFar] = - codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz()); + codec.readValue(jp, 
serdeHelpers[serdeHelpersIndex].getClazz()); ++dimsReadSoFar; jp.nextToken(); From c0594cf817bc3f20144733cccaf9d29cdaf9c6a1 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 15 Jul 2024 11:19:55 +0530 Subject: [PATCH 17/17] faster loop --- .../groupby/epinephelinae/RowBasedGrouperHelper.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index e340052901af..da8a0e046230 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -1396,17 +1396,8 @@ public RowBasedKey deserialize( } while (jp.currentToken() != JsonToken.END_ARRAY) { - - int serdeHelpersIndex = dimsReadSoFar - timestampAdjustment; - // TODO: Remove this - if (serdeHelpersIndex >= serdeHelpers.length) { - throw DruidException.defensive("For CodeQL"); - } - - // Read the dimension - serdeHelpers[serdeHelpersIndex].getClazz(); objects[dimsReadSoFar] = - codec.readValue(jp, serdeHelpers[serdeHelpersIndex].getClazz()); + codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz()); ++dimsReadSoFar; jp.nextToken();