From bd7da78b8561e9551cc817d43e3b7bf782f2446d Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 29 May 2024 11:15:27 +0530 Subject: [PATCH 01/10] init --- .../groupby/GroupByQueryQueryToolChest.java | 63 +++++++++++++++++-- .../epinephelinae/RowBasedGrouperHelper.java | 18 ++++-- .../segment/column/NullableTypeStrategy.java | 6 ++ .../ObjectStrategyComplexTypeStrategy.java | 15 ++++- .../druid/segment/column/TypeStrategy.java | 5 ++ .../nested/NestedDataComplexTypeSerde.java | 3 +- .../aggregation/AggregationTestHelper.java | 3 + 7 files changed, 98 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 81ec050ce088..1370b6c75650 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; @@ -73,7 +75,10 @@ import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; import org.joda.time.DateTime; import java.io.Closeable; @@ -451,11 +456,11 @@ public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final { final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); - if (resultAsArray && !queryConfig.isIntermediateResultAsMapCompat()) { - // We can assume ResultRow are serialized and deserialized as arrays. No need for special decoration, - // and we can save the overhead of making a copy of the ObjectMapper. - return objectMapper; - } +// if (resultAsArray && !queryConfig.isIntermediateResultAsMapCompat()) { +// // We can assume ResultRow are serialized and deserialized as arrays. No need for special decoration, +// // and we can save the overhead of making a copy of the ObjectMapper. +// return objectMapper; +// } // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting. final JsonSerializer serializer = new JsonSerializer() @@ -478,6 +483,8 @@ public void serialize( // Deserializer that can deserialize either array- or map-based rows. final JsonDeserializer deserializer = new JsonDeserializer() { + final Class[] dimensionClasses = createDimensionClasses(); + @Override public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException { @@ -485,9 +492,53 @@ public ResultRow deserialize(final JsonParser jp, final DeserializationContext c final Row row = jp.readValueAs(Row.class); return ResultRow.fromLegacyRow(row, query); } else { - return ResultRow.of(jp.readValueAs(Object[].class)); + Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; + + if (!jp.isExpectedStartArrayToken()) { + throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); + } + + ObjectCodec codec = jp.getCodec(); + + jp.nextToken(); + + int numObjects = 0; + while (jp.currentToken() != JsonToken.END_ARRAY) { + if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { + objectArray[numObjects] = codec.readValue(jp, dimensionClasses[numObjects - query.getResultRowDimensionStart()]); + } else { + objectArray[numObjects] = codec.readValue(jp, Object.class); + } + jp.nextToken(); + ++numObjects; + } + return ResultRow.of(objectArray); } } + + private Class[] createDimensionClasses() + { + final List queryDimensions = query.getDimensions(); + final Class[] classes = new Class[queryDimensions.size()]; + for (int i = 0; i < queryDimensions.size(); ++i) { + final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType(); + if (dimensionOutputType.is(ValueType.COMPLEX)) { + NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy(); + if (!nullableTypeStrategy.groupable()) { + throw DruidException.defensive( + "Ungroupable dimension [%s] with type [%s] found in the query.", + queryDimensions.get(i).getDimension(), + dimensionOutputType + ); + } + classes[i] = dimensionOutputType.getNullableStrategy().complexDimensionType(); + } else { + classes[i] = Object.class; + } + } + return classes; + } + }; class GroupByResultRowModule extends SimpleModule diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 491c28d41427..0ec1a3ce6e9e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -309,7 +309,8 @@ public static Pair, Accumulator combining, includeTimestamp, columnSelectorFactory, - valueTypes + valueTypes, + spillMapper ); final Predicate rowPredicate; @@ -509,14 +510,15 @@ private static ValueExtractFunction makeValueExtractFunction( final boolean combining, final boolean includeTimestamp, final ColumnSelectorFactory columnSelectorFactory, - final List valueTypes + final List valueTypes, + final ObjectMapper objectMapper ) { final TimestampExtractFunction timestampExtractFn = includeTimestamp ? makeTimestampExtractFunction(query, combining) : null; - final Function[] valueConvertFns = makeValueConvertFunctions(valueTypes); + final Function[] valueConvertFns = makeValueConvertFunctions(valueTypes, objectMapper); if (!combining) { final Supplier[] inputRawSuppliers = getValueSuppliersForDimensions( @@ -802,7 +804,8 @@ private static Supplier[] getValueSuppliersForDimensions( @SuppressWarnings("unchecked") private static Function[] makeValueConvertFunctions( - final List valueTypes + final List valueTypes, + final ObjectMapper objectMapper ) { final Function[] functions = new Function[valueTypes.size()]; @@ -810,7 +813,12 @@ private static Function[] makeValueConvertFunctions( // Subquery post-aggs aren't added to the rowSignature (see rowSignatureFor() in GroupByQueryHelper) because // their types aren't known, so default to String handling. final ColumnType type = valueTypes.get(i) == null ? ColumnType.STRING : valueTypes.get(i); - functions[i] = input -> DimensionHandlerUtils.convertObjectToType(input, type); + if (type.is(ValueType.COMPLEX)) { + Class dimensionClass = type.getNullableStrategy().complexDimensionType(); + functions[i] = input -> objectMapper.convertValue(input, dimensionClass); + } else { + functions[i] = input -> DimensionHandlerUtils.convertObjectToType(input, type); + } } return functions; } diff --git a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java index 044a092a3020..442ba14ded4e 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java @@ -21,6 +21,7 @@ import it.unimi.dsi.fastutil.Hash; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; @@ -153,4 +154,9 @@ public boolean equals(@Nullable T a, @Nullable T b) } return b != null && delegate.equals(a, b); } + + public Class complexDimensionType() + { + return delegate.complexDimensionType(); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java index 267477e52319..66ef1196b929 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java @@ -39,22 +39,25 @@ public class ObjectStrategyComplexTypeStrategy implements TypeStrategy private final TypeSignature typeSignature; @Nullable private final Hash.Strategy hashStrategy; + @Nullable + private final Class complexDimensionType; public ObjectStrategyComplexTypeStrategy(ObjectStrategy objectStrategy, TypeSignature signature) { - this(objectStrategy, signature, null); + this(objectStrategy, signature, null, null); } public ObjectStrategyComplexTypeStrategy( ObjectStrategy objectStrategy, TypeSignature signature, - @Nullable final Hash.Strategy hashStrategy + @Nullable final Hash.Strategy hashStrategy, + @Nullable final Class complexDimensionType ) { this.objectStrategy = objectStrategy; this.typeSignature = signature; this.hashStrategy = hashStrategy; - + this.complexDimensionType = complexDimensionType; } @Override @@ -133,4 +136,10 @@ public boolean equals(T a, T b) } return hashStrategy.equals(a, b); } + + @Override + public Class complexDimensionType() + { + return complexDimensionType; + } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java index 81aa03778362..5885d2c1a006 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java @@ -216,4 +216,9 @@ default boolean equals(T a, T b) { throw DruidException.defensive("Not implemented. Check groupable() first"); } + + default Class complexDimensionType() + { + throw DruidException.defensive("Not implemented. It is only implemented for complex dimensions which are groupable()"); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java index 4d1bb347f218..d2e11e7087a1 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java @@ -179,7 +179,8 @@ public boolean equals(Object a, Object b) { return StructuredData.wrap(a).compareTo(StructuredData.wrap(b)) == 0; } - } + }, + Object.class ); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 686e01e86173..526a62c813fb 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -159,6 +159,9 @@ public static AggregationTestHelper createGroupByQueryAggregationTestHelper( final Closer closer = Closer.create(); final ObjectMapper mapper = TestHelper.makeJsonMapper(); final TestGroupByBuffers groupByBuffers = closer.register(TestGroupByBuffers.createDefault()); + for (Module mod : jsonModulesToRegister) { + mapper.registerModule(mod); + } final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory( mapper, config, From fa76d353db0ab3e70644114db67a3513684455a8 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 31 May 2024 11:40:01 +0530 Subject: [PATCH 02/10] tests, pair groupable --- ...zablePairLongDoubleComplexMetricSerde.java | 28 ++++ ...izablePairLongFloatComplexMetricSerde.java | 28 ++++ ...lizablePairLongLongComplexMetricSerde.java | 28 ++++ ...zablePairLongStringComplexMetricSerde.java | 30 +++- .../groupby/GroupByQueryQueryToolChest.java | 6 - .../segment/column/NullableTypeStrategy.java | 1 - .../druid/sql/calcite/rel/DruidQuery.java | 1 + .../druid/sql/calcite/CalciteQueryTest.java | 132 ++++++++++++++++++ .../sql/calcite/util/TestDataBuilder.java | 7 +- 9 files changed, 251 insertions(+), 10 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java index 7f8befd09275..365533a3ed48 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java @@ -19,9 +19,13 @@ package org.apache.druid.query.aggregation; +import it.unimi.dsi.fastutil.Hash; import org.apache.druid.collections.SerializablePair; import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy; +import org.apache.druid.segment.column.TypeStrategy; import org.apache.druid.segment.data.ObjectStrategy; import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; @@ -106,4 +110,28 @@ public byte[] toBytes(@Nullable SerializablePairLongDouble inPair) } }; } + + @Override + public TypeStrategy getTypeStrategy() + { + return new ObjectStrategyComplexTypeStrategy<>( + getObjectStrategy(), + ColumnType.ofComplex(getTypeName()), + new Hash.Strategy() + { + @Override + public int hashCode(SerializablePairLongDouble o) + { + return o.hashCode(); + } + + @Override + public boolean equals(SerializablePairLongDouble a, SerializablePairLongDouble b) + { + return a.equals(b); + } + }, + SerializablePairLongDouble.class + ); + } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java index d342608a9e15..c055c0bc8e2a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java @@ -19,9 +19,13 @@ package org.apache.druid.query.aggregation; +import it.unimi.dsi.fastutil.Hash; import org.apache.druid.collections.SerializablePair; import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy; +import org.apache.druid.segment.column.TypeStrategy; import org.apache.druid.segment.data.ObjectStrategy; import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; @@ -107,4 +111,28 @@ public byte[] toBytes(@Nullable SerializablePairLongFloat inPair) } }; } + + @Override + public TypeStrategy getTypeStrategy() + { + return new ObjectStrategyComplexTypeStrategy<>( + getObjectStrategy(), + ColumnType.ofComplex(getTypeName()), + new Hash.Strategy() + { + @Override + public int hashCode(SerializablePairLongFloat o) + { + return o.hashCode(); + } + + @Override + public boolean equals(SerializablePairLongFloat a, SerializablePairLongFloat b) + { + return a.equals(b); + } + }, + SerializablePairLongFloat.class + ); + } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java index 7541dfdb2a15..a646b4c09cf3 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java @@ -19,9 +19,13 @@ package org.apache.druid.query.aggregation; +import it.unimi.dsi.fastutil.Hash; import org.apache.druid.collections.SerializablePair; import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy; +import org.apache.druid.segment.column.TypeStrategy; import org.apache.druid.segment.data.ObjectStrategy; import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; @@ -106,4 +110,28 @@ public byte[] toBytes(@Nullable SerializablePairLongLong inPair) } }; } + + @Override + public TypeStrategy getTypeStrategy() + { + return new ObjectStrategyComplexTypeStrategy<>( + getObjectStrategy(), + ColumnType.ofComplex(getTypeName()), + new Hash.Strategy() + { + @Override + public int hashCode(SerializablePairLongLong o) + { + return o.hashCode(); + } + + @Override + public boolean equals(SerializablePairLongLong a, SerializablePairLongLong b) + { + return a.equals(b); + } + }, + SerializablePairLongLong.class + ); + } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java index 2cc60843f9e7..d9e5dac7ec46 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java @@ -19,11 +19,15 @@ package org.apache.druid.query.aggregation; +import it.unimi.dsi.fastutil.Hash; import org.apache.druid.collections.SerializablePair; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy; +import org.apache.druid.segment.column.TypeStrategy; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.ObjectStrategy; import org.apache.druid.segment.serde.ComplexColumnPartSupplier; @@ -130,7 +134,7 @@ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) } @Override - public ObjectStrategy getObjectStrategy() + public ObjectStrategy getObjectStrategy() { return new ObjectStrategy() { @@ -165,6 +169,30 @@ public byte[] toBytes(SerializablePairLongString val) }; } + @Override + public TypeStrategy getTypeStrategy() + { + return new ObjectStrategyComplexTypeStrategy<>( + getObjectStrategy(), + ColumnType.ofComplex(getTypeName()), + new Hash.Strategy() + { + @Override + public int hashCode(SerializablePairLongString o) + { + return o.hashCode(); + } + + @Override + public boolean equals(SerializablePairLongString a, SerializablePairLongString b) + { + return a.equals(b); + } + }, + SerializablePairLongString.class + ); + } + @Override public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 1370b6c75650..c7a4f99bc309 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -456,12 +456,6 @@ public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final { final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); -// if (resultAsArray && !queryConfig.isIntermediateResultAsMapCompat()) { -// // We can assume ResultRow are serialized and deserialized as arrays. No need for special decoration, -// // and we can save the overhead of making a copy of the ObjectMapper. -// return objectMapper; -// } - // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting. final JsonSerializer serializer = new JsonSerializer() { diff --git a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java index 442ba14ded4e..9303a3286630 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java @@ -21,7 +21,6 @@ import it.unimi.dsi.fastutil.Hash; import org.apache.druid.common.config.NullHandling; -import org.apache.druid.error.DruidException; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 691e95186598..21f9eff5013f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -90,6 +90,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.DimensionExpression; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 5d5aab111358..d9652f1a49a7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -62,6 +62,10 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde; import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory; import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory; import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory; @@ -751,6 +755,134 @@ public void testEarliestAggregators() ); } + @Test + public void testGroupingOnStringSerializablePairLongString() + { + cannotVectorize(); + testQuery( + "SELECT COUNT(*) FROM (SELECT string_first_added FROM druid.wikipedia_first_last GROUP BY 1)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(new QueryDataSource( + GroupByQuery.builder() + .setDataSource("wikipedia_first_last") + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec( + "string_first_added", + "d0", + ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME) + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(ImmutableList.of()) + .setContext(QUERY_CONTEXT_DEFAULT) + .setAggregatorSpecs(new CountAggregatorFactory("a0")) + .build() + ), + ImmutableList.of(new Object[]{39244L}) + ); + } + + @Test + public void testGroupingOnStringSerializablePairLongLong() + { + cannotVectorize(); + testQuery( + "SELECT COUNT(*) FROM (SELECT long_first_added FROM druid.wikipedia_first_last GROUP BY 1)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(new QueryDataSource( + GroupByQuery.builder() + .setDataSource("wikipedia_first_last") + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec( + "long_first_added", + "d0", + ColumnType.ofComplex(SerializablePairLongLongComplexMetricSerde.TYPE_NAME) + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(ImmutableList.of()) + .setContext(QUERY_CONTEXT_DEFAULT) + .setAggregatorSpecs(new CountAggregatorFactory("a0")) + .build() + ), + ImmutableList.of(new Object[]{2315L}) + ); + } + + @Test + public void testGroupingOnStringSerializablePairLongDouble() + { + cannotVectorize(); + testQuery( + "SELECT COUNT(*) FROM (SELECT double_first_added FROM druid.wikipedia_first_last GROUP BY 1)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(new QueryDataSource( + GroupByQuery.builder() + .setDataSource("wikipedia_first_last") + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec( + "double_first_added", + "d0", + ColumnType.ofComplex(SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME) + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(ImmutableList.of()) + .setContext(QUERY_CONTEXT_DEFAULT) + .setAggregatorSpecs(new CountAggregatorFactory("a0")) + .build() + ), + ImmutableList.of(new Object[]{2315L}) + ); + } + + @Test + public void testGroupingOnStringSerializablePairLongFloat() + { + cannotVectorize(); + testQuery( + "SELECT COUNT(*) FROM (SELECT float_first_added FROM druid.wikipedia_first_last GROUP BY 1)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(new QueryDataSource( + GroupByQuery.builder() + .setDataSource("wikipedia_first_last") + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec( + "float_first_added", + "d0", + ColumnType.ofComplex(SerializablePairLongFloatComplexMetricSerde.TYPE_NAME) + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(ImmutableList.of()) + .setContext(QUERY_CONTEXT_DEFAULT) + .setAggregatorSpecs(new CountAggregatorFactory("a0")) + .build() + ), + ImmutableList.of(new Object[]{2315L}) + ); + } + @Test public void testLatestToLatestByConversion() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java index c914532a3b90..f732771991c4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java @@ -51,9 +51,11 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory; +import org.apache.druid.query.aggregation.firstlast.first.StringFirstAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.last.DoubleLastAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.last.FloatLastAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.last.LongLastAggregatorFactory; +import org.apache.druid.query.aggregation.firstlast.last.StringLastAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; @@ -655,8 +657,9 @@ public static QueryableIndex makeWikipediaIndexWithAggregation(File tmpDir) new FloatLastAggregatorFactory("float_last_added", "added", "__time"), new FloatLastAggregatorFactory("float_first_added", "added", "__time"), new DoubleLastAggregatorFactory("double_last_added", "added", "__time"), - new DoubleFirstAggregatorFactory("double_first_added", "added", "__time") - + new DoubleFirstAggregatorFactory("double_first_added", "added", "__time"), + new StringFirstAggregatorFactory("string_first_added", "comment", "__time", 1000), + new StringLastAggregatorFactory("string_last_added", "comment", "__time", 1000) ) .build() ) From 1ae4a949d43e6291452102c3ad216fcd5ecf48c3 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 3 Jun 2024 13:24:58 +0530 Subject: [PATCH 03/10] framework change --- .../druid/query/groupby/GroupingEngine.java | 4 ++ .../GroupByMergingQueryRunner.java | 4 ++ .../epinephelinae/GroupByRowProcessor.java | 2 + .../epinephelinae/RowBasedGrouperHelper.java | 5 ++- .../nested/NestedDataComplexTypeSerde.java | 2 +- .../query/groupby/GroupByQueryRunnerTest.java | 2 +- .../apache/druid/server/QueryStackTests.java | 45 ++++++++++++++++++- .../druid/sql/calcite/rel/DruidQuery.java | 1 - .../druid/quidem/DruidAvaticaTestDriver.java | 4 +- .../sql/calcite/util/SqlTestFramework.java | 14 +++--- 10 files changed, 70 insertions(+), 13 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 6451fb9b943d..6e2748795d5e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -448,6 +448,7 @@ public QueryRunner mergeRunners( groupByResourcesReservationPool, processingConfig.getNumThreads(), processingConfig.intermediateComputeSizeBytes(), + jsonMapper, spillMapper, processingConfig.getTmpDir() ); @@ -600,6 +601,7 @@ public Sequence processSubqueryResult( configSupplier.get(), processingConfig, resource, + jsonMapper, spillMapper, processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes() @@ -681,6 +683,7 @@ public Sequence processSubtotalsSpec( configSupplier.get(), processingConfig, resource, + jsonMapper, spillMapper, processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes() @@ -744,6 +747,7 @@ public Sequence processSubtotalsSpec( configSupplier.get(), processingConfig, resource, + jsonMapper, spillMapper, processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes() diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java index 6e2064d5f1f1..2e00f15aae88 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java @@ -100,6 +100,7 @@ public class GroupByMergingQueryRunner implements QueryRunner private final QueryProcessingPool queryProcessingPool; private final QueryWatcher queryWatcher; private final int concurrencyHint; + private final ObjectMapper jsonMapper; private final ObjectMapper spillMapper; private final String processingTmpDir; private final int mergeBufferSize; @@ -113,6 +114,7 @@ public GroupByMergingQueryRunner( GroupByResourcesReservationPool groupByResourcesReservationPool, int concurrencyHint, int mergeBufferSize, + ObjectMapper jsonMapper, ObjectMapper spillMapper, String processingTmpDir ) @@ -124,6 +126,7 @@ public GroupByMergingQueryRunner( this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.groupByResourcesReservationPool = groupByResourcesReservationPool; this.concurrencyHint = concurrencyHint; + this.jsonMapper = jsonMapper; this.spillMapper = spillMapper; this.processingTmpDir = processingTmpDir; this.mergeBufferSize = mergeBufferSize; @@ -210,6 +213,7 @@ public CloseableGrouperIterator make() combineBufferHolder, concurrencyHint, temporaryStorage, + jsonMapper, spillMapper, queryProcessingPool, // Passed as executor service priority, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index c040c5b64658..d1b7e9c21864 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -91,6 +91,7 @@ public static ResultSupplier process( final GroupByQueryConfig config, final DruidProcessingConfig processingConfig, final GroupByQueryResources resource, + final ObjectMapper jsonMapper, final ObjectMapper spillMapper, final String processingTmpDir, final int mergeBufferSize @@ -127,6 +128,7 @@ public ByteBuffer get() } }, temporaryStorage, + jsonMapper, spillMapper, mergeBufferSize ); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 0ec1a3ce6e9e..a3089064623c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -124,6 +124,7 @@ public static Pair, Accumulator final DruidProcessingConfig processingConfig, final Supplier bufferSupplier, final LimitedTemporaryStorage temporaryStorage, + final ObjectMapper jsonMapper, final ObjectMapper spillMapper, final int mergeBufferSize ) @@ -137,6 +138,7 @@ public static Pair, Accumulator null, SINGLE_THREAD_CONCURRENCY_HINT, temporaryStorage, + jsonMapper, spillMapper, null, UNKNOWN_THREAD_PRIORITY, @@ -186,6 +188,7 @@ public static Pair, Accumulator @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final int concurrencyHint, final LimitedTemporaryStorage temporaryStorage, + final ObjectMapper jsonMapper, final ObjectMapper spillMapper, @Nullable final ListeningExecutorService grouperSorter, final int priority, @@ -310,7 +313,7 @@ public static Pair, Accumulator includeTimestamp, columnSelectorFactory, valueTypes, - spillMapper + jsonMapper ); final Predicate rowPredicate; diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java index d2e11e7087a1..fd42e71b09fb 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java @@ -180,7 +180,7 @@ public boolean equals(Object a, Object b) return StructuredData.wrap(a).compareTo(StructuredData.wrap(b)) == 0; } }, - Object.class + StructuredData.class ); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 080d5d5d4ba3..0aa3940d353d 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -356,8 +356,8 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( configSupplier, bufferPools.getProcessingPool(), groupByResourcesReservationPool, - TestHelper.makeJsonMapper(), mapper, + TestHelper.makeSmileMapper(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 494ef763a783..7d3f58ef6000 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -79,6 +79,7 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentWrangler; +import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.join.FrameBasedInlineJoinableFactory; import org.apache.druid.segment.join.InlineJoinableFactory; import org.apache.druid.segment.join.JoinableFactory; @@ -242,13 +243,23 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat final Closer closer, final Supplier minTopNThresholdSupplier ) + { + return createQueryRunnerFactoryConglomerate(closer, minTopNThresholdSupplier, TestHelper.makeJsonMapper()); + } + + public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( + final Closer closer, + final Supplier minTopNThresholdSupplier, + final ObjectMapper jsonMapper + ) { return createQueryRunnerFactoryConglomerate( closer, getProcessingConfig( DEFAULT_NUM_MERGE_BUFFERS ), - minTopNThresholdSupplier + minTopNThresholdSupplier, + jsonMapper ); } @@ -264,11 +275,41 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat ); } + public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( + final Closer closer, + final DruidProcessingConfig processingConfig, + final ObjectMapper jsonMapper + ) + { + return createQueryRunnerFactoryConglomerate( + closer, + processingConfig, + () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD, + jsonMapper + ); + } + public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( final Closer closer, final DruidProcessingConfig processingConfig, final Supplier minTopNThresholdSupplier ) + { + return createQueryRunnerFactoryConglomerate( + closer, + processingConfig, + minTopNThresholdSupplier, + TestHelper.makeJsonMapper() + ); + } + + + public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( + final Closer closer, + final DruidProcessingConfig processingConfig, + final Supplier minTopNThresholdSupplier, + final ObjectMapper jsonMapper + ) { final TestBufferPool testBufferPool = TestBufferPool.offHeap(COMPUTE_BUFFER_SIZE, Integer.MAX_VALUE); closer.register(() -> { @@ -281,7 +322,7 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat final GroupByQueryRunnerFactory groupByQueryRunnerFactory = GroupByQueryRunnerTest.makeQueryRunnerFactory( - GroupByQueryRunnerTest.DEFAULT_MAPPER, + jsonMapper, new GroupByQueryConfig() { }, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 21f9eff5013f..691e95186598 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -90,7 +90,6 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.DimensionExpression; diff --git a/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java b/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java index 74888c89607c..123019dad04f 100644 --- a/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java +++ b/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java @@ -280,9 +280,9 @@ public void configureGuice(DruidInjectorBuilder builder) } @Override - public QueryRunnerFactoryConglomerate createCongolmerate(Builder builder, Closer closer) + public QueryRunnerFactoryConglomerate createCongolmerate(Builder builder, Closer closer, ObjectMapper jsonMapper) { - return delegate.createCongolmerate(builder, closer); + return delegate.createCongolmerate(builder, closer, jsonMapper); } @Override diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java index f0d6431615db..54deae224852 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java @@ -145,7 +145,8 @@ public interface QueryComponentSupplier extends Closeable QueryRunnerFactoryConglomerate createCongolmerate( Builder builder, - Closer closer + Closer closer, + ObjectMapper jsonMapper ); SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( @@ -238,18 +239,21 @@ public void configureGuice(DruidInjectorBuilder builder) @Override public QueryRunnerFactoryConglomerate createCongolmerate( Builder builder, - Closer resourceCloser + Closer resourceCloser, + ObjectMapper jsonMapper ) { if (builder.mergeBufferCount == 0) { return QueryStackTests.createQueryRunnerFactoryConglomerate( resourceCloser, - () -> builder.minTopNThreshold + () -> builder.minTopNThreshold, + jsonMapper ); } else { return QueryStackTests.createQueryRunnerFactoryConglomerate( resourceCloser, - QueryStackTests.getProcessingConfig(builder.mergeBufferCount) + QueryStackTests.getProcessingConfig(builder.mergeBufferCount), + jsonMapper ); } } @@ -542,7 +546,7 @@ public void configure(Binder binder) @LazySingleton public QueryRunnerFactoryConglomerate conglomerate() { - return componentSupplier.createCongolmerate(builder, resourceCloser); + return componentSupplier.createCongolmerate(builder, resourceCloser, queryJsonMapper()); } @Provides From 7aa5e7a39ad898b99b9cc1afd29a1ba15177c0fc Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 3 Jun 2024 22:44:13 +0530 Subject: [PATCH 04/10] tests --- .../druid/query/groupby/GroupByQueryConfig.java | 1 + .../query/groupby/GroupByQueryQueryToolChest.java | 5 ----- .../groupby/GroupByQueryQueryToolChestTest.java | 14 +------------- .../query/groupby/GroupByQueryRunnerTest.java | 3 +-- 4 files changed, 3 insertions(+), 20 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 40dc9715885f..2fb4dfdd4d2a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -282,6 +282,7 @@ public boolean isVectorize() return vectorize; } + @SuppressWarnings("unused") public boolean isIntermediateResultAsMapCompat() { return intermediateResultAsMapCompat; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index c7a4f99bc309..05ffa9eba750 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -33,7 +33,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; -import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.inject.Inject; import org.apache.druid.data.input.Row; @@ -108,7 +107,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest queryConfigSupplier, GroupByQueryMetricsFactory queryMetricsFactory, @Merging GroupByResourcesReservationPool groupByResourcesReservationPool ) { this.groupingEngine = groupingEngine; - this.queryConfig = queryConfigSupplier.get(); this.queryMetricsFactory = queryMetricsFactory; this.groupByResourcesReservationPool = groupByResourcesReservationPool; } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index d2b599049944..678b32b2fcda 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -658,19 +658,7 @@ public void testResultSerdeIntermediateResultAsMapCompat() throws Exception .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); - final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( - null, - () -> new GroupByQueryConfig() - { - @Override - public boolean isIntermediateResultAsMapCompat() - { - return true; - } - }, - null, - null - ); + final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(null, null, null); final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 0aa3940d353d..3613246fef65 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -357,12 +357,11 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( bufferPools.getProcessingPool(), groupByResourcesReservationPool, mapper, - TestHelper.makeSmileMapper(), + mapper, QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( groupingEngine, - () -> config, DefaultGroupByQueryMetricsFactory.instance(), groupByResourcesReservationPool ); From c7d25fc2dbc1dbd5d81904944a7f9fd8b6861647 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 10 Jun 2024 21:46:09 +0530 Subject: [PATCH 05/10] update benchmarks --- .../druid/benchmark/query/GroupByBenchmark.java | 12 +++++++----- .../benchmark/query/SerializingQueryRunner.java | 17 +++++++++-------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java index 217946f4ecd9..346b8e4a7f7b 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java @@ -92,6 +92,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.ResourceIdPopulatingQueryRunner; import org.apache.druid.timeline.SegmentId; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -433,7 +434,8 @@ public void setup() String queryName = schemaQuery[1]; schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName); - query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName); + query = (GroupByQuery) ResourceIdPopulatingQueryRunner.populateResourceId(SCHEMA_QUERY_MAP.get(schemaName) + .get(queryName)); generator = new DataGenerator( schemaInfo.getColumnSchemas(), @@ -762,12 +764,12 @@ public void queryMultiQueryableIndexWithSerde(Blackhole blackhole, QueryableInde //noinspection unchecked QueryRunner theRunner = new FinalizeResultsQueryRunner<>( toolChest.mergeResults( - new SerializingQueryRunner<>( - new DefaultObjectMapper(new SmileFactory(), null), + new SerializingQueryRunner( + toolChest.decorateObjectMapper(new DefaultObjectMapper(new SmileFactory(), null), query), ResultRow.class, - toolChest.mergeResults( + (queryPlus, responseContext) -> toolChest.mergeResults( factory.mergeRunners(state.executorService, makeMultiRunners(state)) - ) + ).run(QueryPlus.wrap(ResourceIdPopulatingQueryRunner.populateResourceId(query))) ) ), (QueryToolChest) toolChest diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java index cfdd30af391a..334621ba6007 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SerializingQueryRunner.java @@ -28,21 +28,22 @@ import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.groupby.ResultRow; -public class SerializingQueryRunner implements QueryRunner +public class SerializingQueryRunner implements QueryRunner { static { NullHandling.initializeForTests(); } private final ObjectMapper smileMapper; - private final QueryRunner baseRunner; - private final Class clazz; + private final QueryRunner baseRunner; + private final Class clazz; public SerializingQueryRunner( ObjectMapper smileMapper, - Class clazz, - QueryRunner baseRunner + Class clazz, + QueryRunner baseRunner ) { this.smileMapper = smileMapper; @@ -51,8 +52,8 @@ public SerializingQueryRunner( } @Override - public Sequence run( - final QueryPlus queryPlus, + public Sequence run( + final QueryPlus queryPlus, final ResponseContext responseContext ) { @@ -60,7 +61,7 @@ public Sequence run( baseRunner.run(queryPlus, responseContext), input -> { try { - return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input), clazz); + return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input.getArray()), clazz); } catch (JsonProcessingException e) { throw new RuntimeException(e); From 66c177d3780d31a65b201f7db4c9a416ef434484 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 12 Jun 2024 11:42:36 +0530 Subject: [PATCH 06/10] comments --- ...lizablePairLongDoubleComplexMetricSerde.java | 3 +-- ...alizablePairLongFloatComplexMetricSerde.java | 3 +-- ...ializablePairLongLongComplexMetricSerde.java | 3 +-- ...lizablePairLongStringComplexMetricSerde.java | 3 +-- .../segment/column/NullableTypeStrategy.java | 2 +- .../ObjectStrategyComplexTypeStrategy.java | 17 ++++++++++------- .../druid/segment/column/TypeStrategy.java | 8 +++++++- .../nested/NestedDataComplexTypeSerde.java | 3 +-- 8 files changed, 23 insertions(+), 19 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java index 365533a3ed48..1d5e81d3ac65 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleComplexMetricSerde.java @@ -130,8 +130,7 @@ public boolean equals(SerializablePairLongDouble a, SerializablePairLongDouble b { return a.equals(b); } - }, - SerializablePairLongDouble.class + } ); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java index c055c0bc8e2a..1f4f624fef4e 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatComplexMetricSerde.java @@ -131,8 +131,7 @@ public boolean equals(SerializablePairLongFloat a, SerializablePairLongFloat b) { return a.equals(b); } - }, - SerializablePairLongFloat.class + } ); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java index a646b4c09cf3..7b8a60040157 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongComplexMetricSerde.java @@ -130,8 +130,7 @@ public boolean equals(SerializablePairLongLong a, SerializablePairLongLong b) { return a.equals(b); } - }, - SerializablePairLongLong.class + } ); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java index d9e5dac7ec46..625d597ba8c7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java @@ -188,8 +188,7 @@ public boolean equals(SerializablePairLongString a, SerializablePairLongString b { return a.equals(b); } - }, - SerializablePairLongString.class + } ); } diff --git a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java index 9303a3286630..3d089a768271 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java @@ -156,6 +156,6 @@ public boolean equals(@Nullable T a, @Nullable T b) public Class complexDimensionType() { - return delegate.complexDimensionType(); + return delegate.getClazz(); } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java index 66ef1196b929..b274e55282ea 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java @@ -40,24 +40,24 @@ public class ObjectStrategyComplexTypeStrategy implements TypeStrategy @Nullable private final Hash.Strategy hashStrategy; @Nullable - private final Class complexDimensionType; + private final Class clazz; public ObjectStrategyComplexTypeStrategy(ObjectStrategy objectStrategy, TypeSignature signature) { - this(objectStrategy, signature, null, null); + this(objectStrategy, signature, null); } public ObjectStrategyComplexTypeStrategy( ObjectStrategy objectStrategy, TypeSignature signature, - @Nullable final Hash.Strategy hashStrategy, - @Nullable final Class complexDimensionType + @Nullable final Hash.Strategy hashStrategy ) { this.objectStrategy = objectStrategy; this.typeSignature = signature; this.hashStrategy = hashStrategy; - this.complexDimensionType = complexDimensionType; + //noinspection VariableNotUsedInsideIf + this.clazz = hashStrategy == null ? null : objectStrategy.getClazz(); } @Override @@ -138,8 +138,11 @@ public boolean equals(T a, T b) } @Override - public Class complexDimensionType() + public Class getClazz() { - return complexDimensionType; + if (clazz == null) { + throw DruidException.defensive("hashStrategy not provided"); + } + return clazz; } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java index 5885d2c1a006..c5cff1a0b2f2 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java @@ -193,6 +193,9 @@ default T fromBytes(byte[] value) * c. {@link #compare(Object, Object)} must be consistent with equals. Apart from abiding by the definition of * {@link Comparator#compare}, it must not return 0 for two objects that are not equals, and converse must also hold, * i.e. if the value returned by compare is not zero, then the arguments must not be equal. + *

+ * d. {@link #getClazz()} should return the Java class for the dimension represented by the type. This will be used by the + * mapper to deserialize the object during tasks like broker-historical interaction and spilling to the disk. */ default boolean groupable() { @@ -217,7 +220,10 @@ default boolean equals(T a, T b) throw DruidException.defensive("Not implemented. Check groupable() first"); } - default Class complexDimensionType() + /** + * @see #groupable() + */ + default Class getClazz() { throw DruidException.defensive("Not implemented. It is only implemented for complex dimensions which are groupable()"); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java index fd42e71b09fb..4d1bb347f218 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java @@ -179,8 +179,7 @@ public boolean equals(Object a, Object b) { return StructuredData.wrap(a).compareTo(StructuredData.wrap(b)) == 0; } - }, - StructuredData.class + } ); } From f8ce54719409442a9bab65b7ebea690f80f31541 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 12 Jun 2024 11:48:37 +0530 Subject: [PATCH 07/10] add javadoc for the jsonMapper --- .../query/groupby/epinephelinae/RowBasedGrouperHelper.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index a3089064623c..7540849a4ef2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -171,6 +171,8 @@ public static Pair, Accumulator * @param combineBufferHolder holder of combine buffers. Unused if concurrencyHint = -1, and may be null in that case * @param concurrencyHint -1 for single-threaded Grouper, >=1 for concurrent Grouper * @param temporaryStorage temporary storage used for spilling from the Grouper + * @param jsonMapper object mapper used to convert the complex dimensions from generic java objects to the + * specific class of the complex object * @param spillMapper object mapper used for spilling from the Grouper * @param grouperSorter executor service used for parallel combining. Unused if concurrencyHint = -1, and may * be null in that case From b3a51f59eb952d505b073d32ea54708a311bc492 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 12 Jun 2024 22:58:59 +0530 Subject: [PATCH 08/10] remove extra deserialization --- .../groupby/GroupByQueryQueryToolChest.java | 2 +- .../druid/query/groupby/GroupingEngine.java | 4 --- .../GroupByMergingQueryRunner.java | 4 --- .../epinephelinae/GroupByRowProcessor.java | 2 -- .../epinephelinae/RowBasedGrouperHelper.java | 25 ++++--------------- .../segment/column/NullableTypeStrategy.java | 2 +- 6 files changed, 7 insertions(+), 32 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 05ffa9eba750..bd610d8c62cb 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -520,7 +520,7 @@ private Class[] createDimensionClasses() dimensionOutputType ); } - classes[i] = dimensionOutputType.getNullableStrategy().complexDimensionType(); + classes[i] = dimensionOutputType.getNullableStrategy().getClazz(); } else { classes[i] = Object.class; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 6e2748795d5e..6451fb9b943d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -448,7 +448,6 @@ public QueryRunner mergeRunners( groupByResourcesReservationPool, processingConfig.getNumThreads(), processingConfig.intermediateComputeSizeBytes(), - jsonMapper, spillMapper, processingConfig.getTmpDir() ); @@ -601,7 +600,6 @@ public Sequence processSubqueryResult( configSupplier.get(), processingConfig, resource, - jsonMapper, spillMapper, processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes() @@ -683,7 +681,6 @@ public Sequence processSubtotalsSpec( configSupplier.get(), processingConfig, resource, - jsonMapper, spillMapper, processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes() @@ -747,7 +744,6 @@ public Sequence processSubtotalsSpec( configSupplier.get(), processingConfig, resource, - jsonMapper, spillMapper, processingConfig.getTmpDir(), processingConfig.intermediateComputeSizeBytes() diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java index 2e00f15aae88..6e2064d5f1f1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java @@ -100,7 +100,6 @@ public class GroupByMergingQueryRunner implements QueryRunner private final QueryProcessingPool queryProcessingPool; private final QueryWatcher queryWatcher; private final int concurrencyHint; - private final ObjectMapper jsonMapper; private final ObjectMapper spillMapper; private final String processingTmpDir; private final int mergeBufferSize; @@ -114,7 +113,6 @@ public GroupByMergingQueryRunner( GroupByResourcesReservationPool groupByResourcesReservationPool, int concurrencyHint, int mergeBufferSize, - ObjectMapper jsonMapper, ObjectMapper spillMapper, String processingTmpDir ) @@ -126,7 +124,6 @@ public GroupByMergingQueryRunner( this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.groupByResourcesReservationPool = groupByResourcesReservationPool; this.concurrencyHint = concurrencyHint; - this.jsonMapper = jsonMapper; this.spillMapper = spillMapper; this.processingTmpDir = processingTmpDir; this.mergeBufferSize = mergeBufferSize; @@ -213,7 +210,6 @@ public CloseableGrouperIterator make() combineBufferHolder, concurrencyHint, temporaryStorage, - jsonMapper, spillMapper, queryProcessingPool, // Passed as executor service priority, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index d1b7e9c21864..c040c5b64658 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -91,7 +91,6 @@ public static ResultSupplier process( final GroupByQueryConfig config, final DruidProcessingConfig processingConfig, final GroupByQueryResources resource, - final ObjectMapper jsonMapper, final ObjectMapper spillMapper, final String processingTmpDir, final int mergeBufferSize @@ -128,7 +127,6 @@ public ByteBuffer get() } }, temporaryStorage, - jsonMapper, spillMapper, mergeBufferSize ); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 7540849a4ef2..1be3f991e17a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -124,7 +124,6 @@ public static Pair, Accumulator final DruidProcessingConfig processingConfig, final Supplier bufferSupplier, final LimitedTemporaryStorage temporaryStorage, - final ObjectMapper jsonMapper, final ObjectMapper spillMapper, final int mergeBufferSize ) @@ -138,7 +137,6 @@ public static Pair, Accumulator null, SINGLE_THREAD_CONCURRENCY_HINT, temporaryStorage, - jsonMapper, spillMapper, null, UNKNOWN_THREAD_PRIORITY, @@ -171,8 +169,6 @@ public static Pair, Accumulator * @param combineBufferHolder holder of combine buffers. Unused if concurrencyHint = -1, and may be null in that case * @param concurrencyHint -1 for single-threaded Grouper, >=1 for concurrent Grouper * @param temporaryStorage temporary storage used for spilling from the Grouper - * @param jsonMapper object mapper used to convert the complex dimensions from generic java objects to the - * specific class of the complex object * @param spillMapper object mapper used for spilling from the Grouper * @param grouperSorter executor service used for parallel combining. Unused if concurrencyHint = -1, and may * be null in that case @@ -190,7 +186,6 @@ public static Pair, Accumulator @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final int concurrencyHint, final LimitedTemporaryStorage temporaryStorage, - final ObjectMapper jsonMapper, final ObjectMapper spillMapper, @Nullable final ListeningExecutorService grouperSorter, final int priority, @@ -314,8 +309,7 @@ public static Pair, Accumulator combining, includeTimestamp, columnSelectorFactory, - valueTypes, - jsonMapper + valueTypes ); final Predicate rowPredicate; @@ -515,15 +509,14 @@ private static ValueExtractFunction makeValueExtractFunction( final boolean combining, final boolean includeTimestamp, final ColumnSelectorFactory columnSelectorFactory, - final List valueTypes, - final ObjectMapper objectMapper + final List valueTypes ) { final TimestampExtractFunction timestampExtractFn = includeTimestamp ? makeTimestampExtractFunction(query, combining) : null; - final Function[] valueConvertFns = makeValueConvertFunctions(valueTypes, objectMapper); + final Function[] valueConvertFns = makeValueConvertFunctions(valueTypes); if (!combining) { final Supplier[] inputRawSuppliers = getValueSuppliersForDimensions( @@ -808,22 +801,14 @@ private static Supplier[] getValueSuppliersForDimensions( } @SuppressWarnings("unchecked") - private static Function[] makeValueConvertFunctions( - final List valueTypes, - final ObjectMapper objectMapper - ) + private static Function[] makeValueConvertFunctions(final List valueTypes) { final Function[] functions = new Function[valueTypes.size()]; for (int i = 0; i < functions.length; i++) { // Subquery post-aggs aren't added to the rowSignature (see rowSignatureFor() in GroupByQueryHelper) because // their types aren't known, so default to String handling. final ColumnType type = valueTypes.get(i) == null ? ColumnType.STRING : valueTypes.get(i); - if (type.is(ValueType.COMPLEX)) { - Class dimensionClass = type.getNullableStrategy().complexDimensionType(); - functions[i] = input -> objectMapper.convertValue(input, dimensionClass); - } else { - functions[i] = input -> DimensionHandlerUtils.convertObjectToType(input, type); - } + functions[i] = input -> DimensionHandlerUtils.convertObjectToType(input, type); } return functions; } diff --git a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java index 3d089a768271..88642a964e8c 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java @@ -154,7 +154,7 @@ public boolean equals(@Nullable T a, @Nullable T b) return b != null && delegate.equals(a, b); } - public Class complexDimensionType() + public Class getClazz() { return delegate.getClazz(); } From 3fc82aeba5fc2ed38ee39e893fc40836cec88030 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 14 Jun 2024 00:55:09 +0530 Subject: [PATCH 09/10] add special serde for map based result rows --- .../groupby/GroupByQueryQueryToolChest.java | 25 +++++- .../GroupByQueryQueryToolChestTest.java | 82 +++++++++++++++++++ 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index bd610d8c62cb..47064fefbe60 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -473,13 +473,34 @@ public void serialize( final JsonDeserializer deserializer = new JsonDeserializer() { final Class[] dimensionClasses = createDimensionClasses(); + boolean containsComplexDimensions = query.getDimensions() + .stream() + .anyMatch( + dimensionSpec -> dimensionSpec.getOutputType().is(ValueType.COMPLEX) + ); @Override public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException { if (jp.isExpectedStartObjectToken()) { final Row row = jp.readValueAs(Row.class); - return ResultRow.fromLegacyRow(row, query); + final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); + if (containsComplexDimensions) { + final List queryDimensions = query.getDimensions(); + for (int i = 0; i < queryDimensions.size(); ++i) { + if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) { + final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; + resultRow.set( + dimensionIndexInResultRow, + objectMapper.convertValue( + resultRow.get(dimensionIndexInResultRow), + dimensionClasses[i] + ) + ); + } + } + } + return resultRow; } else { Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; @@ -520,7 +541,7 @@ private Class[] createDimensionClasses() dimensionOutputType ); } - classes[i] = dimensionOutputType.getNullableStrategy().getClazz(); + classes[i] = nullableTypeStrategy.getClazz(); } else { classes[i] = Object.class; } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 678b32b2fcda..f43bbce9d978 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -51,6 +51,7 @@ import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde; import org.apache.druid.query.aggregation.firstlast.last.DoubleLastAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.last.FloatLastAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.last.LongLastAggregatorFactory; @@ -645,6 +646,87 @@ public void testResultSerde() throws Exception ); } + @Test + public void testResultSerdeWithComplexDimension() throws Exception + { + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(ImmutableList.of( + DefaultDimensionSpec.of("test"), + new DefaultDimensionSpec( + "test2", + "test2", + ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME) + ) + )) + .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT)) + .setPostAggregatorSpecs(Collections.singletonList(new ConstantPostAggregator("post", 10))) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(null, null); + + final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper( + objectMapper, + query.withOverriddenContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, true)) + ); + final ObjectMapper mapsObjectMapper = toolChest.decorateObjectMapper( + objectMapper, + query.withOverriddenContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false)) + ); + + final Object[] rowObjects = { + DateTimes.of("2000").getMillis(), + "foo", + new SerializablePairLongString(1L, "test"), + 100, + 10.0 + }; + final ResultRow resultRow = ResultRow.of(rowObjects); + + // There are no tests with the standard mapper since it cannot convert the generic deserialized type for Pair class + // into the Pair class + + Assert.assertEquals( + "array mapper reads arrays", + resultRow, + arraysObjectMapper.readValue( + arraysObjectMapper.writeValueAsBytes(resultRow), + ResultRow.class + ) + ); + + Assert.assertEquals( + "array mapper reads arrays (2)", + resultRow, + arraysObjectMapper.readValue( + StringUtils.format("[%s, \"foo\", {\"lhs\":1,\"rhs\":\"test\"}, 100, 10.0]", DateTimes.of("2000").getMillis()), + ResultRow.class + ) + ); + + Assert.assertEquals( + "map mapper reads arrays", + resultRow, + mapsObjectMapper.readValue( + arraysObjectMapper.writeValueAsBytes(resultRow), + ResultRow.class + ) + ); + + Assert.assertEquals( + "map mapper reads maps", + resultRow, + mapsObjectMapper.readValue( + mapsObjectMapper.writeValueAsBytes(resultRow), + ResultRow.class + ) + ); + } + @Test public void testResultSerdeIntermediateResultAsMapCompat() throws Exception { From 7d70e534838ccb1f06ff746b2aaa8e96dea664b2 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 14 Jun 2024 01:01:18 +0530 Subject: [PATCH 10/10] revert unnecessary change --- .../query/groupby/epinephelinae/RowBasedGrouperHelper.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 1be3f991e17a..491c28d41427 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -801,7 +801,9 @@ private static Supplier[] getValueSuppliersForDimensions( } @SuppressWarnings("unchecked") - private static Function[] makeValueConvertFunctions(final List valueTypes) + private static Function[] makeValueConvertFunctions( + final List valueTypes + ) { final Function[] functions = new Function[valueTypes.size()]; for (int i = 0; i < functions.length; i++) {