diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java index e5f2a0152fd8..bc0c64c251a7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java @@ -34,7 +34,7 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault; import org.apache.druid.msq.indexing.error.MSQException; -import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; import org.apache.druid.query.expression.TimestampFloorExprMacro; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.column.ColumnType; @@ -191,11 +191,11 @@ public static RowSignature sortableSignature( * @throws IllegalArgumentException if the provided granularity is not supported */ @Nullable - public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final Query query) + public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final QueryContext queryContext) { final Granularity segmentGranularity = - QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, query.getContext()); - final String timeColumnName = query.context().getString(QueryKitUtils.CTX_TIME_COLUMN_NAME); + QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryContext.asMap()); + final String timeColumnName = queryContext.getString(QueryKitUtils.CTX_TIME_COLUMN_NAME); if (timeColumnName == null || Granularities.ALL.equals(segmentGranularity)) { return null; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index b3bcf899ec17..41c1df884f96 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -39,6 +39,7 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory; import org.apache.druid.query.operator.AbstractSortOperatorFactory; import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory; @@ -46,7 +47,6 @@ import org.apache.druid.query.operator.Operator; import org.apache.druid.query.operator.OperatorFactory; import org.apache.druid.query.operator.PartitionSortOperatorFactory; -import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; @@ -87,7 +87,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor final AtomicInteger rowId = new AtomicInteger(0); public WindowOperatorQueryFrameProcessor( - WindowOperatorQuery query, + QueryContext queryContext, ReadableFrameChannel inputChannel, WritableFrameChannel outputChannel, FrameWriterFactory frameWriterFactory, @@ -100,7 +100,7 @@ public WindowOperatorQueryFrameProcessor( this.outputChannel = outputChannel; this.frameWriterFactory = frameWriterFactory; this.resultRowAndCols = new ArrayList<>(); - this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context()); + this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(queryContext); this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList); this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized); @@ -110,7 +110,7 @@ public WindowOperatorQueryFrameProcessor( this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); final List frameWriterVirtualColumns = new ArrayList<>(); final VirtualColumn segmentGranularityVirtualColumn = - QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, queryContext); if (segmentGranularityVirtualColumn != null) { frameWriterVirtualColumns.add(segmentGranularityVirtualColumn); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java index 68f6f564774d..97da2db6b64f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java @@ -156,7 +156,7 @@ public ProcessorsAndChannels makeProcessors( outputChannels.get(readableInput.getStagePartition().getPartitionNumber()); return new WindowOperatorQueryFrameProcessor( - query, + query.context(), readableInput.getChannel(), outputChannel.getWritableChannel(), stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java index 5b0a3ddefd2e..e9783b8366c5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java @@ -321,7 +321,7 @@ private static VirtualColumns makeVirtualColumnsForFrameWriter( virtualColumns.add(partitionBoostVirtualColumn); final VirtualColumn segmentGranularityVirtualColumn = - QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query.context()); if (segmentGranularityVirtualColumn != null) { virtualColumns.add(segmentGranularityVirtualColumn); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index 05f80b9805d5..1be191bddbe6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -137,7 +137,7 @@ public ScanQueryFrameProcessor( frameWriterVirtualColumns.add(partitionBoostVirtualColumn); final VirtualColumn segmentGranularityVirtualColumn = - QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query.context()); if (segmentGranularityVirtualColumn != null) { frameWriterVirtualColumns.add(segmentGranularityVirtualColumn); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java index 5d1b350ca929..e9484ec76e64 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java @@ -125,7 +125,7 @@ public void testFrameWriterReachingCapacity() throws IOException final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal(); final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor( - query, + query.context(), factChannel.getChannel(), outputChannel.writable(), frameWriterFactory, @@ -209,7 +209,7 @@ public void testOutputChannelReachingCapacity() throws IOException final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal(); final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor( - query, + query.context(), factChannel.getChannel(), outputChannel.writable(), frameWriterFactory, @@ -316,7 +316,7 @@ public void runProcessor(int maxRowsMaterialized, int expectedNumFramesWritten) ); final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor( - query, + query.context(), factChannel.getChannel(), countingWritableFrameChannel, frameWriterFactory,