From fb15cf5102f83de1ec28057239b0b47e71c3c789 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 16 Jul 2024 11:52:21 +0530 Subject: [PATCH 01/10] faster serde --- .../java/util/common/jackson/JacksonUtils.java | 11 +++++++++++ .../query/groupby/GroupByQueryQueryToolChest.java | 15 +++++++++++---- .../epinephelinae/RowBasedGrouperHelper.java | 12 ++++++++---- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java index bfb1dc79d788..937580acdb81 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java @@ -20,8 +20,10 @@ package org.apache.druid.java.util.common.jackson; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonSerializer; @@ -121,6 +123,15 @@ public static void writeObjectUsingSerializerProvider( serializer.serialize(o, jsonGenerator, serializers); } } + + public static T readObjectUsingDeserializationContext( + final JsonParser jp, + final DeserializationContext deserializationContext, + final Class clazz + ) throws IOException + { + return deserializationContext.readValue(jp, clazz); + } /** * Convert the given object to an array of bytes. 
Use when the object is diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index d69e09c9ff0b..2478289a4a18 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -509,16 +509,23 @@ public ResultRow deserialize(final JsonParser jp, final DeserializationContext c throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); } - ObjectCodec codec = jp.getCodec(); - jp.nextToken(); int numObjects = 0; while (jp.currentToken() != JsonToken.END_ARRAY) { if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { - objectArray[numObjects] = codec.readValue(jp, dimensionClasses[numObjects - query.getResultRowDimensionStart()]); + objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( + jp, + ctxt, + dimensionClasses[numObjects - query.getResultRowDimensionStart()] + ); + } else { - objectArray[numObjects] = codec.readValue(jp, Object.class); + objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( + jp, + ctxt, + Object.class + ); } jp.nextToken(); ++numObjects; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index da8a0e046230..cc51c8a7cd70 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -45,6 +45,7 @@ import org.apache.druid.java.util.common.granularity.AllGranularity; import org.apache.druid.java.util.common.guava.Accumulator; import 
org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.DimensionComparisonUtils; @@ -1378,7 +1379,6 @@ public RowBasedKey deserialize( } jp.nextToken(); - final ObjectCodec codec = jp.getCodec(); final int timestampAdjustment = includeTimestamp ? 1 : 0; final int dimsToRead = timestampAdjustment + serdeHelpers.length; int dimsReadSoFar = 0; @@ -1389,15 +1389,19 @@ public RowBasedKey deserialize( jp.currentToken() != JsonToken.END_ARRAY, "Unexpected end of array when deserializing timestamp from the spilled files" ); - objects[dimsReadSoFar] = codec.readValue(jp, Long.class); + objects[dimsReadSoFar] = JacksonUtils.readObjectUsingDeserializationContext(jp, deserializationContext, Long.class); ++dimsReadSoFar; jp.nextToken(); } while (jp.currentToken() != JsonToken.END_ARRAY) { - objects[dimsReadSoFar] = - codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz()); + objects[dimsReadSoFar] = JacksonUtils.readObjectUsingDeserializationContext( + jp, + deserializationContext, + serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz() + ); + ++dimsReadSoFar; jp.nextToken(); From 2f54dd9166eb4860e3315defb38227b2a33a4359 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 16 Jul 2024 19:52:35 +0530 Subject: [PATCH 02/10] remove backward compat --- .../groupby/GroupByQueryQueryToolChest.java | 65 +++++++------------ 1 file changed, 22 insertions(+), 43 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 2478289a4a18..f598193e12cc 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -483,55 +483,34 @@ public void serialize( @Override public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException { - if (jp.isExpectedStartObjectToken()) { - final Row row = jp.readValueAs(Row.class); - final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); - if (containsComplexDimensions) { - final List queryDimensions = query.getDimensions(); - for (int i = 0; i < queryDimensions.size(); ++i) { - if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) { - final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; - resultRow.set( - dimensionIndexInResultRow, - objectMapper.convertValue( - resultRow.get(dimensionIndexInResultRow), - dimensionClasses[i] - ) - ); - } - } - } - return resultRow; - } else { - Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; + Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; - if (!jp.isExpectedStartArrayToken()) { - throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); - } + if (!jp.isExpectedStartArrayToken()) { + throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); + } - jp.nextToken(); + jp.nextToken(); - int numObjects = 0; - while (jp.currentToken() != JsonToken.END_ARRAY) { - if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { - objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( - jp, - ctxt, - dimensionClasses[numObjects - query.getResultRowDimensionStart()] - ); + int numObjects = 0; + while (jp.currentToken() != JsonToken.END_ARRAY) { + if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { + objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( + jp, + ctxt, + 
dimensionClasses[numObjects - query.getResultRowDimensionStart()] + ); - } else { - objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( - jp, - ctxt, - Object.class - ); - } - jp.nextToken(); - ++numObjects; + } else { + objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( + jp, + ctxt, + Object.class + ); } - return ResultRow.of(objectArray); + jp.nextToken(); + ++numObjects; } + return ResultRow.of(objectArray); } }; From 6affc6138cece13b9d1fd69a7fd0a56ea1d3a9ca Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 16 Jul 2024 23:14:09 +0530 Subject: [PATCH 03/10] Revert "remove backward compat" This reverts commit 2f54dd9166eb4860e3315defb38227b2a33a4359. --- .../groupby/GroupByQueryQueryToolChest.java | 65 ++++++++++++------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index f598193e12cc..2478289a4a18 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -483,34 +483,55 @@ public void serialize( @Override public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException { - Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; + if (jp.isExpectedStartObjectToken()) { + final Row row = jp.readValueAs(Row.class); + final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); + if (containsComplexDimensions) { + final List queryDimensions = query.getDimensions(); + for (int i = 0; i < queryDimensions.size(); ++i) { + if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) { + final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; + resultRow.set( + dimensionIndexInResultRow, + 
objectMapper.convertValue( + resultRow.get(dimensionIndexInResultRow), + dimensionClasses[i] + ) + ); + } + } + } + return resultRow; + } else { + Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; - if (!jp.isExpectedStartArrayToken()) { - throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); - } + if (!jp.isExpectedStartArrayToken()) { + throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); + } - jp.nextToken(); + jp.nextToken(); - int numObjects = 0; - while (jp.currentToken() != JsonToken.END_ARRAY) { - if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { - objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( - jp, - ctxt, - dimensionClasses[numObjects - query.getResultRowDimensionStart()] - ); + int numObjects = 0; + while (jp.currentToken() != JsonToken.END_ARRAY) { + if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { + objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( + jp, + ctxt, + dimensionClasses[numObjects - query.getResultRowDimensionStart()] + ); - } else { - objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( - jp, - ctxt, - Object.class - ); + } else { + objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( + jp, + ctxt, + Object.class + ); + } + jp.nextToken(); + ++numObjects; } - jp.nextToken(); - ++numObjects; + return ResultRow.of(objectArray); } - return ResultRow.of(objectArray); } }; From b6ede34d7bb6a00c9b015371ea5a039681374669 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 17 Jul 2024 09:57:42 +0530 Subject: [PATCH 04/10] more changes --- .../groupby/GroupByQueryQueryToolChest.java | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 2478289a4a18..822760b0a465 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -25,11 +25,14 @@ import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -474,6 +477,8 @@ public void serialize( final JsonDeserializer deserializer = new JsonDeserializer() { final Class[] dimensionClasses = createDimensionClasses(query); + final JsonDeserializer[] rootValueDeserializers = createRootValueDeserializers(dimensionClasses); + final JsonDeserializer= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { - objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( - jp, - ctxt, - dimensionClasses[numObjects - query.getResultRowDimensionStart()] - ); - + objectArray[numObjects] = rootValueDeserializers[numObjects - query.getResultRowDimensionStart()].deserialize(jp, ctxt); } else { objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( jp, @@ -908,4 +908,19 @@ private static Class[] 
createDimensionClasses(final GroupByQuery query) } return classes; } + + private static Class createRootValueDeserializers(final Class[] classes, final JsonParser parser, DeserializationContext ctxt) + throws JsonMappingException + { + final TypeFactory typeFactory = TypeFactory.defaultInstance(); + JsonDeserializer[] rootValueDeserializers = new JsonDeserializer[classes.length]; + + for (int i = 0; i < classes.length; ++i) { + Class clazz = classes[i]; + JavaType type = typeFactory.constructType(clazz); + rootValueDeserializers[i] = ctxt.findRootValueDeserializer(type); + } + + return rootValueDeserializers; + } } From cf554f0fa8aa7bcd54591976dba021e28f784098 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 22 Jul 2024 20:07:26 +0530 Subject: [PATCH 05/10] add benchmark, checkstyle wont pass --- .../GroupByDeserializationBenchmark.java | 176 +++++++++++++++ .../groupby/GroupByQueryQueryToolChest.java | 212 +++++++++--------- .../GroupByQueryQueryToolChestTest.java | 14 +- .../query/groupby/GroupByQueryRunnerTest.java | 1 + 4 files changed, 291 insertions(+), 112 deletions(-) create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java new file mode 100644 index 000000000000..e6f1a26c863c --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java @@ -0,0 +1,176 @@ +package org.apache.druid.benchmark; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.guice.NestedDataModule; +import org.apache.druid.jackson.AggregatorsModule; +import org.apache.druid.java.util.common.DateTimes; +import 
org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.post.ConstantPostAggregator; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +public class GroupByDeserializationBenchmark +{ + + static { + NullHandling.initializeForTests(); + NestedDataModule.registerHandlersAndSerde(); + AggregatorsModule.registerComplexMetricsAndSerde(); + } + +// @Param({"100", "1000"}) + @Param({"100"}) + private int numDimensions; + +// @Param({"0", "0.25", "0.5", "0.75", "0.85", "0.95", "0.99", "1.0"}) + @Param({"0", "0.5", "1.0"}) 
+ private double primitiveToComplexDimensionRatio; + + @Param({"json", "serializablePairLongString"}) + private String complexDimensionType; + + @Param({"true", "false"}) + private boolean backwardCompatibility; + + private GroupByQuery sqlQuery; + private String serializedRow; + private GroupByQueryQueryToolChest groupByQueryQueryToolChest; + private ObjectMapper decoratedMapper; + + @Setup(Level.Trial) + public void setup() throws JsonProcessingException + { + final ObjectMapper undecoratedMapper = TestHelper.makeJsonMapper(); + undecoratedMapper.registerModules(NestedDataModule.getJacksonModulesList()); + undecoratedMapper.registerModule(new AggregatorsModule()); + final Pair sqlQueryAndResultRow = sqlQueryAndResultRow( + numDimensions, + primitiveToComplexDimensionRatio, + complexDimensionType, + undecoratedMapper + ); + sqlQuery = sqlQueryAndResultRow.lhs; + serializedRow = sqlQueryAndResultRow.rhs; + + groupByQueryQueryToolChest = new GroupByQueryQueryToolChest( + null, + () -> new GroupByQueryConfig() + { + @Override + public boolean isIntermediateResultAsMapCompat() + { + return backwardCompatibility; + } + }, + null, + null + ); + + decoratedMapper = groupByQueryQueryToolChest.decorateObjectMapper(undecoratedMapper, sqlQuery); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void deserializeResultRows(Blackhole blackhole) throws JsonProcessingException + { + blackhole.consume(decoratedMapper.readValue(serializedRow, ResultRow.class)); + } + + private static Pair sqlQueryAndResultRow( + final int numDimensions, + final double primitiveToComplexDimensionRatio, + final String complexDimensionType, + final ObjectMapper mapper + ) throws JsonProcessingException + { + final int numPrimitiveDimensions = (int) Math.floor(primitiveToComplexDimensionRatio * numDimensions); + final int numComplexDimensions = numDimensions - numPrimitiveDimensions; + + final List dimensions = new ArrayList<>(); + final List 
rowList = new ArrayList<>(); + + // Add timestamp + rowList.add(DateTimes.of("2000").getMillis()); + + for (int i = 0; i < numPrimitiveDimensions; ++i) { + dimensions.add( + new DefaultDimensionSpec( + StringUtils.format("primitive%d", i), + StringUtils.format("primitive%d", i), + ColumnType.STRING + ) + ); + rowList.add("foo"); + } + + for (int i = 0; i < numComplexDimensions; ++i) { + dimensions.add( + new DefaultDimensionSpec( + StringUtils.format("complex%d", i), + StringUtils.format("complex%d", i), + ColumnType.ofComplex(complexDimensionType) + ) + ); + + // Serialized version of this object is a valid value for both json and long-string pair dimensions + rowList.add(new SerializablePairLongString(1L, "test")); + } + + // Add aggregator + rowList.add(100); + + // Add post aggregator + rowList.add(10.0); + + GroupByQuery query = GroupByQuery.builder() + .setDataSource("foo") + .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setDimensions(dimensions) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setPostAggregatorSpecs(Collections.singletonList(new ConstantPostAggregator( + "post", + 10 + ))) + .setContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, true)) + .setGranularity(Granularities.DAY) + .build(); + + return Pair.of(query, mapper.writeValueAsString(rowList)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 822760b0a465..b09c8182d046 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -22,20 +22,17 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.core.ObjectCodec; import 
com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.inject.Inject; import org.apache.druid.data.input.Row; @@ -111,6 +108,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest queryConfigSupplier, GroupByQueryMetricsFactory queryMetricsFactory, @Merging GroupByResourcesReservationPool groupByResourcesReservationPool ) { this.groupingEngine = groupingEngine; + this.queryConfig = queryConfigSupplier.get(); this.queryMetricsFactory = queryMetricsFactory; this.groupByResourcesReservationPool = groupByResourcesReservationPool; } @@ -455,98 +456,102 @@ public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final { final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); - // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting. 
- final JsonSerializer serializer = new JsonSerializer() - { - @Override - public void serialize( - final ResultRow resultRow, - final JsonGenerator jg, - final SerializerProvider serializers - ) throws IOException - { - if (resultAsArray) { - JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.getArray()); - } else { - JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.toMapBasedRow(query)); - } - } - }; - - // Deserializer that can deserialize either array- or map-based rows. - final JsonDeserializer deserializer = new JsonDeserializer() - { - final Class[] dimensionClasses = createDimensionClasses(query); - final JsonDeserializer[] rootValueDeserializers = createRootValueDeserializers(dimensionClasses); - final JsonDeserializer dimensionSpec.getOutputType().is(ValueType.COMPLEX) - ); - - @Override - public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException - { - if (jp.isExpectedStartObjectToken()) { - final Row row = jp.readValueAs(Row.class); - final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); - if (containsComplexDimensions) { - final List queryDimensions = query.getDimensions(); - for (int i = 0; i < queryDimensions.size(); ++i) { - if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) { - final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; - resultRow.set( - dimensionIndexInResultRow, - objectMapper.convertValue( - resultRow.get(dimensionIndexInResultRow), - dimensionClasses[i] - ) - ); - } - } - } - return resultRow; - } else { - Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; - - if (!jp.isExpectedStartArrayToken()) { - throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); - } - - jp.nextToken(); - - int numObjects = 0; - while (jp.currentToken() != JsonToken.END_ARRAY) { - if (numObjects >= query.getResultRowDimensionStart() && numObjects < 
query.getResultRowAggregatorStart()) { - objectArray[numObjects] = rootValueDeserializers[numObjects - query.getResultRowDimensionStart()].deserialize(jp, ctxt); - } else { - objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( - jp, - ctxt, - Object.class - ); - } - jp.nextToken(); - ++numObjects; - } - return ResultRow.of(objectArray); - } - } - }; - - class GroupByResultRowModule extends SimpleModule - { - private GroupByResultRowModule() - { - addSerializer(ResultRow.class, serializer); - addDeserializer(ResultRow.class, deserializer); - } - } - - final ObjectMapper newObjectMapper = objectMapper.copy(); - newObjectMapper.registerModule(new GroupByResultRowModule()); - return newObjectMapper; + return objectMapper; + +// // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting. +// final JsonSerializer serializer = new JsonSerializer() +// { +// @Override +// public void serialize( +// final ResultRow resultRow, +// final JsonGenerator jg, +// final SerializerProvider serializers +// ) throws IOException +// { +// if (resultAsArray) { +// JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.getArray()); +// } else { +// JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.toMapBasedRow(query)); +// } +// } +// }; +// +// // Deserializer that can deserialize either array- or map-based rows. 
+// final JsonDeserializer deserializer = new JsonDeserializer() +// { +// final Class[] dimensionClasses = createDimensionClasses(query); +// boolean containsComplexDimensions = query.getDimensions() +// .stream() +// .anyMatch( +// dimensionSpec -> dimensionSpec.getOutputType().is(ValueType.COMPLEX) +// ); +// +// @Override +// public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException +// { +// if (jp.isExpectedStartObjectToken()) { +// final Row row = jp.readValueAs(Row.class); +// final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); +// if (containsComplexDimensions) { +// final List queryDimensions = query.getDimensions(); +// for (int i = 0; i < queryDimensions.size(); ++i) { +// if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) { +// final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; +// resultRow.set( +// dimensionIndexInResultRow, +// objectMapper.convertValue( +// resultRow.get(dimensionIndexInResultRow), +// dimensionClasses[i] +// ) +// ); +// } +// } +// } +// return resultRow; +// } else { +// Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; +// +// if (!jp.isExpectedStartArrayToken()) { +// throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); +// } +// +// jp.nextToken(); +// +// int numObjects = 0; +// while (jp.currentToken() != JsonToken.END_ARRAY) { +// if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { +// objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( +// jp, +// ctxt, +// dimensionClasses[numObjects - query.getResultRowDimensionStart()] +// ); +// } else { +// objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( +// jp, +// ctxt, +// Object.class +// ); +// } +// jp.nextToken(); +// ++numObjects; +// } +// return ResultRow.of(objectArray); +// } +// } +// }; +// +// 
class GroupByResultRowModule extends SimpleModule +// { +// private GroupByResultRowModule() +// { +// addSerializer(ResultRow.class, serializer); +// addDeserializer(ResultRow.class, deserializer); +// } +// } +// +// final ObjectMapper newObjectMapper = objectMapper.copy(); +// newObjectMapper.registerModule(new GroupByResultRowModule()); +// return newObjectMapper; } @Override @@ -908,19 +913,4 @@ private static Class[] createDimensionClasses(final GroupByQuery query) } return classes; } - - private static Class createRootValueDeserializers(final Class[] classes, final JsonParser parser, DeserializationContext ctxt) - throws JsonMappingException - { - final TypeFactory typeFactory = TypeFactory.defaultInstance(); - JsonDeserializer[] rootValueDeserializers = new JsonDeserializer[classes.length]; - - for (int i = 0; i < classes.length; ++i) { - Class clazz = classes[i]; - JavaType type = typeFactory.constructType(clazz); - rootValueDeserializers[i] = ctxt.findRootValueDeserializer(type); - } - - return rootValueDeserializers; - } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 7279ca938bd8..d9aefd5f55e2 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -792,7 +792,19 @@ public void testResultSerdeIntermediateResultAsMapCompat() throws Exception .setGranularity(QueryRunnerTestHelper.DAY_GRAN) .build(); - final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(null, null, null); + final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( + null, + () -> new GroupByQueryConfig() + { + @Override + public boolean isIntermediateResultAsMapCompat() + { + return true; + } + }, + null, + null + ); final ObjectMapper 
objectMapper = TestHelper.makeJsonMapper(); final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index a5dbb49bca51..475848edbdc7 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -363,6 +363,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( groupingEngine, + () -> config, DefaultGroupByQueryMetricsFactory.instance(), groupByResourcesReservationPool ); From 8512321db655f0d6a6d5da01839c13afdcc25c73 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 24 Jul 2024 02:02:53 +0530 Subject: [PATCH 06/10] changes --- .../GroupByDeserializationBenchmark.java | 25 +- .../util/common/jackson/JacksonUtils.java | 11 +- .../query/groupby/GroupByQueryConfig.java | 1 - .../groupby/GroupByQueryQueryToolChest.java | 110 +------- .../ResultRowObjectMapperDecoratorUtil.java | 237 ++++++++++++++++++ .../epinephelinae/RowBasedGrouperHelper.java | 1 - 6 files changed, 269 insertions(+), 116 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java index e6f1a26c863c..c3ebbe5122f9 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.benchmark; import com.fasterxml.jackson.core.JsonProcessingException; @@ -53,12 +72,10 @@ public class GroupByDeserializationBenchmark AggregatorsModule.registerComplexMetricsAndSerde(); } -// @Param({"100", "1000"}) - @Param({"100"}) + @Param({"100", "1000"}) private int numDimensions; -// @Param({"0", "0.25", "0.5", "0.75", "0.85", "0.95", "0.99", "1.0"}) - @Param({"0", "0.5", "1.0"}) + @Param({"0", "0.25", "0.5", "0.75", "0.85", "0.95", "0.99", "1.0"}) private double primitiveToComplexDimensionRatio; @Param({"json", "serializablePairLongString"}) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java index 937580acdb81..705c6962c75f 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java @@ -123,7 +123,7 @@ public static void writeObjectUsingSerializerProvider( serializer.serialize(o, jsonGenerator, serializers); } } - + public static T readObjectUsingDeserializationContext( final JsonParser jp, final DeserializationContext deserializationContext, @@ -133,6 +133,15 @@ 
public static T readObjectUsingDeserializationContext( return deserializationContext.readValue(jp, clazz); } + public static Object readObjectUsingDeserializationContext( + final JsonParser jp, + final DeserializationContext deserializationContext, + final JavaType javaType + ) throws IOException + { + return deserializationContext.readValue(jp, javaType); + } + /** * Convert the given object to an array of bytes. Use when the object is * known serializable so that the Jackson exception can be suppressed. diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 9950695f28ce..dbd6f2869a75 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -293,7 +293,6 @@ public boolean isVectorize() return vectorize; } - @SuppressWarnings("unused") public boolean isIntermediateResultAsMapCompat() { return intermediateResultAsMapCompat; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index b09c8182d046..e978e332e394 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -19,23 +19,14 @@ package org.apache.druid.query.groupby; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.ObjectMapper; -import 
com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.inject.Inject; -import org.apache.druid.data.input.Row; import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.allocation.MemoryAllocatorFactory; @@ -51,7 +42,6 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.DataSource; import org.apache.druid.query.FrameSignaturePair; @@ -82,7 +72,6 @@ import javax.annotation.Nullable; import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; import java.util.BitSet; import java.util.Comparator; @@ -454,104 +443,7 @@ public TypeReference getResultTypeReference() @Override public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final GroupByQuery query) { - final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); - - return objectMapper; - -// // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting. 
-// final JsonSerializer serializer = new JsonSerializer() -// { -// @Override -// public void serialize( -// final ResultRow resultRow, -// final JsonGenerator jg, -// final SerializerProvider serializers -// ) throws IOException -// { -// if (resultAsArray) { -// JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.getArray()); -// } else { -// JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.toMapBasedRow(query)); -// } -// } -// }; -// -// // Deserializer that can deserialize either array- or map-based rows. -// final JsonDeserializer deserializer = new JsonDeserializer() -// { -// final Class[] dimensionClasses = createDimensionClasses(query); -// boolean containsComplexDimensions = query.getDimensions() -// .stream() -// .anyMatch( -// dimensionSpec -> dimensionSpec.getOutputType().is(ValueType.COMPLEX) -// ); -// -// @Override -// public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException -// { -// if (jp.isExpectedStartObjectToken()) { -// final Row row = jp.readValueAs(Row.class); -// final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); -// if (containsComplexDimensions) { -// final List queryDimensions = query.getDimensions(); -// for (int i = 0; i < queryDimensions.size(); ++i) { -// if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) { -// final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; -// resultRow.set( -// dimensionIndexInResultRow, -// objectMapper.convertValue( -// resultRow.get(dimensionIndexInResultRow), -// dimensionClasses[i] -// ) -// ); -// } -// } -// } -// return resultRow; -// } else { -// Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; -// -// if (!jp.isExpectedStartArrayToken()) { -// throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); -// } -// -// jp.nextToken(); -// -// int numObjects = 0; -// while (jp.currentToken() 
!= JsonToken.END_ARRAY) { -// if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { -// objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( -// jp, -// ctxt, -// dimensionClasses[numObjects - query.getResultRowDimensionStart()] -// ); -// } else { -// objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( -// jp, -// ctxt, -// Object.class -// ); -// } -// jp.nextToken(); -// ++numObjects; -// } -// return ResultRow.of(objectArray); -// } -// } -// }; -// -// class GroupByResultRowModule extends SimpleModule -// { -// private GroupByResultRowModule() -// { -// addSerializer(ResultRow.class, serializer); -// addDeserializer(ResultRow.class, deserializer); -// } -// } -// -// final ObjectMapper newObjectMapper = objectMapper.copy(); -// newObjectMapper.registerModule(new GroupByResultRowModule()); -// return newObjectMapper; + return ResultRowObjectMapperDecoratorUtil.decorateObjectMapper(objectMapper, query, queryConfig); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java new file mode 100644 index 000000000000..9f951923913c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.type.TypeFactory; +import org.apache.druid.data.input.Row; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; + +public class ResultRowObjectMapperDecoratorUtil +{ + public static ObjectMapper decorateObjectMapper( + final ObjectMapper baseObjectMapper, + final GroupByQuery query, + final GroupByQueryConfig groupByQueryConfig + ) + { + final JsonDeserializer deserializer = getDeserializer(baseObjectMapper, query, groupByQueryConfig); + final JsonSerializer serializer = getSerializer(query, groupByQueryConfig); + if (deserializer == null && serializer == null) { + 
return baseObjectMapper; + } + + final ObjectMapper decoratedObjectMapper = baseObjectMapper.copy(); + class GroupByResultRowModule extends SimpleModule + { + private GroupByResultRowModule() + { + if (serializer != null) { + addSerializer(ResultRow.class, serializer); + } + if (deserializer != null) { + addDeserializer(ResultRow.class, deserializer); + } + } + } + decoratedObjectMapper.registerModule(new GroupByResultRowModule()); + return decoratedObjectMapper; + } + + @Nullable + private static JsonDeserializer getDeserializer( + final ObjectMapper objectMapper, + final GroupByQuery query, + final GroupByQueryConfig groupByQueryConfig + ) + { + final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); + final boolean intermediateCompatMode = groupByQueryConfig.isIntermediateResultAsMapCompat(); + final boolean arrayBasedRows = resultAsArray && !intermediateCompatMode; + final boolean dimensionsRequireConversion = query.getDimensions() + .stream() + .anyMatch( + dimensionSpec -> dimensionRequiresConversion(dimensionSpec.getOutputType()) + ); + + if (arrayBasedRows && !dimensionsRequireConversion) { + // We can assume ResultRow are serialized and deserialized as arrays. No need for special decoration, + // and we can save the overhead of making a copy of the ObjectMapper + return null; + } else if (!arrayBasedRows && !dimensionsRequireConversion) { + // Have to deserialize map based rows, however don't have to deserialize the dimensions individually + // Deserializer that can deserialize either array- or map-based rows. 
+ return new JsonDeserializer() + { + @Override + public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException + { + if (jp.isExpectedStartObjectToken()) { + final Row row = jp.readValueAs(Row.class); + return ResultRow.fromLegacyRow(row, query); + } else { + return ResultRow.of(jp.readValueAs(Object[].class)); + } + } + }; + + } else { + // Have to deserialize the dimensions carefully + return new JsonDeserializer() + { + final JavaType[] javaTypes = createJavaTypesForResultRow(query); + + @Override + public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException + { + if (jp.isExpectedStartObjectToken()) { + final Row row = jp.readValueAs(Row.class); + final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); + + final List queryDimensions = query.getDimensions(); + for (int i = 0; i < queryDimensions.size(); ++i) { + if (dimensionRequiresConversion(queryDimensions.get(i).getOutputType())) { + final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; + resultRow.set( + dimensionIndexInResultRow, + objectMapper.convertValue( + resultRow.get(dimensionIndexInResultRow), + javaTypes[dimensionIndexInResultRow] + ) + ); + } + } + + return resultRow; + } else { + if (!jp.isExpectedStartArrayToken()) { + throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); + } + + Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; + int index = 0; + + while (jp.nextToken() != JsonToken.END_ARRAY) { + objectArray[index] = JacksonUtils.readObjectUsingDeserializationContext(jp, ctxt, javaTypes[index]); + ++index; + jp.nextToken(); + } + + return ResultRow.of(objectArray); + } + } + }; + } + } + + @Nullable + private static JsonSerializer getSerializer( + final GroupByQuery query, + final GroupByQueryConfig groupByQueryConfig + ) + { + final boolean resultAsArray = 
query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); + final boolean intermediateCompatMode = groupByQueryConfig.isIntermediateResultAsMapCompat(); + final boolean arrayBasedRows = resultAsArray && !intermediateCompatMode; + if (arrayBasedRows) { + return null; + } else { + if (resultAsArray) { + return new JsonSerializer() + { + @Override + public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException + { + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, resultRow.getArray()); + } + }; + + } else { + return new JsonSerializer() + { + @Override + public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException + { + JacksonUtils.writeObjectUsingSerializerProvider( + jsonGenerator, + serializerProvider, + resultRow.toMapBasedRow(query) + ); + } + }; + } + } + } + + private static boolean dimensionRequiresConversion(final ColumnType dimensionType) + { + return dimensionType.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(dimensionType); + } + + private static JavaType[] createJavaTypesForResultRow(final GroupByQuery groupByQuery) + { + final TypeFactory typeFactory = TypeFactory.defaultInstance(); + final JavaType[] javaTypes = new JavaType[groupByQuery.getResultRowSizeWithPostAggregators()]; + final List dimensions = groupByQuery.getDimensions(); + for (int i = 0; i < groupByQuery.getResultRowSizeWithPostAggregators(); ++i) { + if (i >= groupByQuery.getResultRowDimensionStart() && i < groupByQuery.getResultRowAggregatorStart()) { + DimensionSpec dimension = dimensions.get(i - groupByQuery.getResultRowDimensionStart()); + ColumnType dimensionType = dimensions.get(i - groupByQuery.getResultRowDimensionStart()).getOutputType(); + if (dimensionType.is(ValueType.COMPLEX)) { + //noinspection rawtypes + NullableTypeStrategy nullableTypeStrategy = 
dimensionType.getNullableStrategy(); + if (!nullableTypeStrategy.groupable()) { + throw DruidException.defensive( + "Ungroupable dimension [%s] with type [%s] found in the query.", + dimension, + dimensionType + ); + } + javaTypes[i] = typeFactory.constructType(nullableTypeStrategy.getClazz()); + } else { + javaTypes[i] = typeFactory.constructType(Object.class); + } + } else { + javaTypes[i] = typeFactory.constructType(Object.class); + } + } + return javaTypes; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index cc51c8a7cd70..0e73d5db6f48 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.ObjectMapper; From a49b0af7f561ea4170f0fbf75f7ee525c3bc48c7 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 24 Jul 2024 02:16:40 +0530 Subject: [PATCH 07/10] fix --- .../druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java index 9f951923913c..7e96977f45aa 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java @@ -149,7 +149,6 @@ public ResultRow deserialize(final JsonParser jp, final DeserializationContext c while (jp.nextToken() != JsonToken.END_ARRAY) { objectArray[index] = JacksonUtils.readObjectUsingDeserializationContext(jp, ctxt, javaTypes[index]); ++index; - jp.nextToken(); } return ResultRow.of(objectArray); From 98ea5bbbaf5fa30415876c5b2855a8dd7041475d Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 24 Jul 2024 11:44:45 +0530 Subject: [PATCH 08/10] null fix --- .../druid/java/util/common/jackson/JacksonUtils.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java index 705c6962c75f..9c71fc705684 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JavaType; @@ -124,21 +125,29 @@ public static void writeObjectUsingSerializerProvider( } } + @Nullable public static T readObjectUsingDeserializationContext( final JsonParser jp, final DeserializationContext deserializationContext, final Class clazz ) throws IOException { + if (jp.currentToken() == JsonToken.VALUE_NULL) { + return null; + } return deserializationContext.readValue(jp, clazz); } + @Nullable public static Object readObjectUsingDeserializationContext( final JsonParser jp, final DeserializationContext deserializationContext, 
final JavaType javaType ) throws IOException { + if (jp.currentToken() == JsonToken.VALUE_NULL) { + return null; + } return deserializationContext.readValue(jp, javaType); } From c7351d7ebf3c384e392842be08901266f9ffccdb Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 26 Jul 2024 10:04:34 +0530 Subject: [PATCH 09/10] javadocs --- .../util/common/jackson/JacksonUtils.java | 8 ++++ .../apache/druid/query/QueryToolChest.java | 5 +++ .../ResultRowObjectMapperDecoratorUtil.java | 37 +++++++++++++++++-- 3 files changed, 47 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java index 9c71fc705684..b02de3ce4932 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java @@ -125,6 +125,11 @@ public static void writeObjectUsingSerializerProvider( } } + /** + * Reads an object using the {@link JsonParser}. 
It reuses the provided {@link DeserializationContext} which offers + * better performance than calling {@link JsonParser#readValueAs(Class)} because it avoids re-creating the {@link DeserializationContext} + * for each readValue call + */ @Nullable public static T readObjectUsingDeserializationContext( final JsonParser jp, @@ -138,6 +143,9 @@ public static T readObjectUsingDeserializationContext( return deserializationContext.readValue(jp, clazz); } + /** + * @see #readObjectUsingDeserializationContext(JsonParser, DeserializationContext, Class) + */ @Nullable public static Object readObjectUsingDeserializationContext( final JsonParser jp, diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index fa394beec43a..978b49226154 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -88,6 +88,11 @@ public final JavaType getBySegmentResultType() * For most queries, this is a no-op, but it can be useful for query types that support more than one result * serialization format. Queries that implement this method must not modify the provided ObjectMapper, but instead * must return a copy. + *

+ * Jackson's default implementation of deserialization is usually optimised and this method should be overridden + * only if there is a functional requirement to do so. The method must be benchmarked in isolation, without other portions + * of the query engine executing as modifying this method can alter the performance of queries where deserializing is + * a major portion of the execution. */ public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final QueryType query) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java index 7e96977f45aa..110620d718af 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java @@ -42,8 +42,20 @@ import java.io.IOException; import java.util.List; +/** + * Utility class for conditional serde of {@link ResultRow} objects. Depending on the query configuration and the query + * dimensions, this class chooses an optimally performant method for serdeing the result rows while also preserving the + * dimension classes. + * Any modification to this class must be benchmarked properly as it runs in a hot-loop and can have significant impact on + * long-running queries. See {@code GroupByDeserializationBenchmark} for existing benchmarks + */ public class ResultRowObjectMapperDecoratorUtil { + /** + * Decorates the provided object mapper so that it can read the result rows generated by the given query and the + * groupByQueryConfig. It never modifies the provided object mapper. It can either return the same mapper undecorated, + * or clone the object mapper before decorating it.
+ */ public static ObjectMapper decorateObjectMapper( final ObjectMapper baseObjectMapper, final GroupByQuery query, @@ -73,6 +85,10 @@ private GroupByResultRowModule() return decoratedObjectMapper; } + /** + * Returns the deserializer required for the result rows of the provided query. It returns null if no special + * deserialization is required, and type-unaware generic java objects are sufficient. + */ @Nullable private static JsonDeserializer getDeserializer( final ObjectMapper objectMapper, @@ -89,13 +105,14 @@ private static JsonDeserializer getDeserializer( dimensionSpec -> dimensionRequiresConversion(dimensionSpec.getOutputType()) ); + // Most common case - when array based rows are used, and grouping is done on primitive/array/json types if (arrayBasedRows && !dimensionsRequireConversion) { // We can assume ResultRow are serialized and deserialized as arrays. No need for special decoration, // and we can save the overhead of making a copy of the ObjectMapper return null; } else if (!arrayBasedRows && !dimensionsRequireConversion) { - // Have to deserialize map based rows, however don't have to deserialize the dimensions individually - // Deserializer that can deserialize either array- or map-based rows.
+ // We have to deserialize map based rows, however we don't have to deserialize the dimensions individually + // Returns a deserializer that can deserialize both map and array based rows simultaneously return new JsonDeserializer() { @Override @@ -111,7 +128,7 @@ public ResultRow deserialize(final JsonParser jp, final DeserializationContext c }; } else { - // Have to deserialize the dimensions carefully + // Dimensions need to be deserialized individually because some of them require conversion to specialized types return new JsonDeserializer() { final JavaType[] javaTypes = createJavaTypesForResultRow(query); @@ -158,6 +175,10 @@ public ResultRow deserialize(final JsonParser jp, final DeserializationContext c } } + /** + * Returns a legacy mode aware serializer that serializes the result rows as arrays or maps depending on the query + * configuration + */ @Nullable private static JsonSerializer getSerializer( final GroupByQuery query, @@ -199,11 +220,21 @@ public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, Serializ } } + /** + * Returns true if the dimension needs to be converted from generic Java objects to the specialized column type. It involves all + * complex types, except for JSON types. JSON types are special in a way that they can work with the generic java objects + * without any conversion + */ private static boolean dimensionRequiresConversion(final ColumnType dimensionType) { return dimensionType.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(dimensionType); } + /** + * Creates java types for deserializing the result row. For timestamp, aggregators and post-aggregators, it resorts to + * {@link Object.class}. For dimensions requiring conversion (check {@link #dimensionRequiresConversion(ColumnType)}), + * it returns the java type for the associated class of the complex object.
+ */ private static JavaType[] createJavaTypesForResultRow(final GroupByQuery groupByQuery) { final TypeFactory typeFactory = TypeFactory.defaultInstance(); From 751ff9ecebdb84f3c220efb3da95904b7eee6662 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 26 Jul 2024 10:33:23 +0530 Subject: [PATCH 10/10] static check --- .../druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java index 110620d718af..60b55ce74828 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java @@ -232,7 +232,7 @@ private static boolean dimensionRequiresConversion(final ColumnType dimensionTyp /** * Creates java types for deserializing the result row. For timestamp, aggregators and post-aggregators, it resorts to - * {@link Object.class}. For dimensions requiring conversion (check {@link #dimensionRequiresConversion(ColumnType)}), + * {@code Object.class}. For dimensions requiring conversion (check {@link #dimensionRequiresConversion(ColumnType)}), * it returns the java type for the associated class of the complex object. */ private static JavaType[] createJavaTypesForResultRow(final GroupByQuery groupByQuery)