From 8a2cb05dd68c04705ed798b0d6572420f9e6d1ed Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 2 Nov 2023 11:38:26 +0530 Subject: [PATCH] Fix an issue with passing order by and limit to realtime tasks (#15301) While running queries on real time tasks using MSQ, there is an issue with queries with certain order by columns. If the query specifies a non time column, the query is planned as it is supported by MSQ. However, this throws an exception when passed to real time tasks once as the native query stack does not support it. This PR resolves this by removing the ordering from the query before contacting real time tasks. Fixes a bug with MSQ while reading data from real time tasks with non time ordering --- .../scan/ScanQueryFrameProcessor.java | 24 ++++++- .../druid/msq/exec/MSQLoadedSegmentTests.java | 66 +++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index 278a9c251dea..ff15e116df6f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -57,6 +57,7 @@ import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.msq.querykit.BaseLeafFrameProcessor; import org.apache.druid.msq.querykit.QueryKitUtils; +import org.apache.druid.query.Druids; import org.apache.druid.query.IterableRowsCursorHelper; import org.apache.druid.query.filter.Filter; import org.apache.druid.query.scan.ScanQuery; @@ -78,6 +79,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -172,13 +174,31 @@ public static Sequence mappingFunction(Sequence input }).map(List::toArray); } + /** + * Prepares the scan query to be sent to a data server. + * If the query contains a non-time ordering, removes the ordering and limit, as the native query stack does not + * support it. + */ + private static ScanQuery prepareScanQueryForDataServer(@NotNull ScanQuery scanQuery) + { + if (ScanQuery.Order.NONE.equals(scanQuery.getTimeOrder()) && !scanQuery.getOrderBys().isEmpty()) { + return Druids.ScanQueryBuilder.copy(scanQuery) + .orderBy(ImmutableList.of()) + .limit(0) + .build(); + } else { + return scanQuery; + } + } + @Override protected ReturnOrAwait runWithLoadedSegment(final SegmentWithDescriptor segment) throws IOException { if (cursor == null) { + ScanQuery preparedQuery = prepareScanQueryForDataServer(query); final Pair> statusSequencePair = segment.fetchRowsFromDataServer( - query, + preparedQuery, ScanQueryFrameProcessor::mappingFunction, closer ); @@ -188,7 +208,7 @@ protected ReturnOrAwait runWithLoadedSegment(final SegmentWithDescriptor s return runWithSegment(segment); } - RowSignature rowSignature = ScanQueryKit.getAndValidateSignature(query, jsonMapper); + RowSignature rowSignature = ScanQueryKit.getAndValidateSignature(preparedQuery, jsonMapper); Pair cursorFromIterable = IterableRowsCursorHelper.getCursorFromYielder( statusSequencePair.rhs, rowSignature diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java index b2c07e267e4c..ae10c16a0083 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java @@ -38,6 +38,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -50,6 +51,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.hamcrest.CoreMatchers; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -57,6 +59,7 @@ import java.util.Map; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -148,6 +151,69 @@ public void testSelectWithLoadedSegmentsOnFoo() throws IOException .verifyResults(); } + @Test + public void testSelectWithLoadedSegmentsOnFooWithOrderBy() throws IOException + { + RowSignature resultSignature = RowSignature.builder() + .add("cnt", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .build(); + + doAnswer( + invocationOnMock -> { + ScanQuery query = invocationOnMock.getArgument(0); + ScanQuery.verifyOrderByForNativeExecution(query); + Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit()); + return Pair.of( + LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS, + Yielders.each( + Sequences.simple( + ImmutableList.of( + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + ) + ) + ) + ); + } + + ) + .when(loadedSegmentDataProvider) + .fetchRowsFromDataServer(any(), any(), any(), any()); + + testSelectQuery() + .setSql("select cnt, dim1 from foo order by dim1") + .setExpectedMSQSpec( + MSQSpec.builder() + .query( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("cnt", "dim1") + .orderBy(ImmutableList.of(new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING))) + .context(defaultScanQueryContext(REALTIME_QUERY_CTX, resultSignature)) + .build() + ) + .columnMappings(ColumnMappings.identity(resultSignature)) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build() + ) + .setQueryContext(REALTIME_QUERY_CTX) + .setExpectedRowSignature(resultSignature) + .setExpectedResultRows(ImmutableList.of( + new Object[]{1L, ""}, + new Object[]{1L, "1"}, + new Object[]{1L, "10.1"}, + new Object[]{1L, "2"}, + new Object[]{1L, "abc"}, + new Object[]{1L, "def"}, + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + )) + .verifyResults(); + } + @Test public void testGroupByWithLoadedSegmentsOnFoo() throws IOException {