From a690567bb0f4dae5177c5536e6c4260f63d8e5c5 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 12 Sep 2024 09:09:13 +0530 Subject: [PATCH 1/3] MSQ window functions: Reject MVDs during window processing --- .../WindowOperatorQueryFrameProcessor.java | 10 +++++++++ .../apache/druid/msq/exec/MSQWindowTest.java | 22 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 3dc62f3a60de..aab8f1f1a6bb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -34,6 +34,7 @@ import org.apache.druid.frame.util.SettableLongVirtualColumn; import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.indexing.error.MSQException; @@ -54,6 +55,7 @@ import org.apache.druid.segment.Cursor; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; @@ -451,6 +453,14 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List context) + { + testSelectQuery() + .setSql("select cityName, countryName, array_to_mv(array[1,length(cityName)]), " + + "row_number() over (partition by array_to_mv(array[1,length(cityName)]) order by countryName, cityName)\n" + + "from wikipedia\n" + + "where countryName in ('Austria', 'Republic of Korea') and cityName is not null\n" + + "order by 1, 2, 3") + .setQueryContext(context) + .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( + CoreMatchers.instanceOf(ISE.class), + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( + "Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY.")) + )) + .verifyExecutionError(); + } } From b2223f6e9f55b3820618f56046b32d8d8c593838 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 19 Sep 2024 15:40:44 +0530 Subject: [PATCH 2/3] MSQ window functions: Reject MVDs during window processing --- .../apache/druid/msq/exec/MSQWindowTest.java | 22 ++++++++++++++++++- .../rowsandcols/RearrangedRowsAndColumns.java | 13 ++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index f12204ca0135..5712785d68f9 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -2329,7 +2329,7 @@ public void testReplaceWithPartitionedByDayOnWikipedia(String contextName, Map context) + public void testFailurePartitionByMVD_1(String contextName, Map context) { testSelectQuery() .setSql("select cityName, countryName, array_to_mv(array[1,length(cityName)]), " @@ -2345,4 +2345,24 @@ public void testFailurePartitionByMVD(String contextName, Map co )) .verifyExecutionError(); } + + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testFailurePartitionByMVD_2(String contextName, Map context) + { + testSelectQuery() + .setSql(" select cityName, countryName, array_to_mv(array[1,length(cityName)])," + + "row_number() over (partition by countryName order by countryName, cityName) as c1,\n" + + "row_number() over (partition by array_to_mv(array[1,length(cityName)]) order by countryName, cityName) as c2\n" + + "from wikipedia\n" + + "where countryName in ('Austria', 'Republic of Korea') and cityName is not null\n" + + "order by 1, 2, 3") + .setQueryContext(context) + .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( + CoreMatchers.instanceOf(ISE.class), + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( + "Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY.")) + )) + .verifyExecutionError(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java index f1793f8fd0e4..a301f9394c48 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java @@ -20,6 +20,7 @@ package org.apache.druid.query.rowsandcols; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; @@ -28,6 +29,7 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** @@ -128,7 +130,16 @@ public boolean isNull(int rowNum) @Override public Object getObject(int rowNum) { - return accessor.getObject(pointers[start + rowNum]); + Object value = accessor.getObject(pointers[start + rowNum]); + if (ColumnType.STRING.equals(getType()) && value instanceof List) { + // special handling to reject MVDs + throw new UOE( + "Encountered a multi value column [%s]. Window processing does not support MVDs. " + + "Consider using UNNEST or MV_TO_ARRAY.", + name + ); + } + return value; } @Override From 6a4bb6d83c69b475756400b388f3739499a5bc45 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 20 Sep 2024 12:36:38 +0530 Subject: [PATCH 3/3] Remove parameterization from MSQWindowTest --- .../apache/druid/msq/exec/MSQWindowTest.java | 341 ++++++++---------- 1 file changed, 144 insertions(+), 197 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index 5712785d68f9..2ba9d56ac69a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -67,11 +67,8 @@ import org.apache.druid.timeline.SegmentId; import org.hamcrest.CoreMatchers; import org.junit.internal.matchers.ThrowableMessageMatcher; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.Test; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -80,19 +77,8 @@ public class MSQWindowTest extends MSQTestBase { - public static Collection data() - { - Object[][] data = new Object[][]{ - {DEFAULT, DEFAULT_MSQ_CONTEXT} - }; - - return Arrays.asList(data); - } - - - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Map context) + @Test + public void testWindowOnFooWithPartitionByAndInnerGroupBy() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -111,7 +97,7 @@ public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Ma ColumnType.FLOAT ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -124,7 +110,7 @@ public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Ma final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(), ImmutableList.of( new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"))), @@ -154,7 +140,7 @@ public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Ma new Object[]{5.0f, 5.0}, new Object[]{6.0f, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().totalFiles(1), @@ -173,9 +159,8 @@ public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Ma .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contextName, Map context) + @Test + public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -201,7 +186,7 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex ColumnType.DOUBLE ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -218,7 +203,7 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("d0", ColumnType.FLOAT) .add("d1", ColumnType.DOUBLE) @@ -261,7 +246,7 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex new Object[]{5.0f, 5.0, 5.0, 21.0}, new Object[]{6.0f, 6.0, 6.0, 21.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().totalFiles(1), @@ -280,12 +265,8 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( - String contextName, - Map context - ) + @Test + public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -311,7 +292,7 @@ public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( ColumnType.DOUBLE ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -328,7 +309,7 @@ public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("d0", ColumnType.FLOAT) .add("d1", ColumnType.DOUBLE) @@ -375,7 +356,7 @@ public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( new Object[]{5.0f, 5.0, 5.0, 5.0}, new Object[]{6.0f, 6.0, 6.0, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().totalFiles(1), @@ -394,12 +375,8 @@ public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy( .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( - String contextName, - Map context - ) + @Test + public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -424,7 +401,7 @@ public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( ColumnType.DOUBLE ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -441,7 +418,7 @@ public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("d0", ColumnType.FLOAT) .add("d1", ColumnType.DOUBLE) @@ -488,7 +465,7 @@ public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( new Object[]{5.0f, 5.0, 5.0, 5.0}, new Object[]{6.0f, 6.0, 6.0, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().totalFiles(1), @@ -507,9 +484,8 @@ public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed( .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithEmptyOverWithGroupBy(String contextName, Map context) + @Test + public void testWindowOnFooWithEmptyOverWithGroupBy() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -528,7 +504,7 @@ public void testWindowOnFooWithEmptyOverWithGroupBy(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndPartition() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -606,7 +581,7 @@ public void testWindowOnFooWithNoGroupByAndPartition(String contextName, Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put(DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"}]") .build(); @@ -620,7 +595,7 @@ public void testWindowOnFooWithNoGroupByAndPartition(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -671,7 +645,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements(String context final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"}]" @@ -688,7 +662,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements(String context .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(), ImmutableList.of( new NaiveSortOperatorFactory(ImmutableList.of( @@ -721,13 +695,12 @@ public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements(String context new Object[]{5.0f, 5.0}, new Object[]{6.0f, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndPartitionByAnother() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -742,7 +715,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"}]" @@ -759,7 +732,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(), ImmutableList.of( new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("m2"))), @@ -789,13 +762,12 @@ public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName new Object[]{5.0f, 5.0}, new Object[]{6.0f, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithGroupByAndInnerLimit(String contextName, Map context) + @Test + public void testWindowOnFooWithGroupByAndInnerLimit() { RowSignature rowSignature = RowSignature.builder() .add("m1", ColumnType.FLOAT) @@ -828,10 +800,10 @@ public void testWindowOnFooWithGroupByAndInnerLimit(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"v0\",\"type\":\"LONG\"}]" @@ -904,7 +875,7 @@ public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns(String con .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("v0", ColumnType.LONG) .add("m1", ColumnType.FLOAT) @@ -939,19 +910,18 @@ public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns(String con new Object[]{3, 5.0f, 5.0}, new Object[]{3, 6.0f, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithNoGroupByAndEmptyOver(String contextName, Map context) + @Test + public void testWindowOnFooWithNoGroupByAndEmptyOver() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put(DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"}]") .build(); @@ -976,7 +946,7 @@ public void testWindowOnFooWithNoGroupByAndEmptyOver(String contextName, Map context) + @Test + public void testWindowOnFooWithPartitionByOrderBYWithJoin() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"j0.m2\",\"type\":\"DOUBLE\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1024,7 +993,7 @@ public void testWindowOnFooWithPartitionByOrderBYWithJoin(String contextName, Ma final Map contextWithRowSignature1 = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"v0\",\"type\":\"FLOAT\"}]" @@ -1073,7 +1042,7 @@ public void testWindowOnFooWithPartitionByOrderBYWithJoin(String contextName, Ma .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("m1", ColumnType.FLOAT) .add("w0", ColumnType.DOUBLE) @@ -1109,17 +1078,16 @@ public void testWindowOnFooWithPartitionByOrderBYWithJoin(String contextName, Ma new Object[]{5.0f, 5.0, 5.0}, new Object[]{6.0f, 6.0, 6.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithEmptyOverWithJoin(String contextName, Map context) + @Test + public void testWindowOnFooWithEmptyOverWithJoin() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"j0.m2\",\"type\":\"DOUBLE\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1128,7 +1096,7 @@ public void testWindowOnFooWithEmptyOverWithJoin(String contextName, Map contextWithRowSignature1 = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"v0\",\"type\":\"FLOAT\"}]" @@ -1177,7 +1145,7 @@ public void testWindowOnFooWithEmptyOverWithJoin(String contextName, Map context) + @Test + public void testWindowOnFooWithDim2() { RowSignature rowSignature = RowSignature.builder() .add("dim2", ColumnType.STRING) @@ -1233,7 +1200,7 @@ public void testWindowOnFooWithDim2(String contextName, Map cont final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1250,7 +1217,7 @@ public void testWindowOnFooWithDim2(String contextName, Map cont .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("dim2", ColumnType.STRING).add("w0", ColumnType.DOUBLE).build(), ImmutableList.of( new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("dim2"))), @@ -1290,17 +1257,16 @@ public void testWindowOnFooWithDim2(String contextName, Map cont new Object[]{"abc", 5.0}, new Object[]{null, 8.0} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testWindowOnFooWithEmptyOverWithUnnest(String contextName, Map context) + @Test + public void testWindowOnFooWithEmptyOverWithUnnest() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"j0.unnest\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1336,7 +1302,7 @@ public void testWindowOnFooWithEmptyOverWithUnnest(String contextName, Map context) + @Test + public void testWindowOnFooWithPartitionByAndWithUnnest() { final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"j0.unnest\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" @@ -1419,7 +1384,7 @@ public void testWindowOnFooWithPartitionByAndWithUnnest(String contextName, Map< .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder() .add("m1", ColumnType.FLOAT) .add("w0", ColumnType.DOUBLE) @@ -1457,14 +1422,13 @@ public void testWindowOnFooWithPartitionByAndWithUnnest(String contextName, Map< new Object[]{5.0f, 5.0, NullHandling.sqlCompatible() ? null : ""}, new Object[]{6.0f, 6.0, NullHandling.sqlCompatible() ? null : ""} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } // Insert Tests - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testInsertWithWindow(String contextName, Map context) + @Test + public void testInsertWithWindow() { List expectedRows = ImmutableList.of( new Object[]{946684800000L, 1.0f, 1.0}, @@ -1487,7 +1451,7 @@ public void testInsertWithWindow(String contextName, Map context + "SUM(m1) OVER(PARTITION BY m1) as summ1\n" + "from foo\n" + "GROUP BY __time, m1 PARTITIONED BY ALL") - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedResultRows(expectedRows) .setExpectedDataSource("foo1") .setExpectedRowSignature(rowSignature) @@ -1495,9 +1459,8 @@ public void testInsertWithWindow(String contextName, Map context } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testInsertWithWindowEmptyOver(String contextName, Map context) + @Test + public void testInsertWithWindowEmptyOver() { List expectedRows = ImmutableList.of( new Object[]{946684800000L, 1.0f, 21.0}, @@ -1520,7 +1483,7 @@ public void testInsertWithWindowEmptyOver(String contextName, Map context) + @Test + public void testInsertWithWindowPartitionByOrderBy() { List expectedRows = ImmutableList.of( new Object[]{946684800000L, 1.0f, 1.0}, @@ -1553,7 +1515,7 @@ public void testInsertWithWindowPartitionByOrderBy(String contextName, Map context) + @Test + public void testReplaceWithWindowsAndUnnest() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1579,7 +1540,7 @@ public void testReplaceWithWindowsAndUnnest(String contextName, Map context) + @Test + public void testSimpleWindowWithPartitionBy() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1612,7 +1572,7 @@ public void testSimpleWindowWithPartitionBy(String contextName, Map context) + @Test + public void testSimpleWindowWithEmptyOver() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1643,7 +1602,7 @@ public void testSimpleWindowWithEmptyOver(String contextName, Map context) + @Test + public void testSimpleWindowWithEmptyOverNoGroupBy() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1674,7 +1632,7 @@ public void testSimpleWindowWithEmptyOverNoGroupBy(String contextName, Map context) + @Test + public void testSimpleWindowWithDuplicateSelectNode() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1706,7 +1663,7 @@ public void testSimpleWindowWithDuplicateSelectNode(String contextName, Map context) + @Test + public void testSimpleWindowWithJoins() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1738,7 +1694,7 @@ public void testSimpleWindowWithJoins(String contextName, Map co + "PARTITIONED BY DAY CLUSTERED BY m1") .setExpectedDataSource("foo1") .setExpectedRowSignature(rowSignature) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) .setExpectedResultRows( ImmutableList.of( @@ -1764,9 +1720,8 @@ public void testSimpleWindowWithJoins(String contextName, Map co } // Bigger dataset tests - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testSelectWithWikipedia(String contextName, Map context) + @Test + public void testSelectWithWikipedia() { RowSignature rowSignature = RowSignature.builder() .add("cityName", ColumnType.STRING) @@ -1782,7 +1737,7 @@ public void testSelectWithWikipedia(String contextName, Map cont final Map contextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"}]" @@ -1800,7 +1755,7 @@ public void testSelectWithWikipedia(String contextName, Map cont .context(contextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("cityName", ColumnType.STRING) .add("added", ColumnType.LONG) .add("w0", ColumnType.LONG).build(), @@ -1833,17 +1788,16 @@ public void testSelectWithWikipedia(String contextName, Map cont new Object[]{"Albuquerque", 9L, 140L}, new Object[]{"Albuquerque", 2L, 140L} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testSelectWithWikipediaEmptyOverWithCustomContext(String contextName, Map context) + @Test + public void testSelectWithWikipediaEmptyOverWithCustomContext() { final Map customContext = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 200) .build(); @@ -1855,9 +1809,8 @@ public void testSelectWithWikipediaEmptyOverWithCustomContext(String contextName .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextName, Map context) + @Test + public void testSelectWithWikipediaWithPartitionKeyNotInSelect() { RowSignature rowSignature = RowSignature.builder() .add("cityName", ColumnType.STRING) @@ -1873,7 +1826,7 @@ public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextNam final Map innerContextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"}]" @@ -1891,7 +1844,7 @@ public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextNam .context(innerContextWithRowSignature) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("cityName", ColumnType.STRING) .add("added", ColumnType.LONG) .add("w0", ColumnType.LONG).build(), @@ -1905,7 +1858,7 @@ public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextNam final Map outerContextWithRowSignature = ImmutableMap.builder() - .putAll(context) + .putAll(DEFAULT_MSQ_CONTEXT) .put( DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"w0\",\"type\":\"LONG\"}]" @@ -1944,13 +1897,12 @@ public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextNam new Object[]{"Tokyo", 0L, 12615L}, new Object[]{"Santiago", 161L, 401L} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testGroupByWithWikipedia(String contextName, Map context) + @Test + public void testGroupByWithWikipedia() { RowSignature rowSignature = RowSignature.builder() .add("cityName", ColumnType.STRING) @@ -1975,7 +1927,7 @@ public void testGroupByWithWikipedia(String contextName, Map con ColumnType.LONG ) )) - .setContext(context) + .setContext(DEFAULT_MSQ_CONTEXT) .build(); @@ -1988,7 +1940,7 @@ public void testGroupByWithWikipedia(String contextName, Map con final WindowOperatorQuery query = new WindowOperatorQuery( new QueryDataSource(groupByQuery), new LegacySegmentSpec(Intervals.ETERNITY), - context, + DEFAULT_MSQ_CONTEXT, RowSignature.builder().add("d0", ColumnType.STRING) .add("d1", ColumnType.LONG) .add("w0", ColumnType.LONG).build(), @@ -2022,13 +1974,12 @@ public void testGroupByWithWikipedia(String contextName, Map con new Object[]{"Albuquerque", 9L, 140L}, new Object[]{"Albuquerque", 129L, 140L} )) - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testReplaceGroupByOnWikipedia(String contextName, Map context) + @Test + public void testReplaceGroupByOnWikipedia() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -2044,7 +1995,7 @@ public void testReplaceGroupByOnWikipedia(String contextName, Map context) + @Test + public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers() { - final Map multipleWorkerContext = new HashMap<>(context); + final Map multipleWorkerContext = new HashMap<>(DEFAULT_MSQ_CONTEXT); multipleWorkerContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 5); final RowSignature rowSignature = RowSignature.builder() @@ -2289,9 +2239,8 @@ public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers(String cont .verifyResults(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testReplaceWithPartitionedByDayOnWikipedia(String contextName, Map context) + @Test + public void testReplaceWithPartitionedByDayOnWikipedia() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -2307,7 +2256,7 @@ public void testReplaceWithPartitionedByDayOnWikipedia(String contextName, Map context) + @Test + public void testFailurePartitionByMVD_1() { testSelectQuery() .setSql("select cityName, countryName, array_to_mv(array[1,length(cityName)]), " @@ -2337,7 +2285,7 @@ public void testFailurePartitionByMVD_1(String contextName, Map + "from wikipedia\n" + "where countryName in ('Austria', 'Republic of Korea') and cityName is not null\n" + "order by 1, 2, 3") - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( CoreMatchers.instanceOf(ISE.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( @@ -2346,9 +2294,8 @@ public void testFailurePartitionByMVD_1(String contextName, Map .verifyExecutionError(); } - @MethodSource("data") - @ParameterizedTest(name = "{index}:with context {0}") - public void testFailurePartitionByMVD_2(String contextName, Map context) + @Test + public void testFailurePartitionByMVD_2() { testSelectQuery() .setSql(" select cityName, countryName, array_to_mv(array[1,length(cityName)])," @@ -2357,7 +2304,7 @@ public void testFailurePartitionByMVD_2(String contextName, Map + "from wikipedia\n" + "where countryName in ('Austria', 'Republic of Korea') and cityName is not null\n" + "order by 1, 2, 3") - .setQueryContext(context) + .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( CoreMatchers.instanceOf(ISE.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(