From be44f0077c37ddd7cf403cd93d5d56d6634f7a0c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 8 Dec 2022 22:39:09 -0800 Subject: [PATCH] MSQ: Only look at sqlInsertSegmentGranularity on the outer query. The planner sets sqlInsertSegmentGranularity in its context when using PARTITIONED BY, which sets it on every native query in the stack (as all native queries for a SQL query typically have the same context). QueryKit would interpret that as a request to configure bucketing for all native queries. This isn't useful, as bucketing is only used for the penultimate stage in INSERT / REPLACE. So, this patch modifies QueryKit to only look at sqlInsertSegmentGranularity on the outermost query. As an additional change, this patch switches the static ObjectMapper to use the processwide ObjectMapper for deserializing Granularities. Saves an ObjectMapper instance, and ensures that if there are any special serdes registered for Granularity, we'll pick them up. --- .../apache/druid/msq/exec/ControllerImpl.java | 2 +- .../druid/msq/querykit/DataSourcePlan.java | 16 +++++++++++++++- .../apache/druid/msq/querykit/QueryKitUtils.java | 15 ++++++++------- .../GroupByPostShuffleFrameProcessor.java | 14 ++++++++++---- .../GroupByPostShuffleFrameProcessorFactory.java | 3 ++- .../msq/querykit/groupby/GroupByQueryKit.java | 11 ++++++++++- .../querykit/scan/ScanQueryFrameProcessor.java | 7 +++++-- .../scan/ScanQueryFrameProcessorFactory.java | 3 ++- .../druid/msq/querykit/scan/ScanQueryKit.java | 3 ++- .../apache/druid/msq/sql/MSQTaskSqlEngine.java | 1 + .../scan/ScanQueryFrameProcessorTest.java | 4 +++- 11 files changed, 59 insertions(+), 20 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 318c33a759c7..528baa4c27d1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -989,7 +989,7 @@ private QueryKit makeQueryControllerToolKit() final Map, QueryKit> kitMap = ImmutableMap., QueryKit>builder() .put(ScanQuery.class, new ScanQueryKit(context.jsonMapper())) - .put(GroupByQuery.class, new GroupByQueryKit()) + .put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper())) .build(); return new MultiQueryKit(kitMap); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index a5c61c5bd75e..30544cf31bff 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -51,6 +51,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.external.ExternalDataSource; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -67,6 +68,16 @@ */ public class DataSourcePlan { + /** + * A map with {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} set to null, so we can clear it from the context + * of subqueries. + */ + private static final Map CONTEXT_MAP_NO_SEGMENT_GRANULARITY = new HashMap<>(); + + static { + CONTEXT_MAP_NO_SEGMENT_GRANULARITY.put(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, null); + } + private final DataSource newDataSource; private final List inputSpecs; private final IntSet broadcastInputs; @@ -247,7 +258,10 @@ private static DataSourcePlan forQuery( { final QueryDefinition subQueryDef = queryKit.makeQueryDefinition( queryId, - dataSource.getQuery(), + + // Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the + // outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong. + dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY), queryKit, ShuffleSpecFactories.subQueryWithMaxWorkerCount(maxWorkerCount), maxWorkerCount, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java index fcd723291650..1f863a8c735e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java @@ -24,7 +24,6 @@ import org.apache.calcite.sql.dialect.CalciteSqlDialect; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.SortColumn; -import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -76,15 +75,16 @@ public class QueryKitUtils */ public static final String CTX_TIME_COLUMN_NAME = "__timeColumn"; - private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); - - public static Granularity getSegmentGranularityFromContext(@Nullable final Map context) + public static Granularity getSegmentGranularityFromContext( + final ObjectMapper objectMapper, + @Nullable final Map context + ) { final Object o = context == null ? null : context.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY); if (o instanceof String) { try { - return OBJECT_MAPPER.readValue((String) o, Granularity.class); + return objectMapper.readValue((String) o, Granularity.class); } catch (JsonProcessingException e) { throw new ISE("Invalid segment granularity [%s]", o); @@ -188,9 +188,10 @@ public static RowSignature sortableSignature( * @throws IllegalArgumentException if the provided granularity is not supported */ @Nullable - public static VirtualColumn makeSegmentGranularityVirtualColumn(final Query query) + public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final Query query) { - final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(query.getContext()); + final Granularity segmentGranularity = + QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, query.getContext()); final String timeColumnName = query.context().getString(QueryKitUtils.CTX_TIME_COLUMN_NAME); if (timeColumnName == null || Granularities.ALL.equals(segmentGranularity)) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java index a44c14e3cdc2..207fe53de03a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.querykit.groupby; +import com.fasterxml.jackson.databind.ObjectMapper; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; @@ -92,7 +93,8 @@ public GroupByPostShuffleFrameProcessor( final FrameReader frameReader, final RowSignature resultSignature, final ClusterBy clusterBy, - final MemoryAllocator allocator + final MemoryAllocator allocator, + final ObjectMapper jsonMapper ) { this.query = query; @@ -107,7 +109,7 @@ public GroupByPostShuffleFrameProcessor( this.finalizeFn = makeFinalizeFn(query); this.havingSpec = cloneHavingSpec(query); this.columnSelectorFactoryForFrameWriter = - makeVirtualColumnsForFrameWriter(query).wrap( + makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap( RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory( query, () -> outputRow, @@ -311,9 +313,13 @@ private static HavingSpec cloneHavingSpec(final GroupByQuery query) * Create virtual columns containing "bonus" fields that should be attached to the {@link FrameWriter} for * this processor. Kept in sync with the signature generated by {@link GroupByQueryKit}. */ - private static VirtualColumns makeVirtualColumnsForFrameWriter(final GroupByQuery query) + private static VirtualColumns makeVirtualColumnsForFrameWriter( + final ObjectMapper jsonMapper, + final GroupByQuery query + ) { - final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(query); + final VirtualColumn segmentGranularityVirtualColumn = + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); if (segmentGranularityVirtualColumn == null) { return VirtualColumns.EMPTY; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java index 5987eb02fae0..ffb8bacf5e62 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java @@ -118,7 +118,8 @@ public ProcessorsAndChannels, Long> makeProcessors( readableInput.getChannelFrameReader(), stageDefinition.getSignature(), stageDefinition.getClusterBy(), - outputChannel.getFrameMemoryAllocator() + outputChannel.getFrameMemoryAllocator(), + frameContext.jsonMapper() ); } ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 411fe118a29e..402d2dfa3d8c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.querykit.groupby; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.SortColumn; @@ -56,6 +57,13 @@ public class GroupByQueryKit implements QueryKit { + private final ObjectMapper jsonMapper; + + public GroupByQueryKit(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + @Override public QueryDefinition makeQueryDefinition( final String queryId, @@ -85,7 +93,8 @@ public QueryDefinition makeQueryDefinition( final GroupByQuery queryToRun = (GroupByQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); - final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext()); + final Granularity segmentGranularity = + QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); final RowSignature intermediateSignature = computeIntermediateSignature(queryToRun); final ClusterBy resultClusterBy = QueryKitUtils.clusterByWithSegmentGranularity(computeClusterByForResults(queryToRun), segmentGranularity); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index 307d274c73bf..0482e2715dc7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.querykit.scan; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; @@ -92,7 +93,8 @@ public ScanQueryFrameProcessor( final ResourceHolder outputChannel, final ResourceHolder frameWriterFactoryHolder, @Nullable final AtomicLong runningCountForLimit, - final long memoryReservedForBroadcastJoin + final long memoryReservedForBroadcastJoin, + final ObjectMapper jsonMapper ) { super( @@ -111,7 +113,8 @@ public ScanQueryFrameProcessor( final List frameWriterVirtualColumns = new ArrayList<>(); frameWriterVirtualColumns.add(partitionBoostVirtualColumn); - final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(query); + final VirtualColumn segmentGranularityVirtualColumn = + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); if (segmentGranularityVirtualColumn != null) { frameWriterVirtualColumns.add(segmentGranularityVirtualColumn); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java index 2a948fd45625..bda53af69641 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java @@ -100,7 +100,8 @@ protected FrameProcessor makeProcessor( allocatorHolder )), runningCountForLimit, - frameContext.memoryParameters().getBroadcastJoinMemory() + frameContext.memoryParameters().getBroadcastJoinMemory(), + frameContext.jsonMapper() ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index 5bfb70b52c91..9e44f152eb2d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -116,7 +116,8 @@ public QueryDefinition makeQueryDefinition( signatureToUse = scanSignature; } else { final RowSignature.Builder signatureBuilder = RowSignature.builder().addAll(scanSignature); - final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext()); + final Granularity segmentGranularity = + QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); final List clusterByColumns = new ArrayList<>(); // Add regular orderBys. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index a91844114dda..2ec08e03783d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -207,6 +207,7 @@ private static void validateInsert( try { segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext( + plannerContext.getJsonMapper(), plannerContext.queryContextMap() ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java index 2ea2958c7368..d93e8df42df9 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java @@ -37,6 +37,7 @@ import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; @@ -152,7 +153,8 @@ public void close() }, new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})), null, - 0L + 0L, + new DefaultObjectMapper() ); ListenableFuture retVal = exec.runFully(processor, null);