Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
Expand Down Expand Up @@ -191,11 +191,11 @@ public static RowSignature sortableSignature(
* @throws IllegalArgumentException if the provided granularity is not supported
*/
@Nullable
public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final Query<?> query)
public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final QueryContext queryContext)
{
final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, query.getContext());
final String timeColumnName = query.context().getString(QueryKitUtils.CTX_TIME_COLUMN_NAME);
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryContext.asMap());
final String timeColumnName = queryContext.getString(QueryKitUtils.CTX_TIME_COLUMN_NAME);

if (timeColumnName == null || Granularities.ALL.equals(segmentGranularity)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
Expand Down Expand Up @@ -87,7 +87,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
final AtomicInteger rowId = new AtomicInteger(0);

public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
QueryContext queryContext,
ReadableFrameChannel inputChannel,
WritableFrameChannel outputChannel,
FrameWriterFactory frameWriterFactory,
Expand All @@ -100,7 +100,7 @@ public WindowOperatorQueryFrameProcessor(
this.outputChannel = outputChannel;
this.frameWriterFactory = frameWriterFactory;
this.resultRowAndCols = new ArrayList<>();
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context());
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(queryContext);
this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList);
this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized);

Expand All @@ -110,7 +110,7 @@ public WindowOperatorQueryFrameProcessor(
this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>();
final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, queryContext);
if (segmentGranularityVirtualColumn != null) {
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());

return new WindowOperatorQueryFrameProcessor(
query,
query.context(),
readableInput.getChannel(),
outputChannel.getWritableChannel(),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private static VirtualColumns makeVirtualColumnsForFrameWriter(

virtualColumns.add(partitionBoostVirtualColumn);
final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query.context());
if (segmentGranularityVirtualColumn != null) {
virtualColumns.add(segmentGranularityVirtualColumn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public ScanQueryFrameProcessor(
frameWriterVirtualColumns.add(partitionBoostVirtualColumn);

final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query.context());

if (segmentGranularityVirtualColumn != null) {
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void testFrameWriterReachingCapacity() throws IOException

final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
query.context(),
factChannel.getChannel(),
outputChannel.writable(),
frameWriterFactory,
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testOutputChannelReachingCapacity() throws IOException

final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
query.context(),
factChannel.getChannel(),
outputChannel.writable(),
frameWriterFactory,
Expand Down Expand Up @@ -316,7 +316,7 @@ public void runProcessor(int maxRowsMaterialized, int expectedNumFramesWritten)
);

final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
query.context(),
factChannel.getChannel(),
countingWritableFrameChannel,
frameWriterFactory,
Expand Down