From 55279a120215d59eaa2b47f95565af61b8e8bc97 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 19 Jul 2024 19:27:48 +0530 Subject: [PATCH 1/5] MSQ window functions: Fix partition boundary issues for arrays --- .../WindowOperatorQueryFrameProcessor.java | 5 +++-- .../druid/sql/calcite/DrillWindowQueryTest.java | 14 ++++++++++++++ .../partition_by_array/wikipedia_query_1.e | 13 +++++++++++++ .../partition_by_array/wikipedia_query_1.q | 6 ++++++ .../partition_by_array/wikipedia_query_2.e | 13 +++++++++++++ .../partition_by_array/wikipedia_query_2.q | 6 ++++++ 6 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index e21cae36d0f5..f4e3c1bd3640 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -51,6 +51,7 @@ import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -59,7 +60,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -499,7 +499,8 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List nullableTypeStrategy = frameReader.signature().getColumnType(columnName).get().getNullableStrategy(); + if (nullableTypeStrategy.compare(row1.get(i), row2.get(i)) == 0) { match++; } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java index 24076d1cdbf9..236a5d47485a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java @@ -7750,4 +7750,18 @@ public void test_empty_and_non_empty_over_wikipedia_query_3() { windowQueryTest(); } + + @DrillTest("druid_queries/partition_by_array/wikipedia_query_1") + @Test + public void test_partition_by_array_wikipedia_query_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/partition_by_array/wikipedia_query_2") + @Test + public void test_partition_by_array_wikipedia_query_2() + { + windowQueryTest(); + } } diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e new file mode 100644 index 000000000000..26c251a35fb5 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e @@ -0,0 +1,13 @@ +Austria null #de.wikipedia 1 +Republic of Korea null #en.wikipedia 2 +Republic of Korea null #ja.wikipedia 3 +Republic of Korea null #ko.wikipedia 4 +Republic of Korea Seoul #ko.wikipedia 1 +Austria Vienna #de.wikipedia 1 +Austria Vienna #es.wikipedia 2 +Austria Vienna #tr.wikipedia 3 +Republic of Korea Jeonju #ko.wikipedia 4 +Republic of Korea Suwon-si #ko.wikipedia 1 +Austria Horsching #de.wikipedia 1 +Republic of Korea Seongnam-si #ko.wikipedia 1 +Republic of Korea Yongsan-dong #ko.wikipedia 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q new file mode 100644 index 000000000000..b10b52af389f --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q @@ -0,0 +1,6 @@ +select +countryName, cityName, channel, +row_number() over (partition by array[1,2,length(cityName)] order by countryName) as c +from wikipedia +where countryName in ('Austria', 'Republic of Korea') +group by countryName, cityName, channel diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e new file mode 100644 index 000000000000..a1b116035c18 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e @@ -0,0 +1,13 @@ +Austria null #de.wikipedia 1 +Austria Horsching #de.wikipedia 2 +Austria Vienna #de.wikipedia 3 +Austria Vienna #es.wikipedia 4 +Austria Vienna #tr.wikipedia 5 +Republic of Korea null #en.wikipedia 6 +Republic of Korea null #ja.wikipedia 7 +Republic of Korea null #ko.wikipedia 8 +Republic of Korea Jeonju #ko.wikipedia 9 +Republic of Korea Seongnam-si #ko.wikipedia 10 +Republic of Korea Seoul #ko.wikipedia 11 +Republic of Korea Suwon-si #ko.wikipedia 12 +Republic of Korea Yongsan-dong #ko.wikipedia 13 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q new file mode 100644 index 000000000000..99245d7f9530 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q @@ -0,0 +1,6 @@ +select +countryName, cityName, channel, +row_number() over (partition by array[1,2,3] order by countryName) as c +from wikipedia +where countryName in ('Austria', 'Republic of Korea') +group by countryName, cityName, channel From 6fcc80285089fefe1180267890edfbdd319d1d6f Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 23 Jul 2024 15:02:57 +0530 Subject: [PATCH 2/5] Address review comments --- .../druid/sql/calcite/DrillWindowQueryTest.java | 7 +++++++ .../partition_by_array/wikipedia_query_3.e | 13 +++++++++++++ .../partition_by_array/wikipedia_query_3.q | 6 ++++++ 3 files changed, 26 insertions(+) create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java index 236a5d47485a..5e46c07687a2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java @@ -7764,4 +7764,11 @@ public void test_partition_by_array_wikipedia_query_2() { windowQueryTest(); } + + @DrillTest("druid_queries/partition_by_array/wikipedia_query_3") + @Test + public void test_partition_by_array_wikipedia_query_3() + { + windowQueryTest(); + } } diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e new file mode 100644 index 000000000000..ebd91f9f8933 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e @@ -0,0 +1,13 @@ +Austria null #de.wikipedia 1 +Austria Vienna #de.wikipedia 1 +Austria Vienna #es.wikipedia 2 +Austria Vienna #tr.wikipedia 3 +Austria Horsching #de.wikipedia 1 +Republic of Korea null #en.wikipedia 1 +Republic of Korea null #ja.wikipedia 2 +Republic of Korea null #ko.wikipedia 3 +Republic of Korea Seoul #ko.wikipedia 1 +Republic of Korea Jeonju #ko.wikipedia 1 +Republic of Korea Suwon-si #ko.wikipedia 1 +Republic of Korea Seongnam-si #ko.wikipedia 1 +Republic of Korea Yongsan-dong #ko.wikipedia 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q new file mode 100644 index 000000000000..9241f2ee94ef --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q @@ -0,0 +1,6 @@ +select +countryName, cityName, channel, +row_number() over (partition by array[1,length(countryName),length(cityName)] order by countryName) as c +from wikipedia +where countryName in ('Austria', 'Republic of Korea') +group by countryName, cityName, channel From d602f5061df5444ae06282b78e4e77ae722dcfb4 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 23 Jul 2024 15:48:48 +0530 Subject: [PATCH 3/5] Cache type strategies --- .../querykit/WindowOperatorQueryFrameProcessor.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index f4e3c1bd3640..fd98e5f6a892 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -85,6 +85,10 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private ResultRow outputRow = null; private FrameWriter frameWriter = null; + // List of type strategies to compare the partition columns across rows. + // Type strategies are pushed in the same order as column types in frameReader.signature() + private final List> typeStrategies = new ArrayList<>(); + public WindowOperatorQueryFrameProcessor( WindowOperatorQuery query, ReadableFrameChannel inputChannel, @@ -103,13 +107,17 @@ public WindowOperatorQueryFrameProcessor( this.frameWriterFactory = frameWriterFactory; this.operatorFactoryList = operatorFactoryList; this.jsonMapper = jsonMapper; - this.frameReader = frameReader; this.query = query; this.frameRowsAndCols = new ArrayList<>(); this.resultRowAndCols = new ArrayList<>(); this.objectsOfASingleRac = new ArrayList<>(); this.maxRowsMaterialized = maxRowsMaterializedInWindow; this.partitionColumnNames = partitionColumnNames; + + this.frameReader = frameReader; + for (int i = 0; i < frameReader.signature().size(); i++) { + typeStrategies.add(frameReader.signature().getColumnType(i).get().getNullableStrategy()); + } } @Override @@ -499,8 +507,7 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List nullableTypeStrategy = frameReader.signature().getColumnType(columnName).get().getNullableStrategy(); - if (nullableTypeStrategy.compare(row1.get(i), row2.get(i)) == 0) { + if (typeStrategies.get(i).compare(row1.get(i), row2.get(i)) == 0) { match++; } } From ff913463ae41887df5c167a96e78bf3008080fdd Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 24 Jul 2024 10:30:54 +0530 Subject: [PATCH 4/5] Trigger Build From 722e25ba02fedb9a49adeb5a65efbf8072c1d277 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 24 Jul 2024 13:35:49 +0530 Subject: [PATCH 5/5] Convert typeStrategies from list to array --- .../msq/querykit/WindowOperatorQueryFrameProcessor.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index fd98e5f6a892..5fbfd3119d03 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -87,7 +87,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor // List of type strategies to compare the partition columns across rows. // Type strategies are pushed in the same order as column types in frameReader.signature() - private final List> typeStrategies = new ArrayList<>(); + private final NullableTypeStrategy[] typeStrategies; public WindowOperatorQueryFrameProcessor( WindowOperatorQuery query, @@ -115,8 +115,9 @@ public WindowOperatorQueryFrameProcessor( this.partitionColumnNames = partitionColumnNames; this.frameReader = frameReader; + this.typeStrategies = new NullableTypeStrategy[frameReader.signature().size()]; for (int i = 0; i < frameReader.signature().size(); i++) { - typeStrategies.add(frameReader.signature().getColumnType(i).get().getNullableStrategy()); + typeStrategies[i] = frameReader.signature().getColumnType(i).get().getNullableStrategy(); } } @@ -507,7 +508,7 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List