From 1904ae4e652aefdfe46ef2d11705eea206450835 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 17 Apr 2024 19:38:13 +0530 Subject: [PATCH 1/5] fix mapping in window project --- .../druid/sql/calcite/rel/Windowing.java | 11 ++++++++-- .../sql/calcite/CalciteWindowQueryTest.java | 21 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index 0a4f3226d7e3..93ca218cc36a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -71,6 +71,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; +import java.util.stream.Collectors; /** * Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}. @@ -243,8 +244,14 @@ public static Windowing fromCalciteStuff( // We know windowProject is a mapping due to the isMapping() check in DruidRules. Check for null anyway, // as defensive programming. final Mappings.TargetMapping mapping = Preconditions.checkNotNull( - partialQuery.getWindowProject().getMapping(), - "mapping for windowProject[%s]", partialQuery.getWindowProject() + partialQuery.getWindowProject().getProjects().stream().map(RexNode::toString).collect(Collectors.toSet()).size() + < partialQuery.getWindowProject().getProjects().size() + ? Project.getPartialMapping( + partialQuery.getWindowProject().getRowType().getFieldCount(), + partialQuery.getWindowProject().getProjects() + ) + : partialQuery.getWindowProject().getMapping(), + "mapping for windowProject[%s]", partialQuery.getWindowProject() ); final List windowProjectOutputColumns = new ArrayList<>(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index c869cb8e44fa..16706335515b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; @@ -37,6 +38,7 @@ import org.apache.druid.sql.calcite.QueryVerification.QueryResultsVerifier; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.junit.Assert; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -231,6 +233,25 @@ public void windowQueryTestWithCustomContextMaxSubqueryBytes(String filename) th } } + @Test + public void testWindow() + { + testBuilder() + .sql("SELECT\n" + + "(rank() over (order by count(*) desc)),\n" + + "(rank() over (order by count(*) desc))\n" + + "FROM \"wikipedia\"") + .queryContext(ImmutableMap.of( + PlannerContext.CTX_ENABLE_WINDOW_FNS, true, + QueryContexts.ENABLE_DEBUG, true, + QueryContexts.WINDOWING_STRICT_VALIDATION, false + )) + .expectedResults(ImmutableList.of( + new Object[]{1L, 1L} + )) + .run(); + } + private WindowOperatorQuery getWindowOperatorQuery(List> queries) { assertEquals(1, queries.size()); From 94243f64e14550892478f70352d1e51d0e8fed2f Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 17 Apr 2024 20:48:30 +0530 Subject: [PATCH 2/5] drill test fixes --- .../org/apache/druid/sql/calcite/DrillWindowQueryTest.java | 3 --- .../java/org/apache/druid/sql/calcite/NotYetSupported.java | 1 - 2 files changed, 4 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java index 2236a7d71a80..59f7de2ad177 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java @@ -4698,7 +4698,6 @@ public void test_aggregates_winFnQry_21() windowQueryTest(); } - @NotYetSupported(Modes.NPE) @DrillTest("first_val/firstValFn_5") @Test public void test_first_val_firstValFn_5() @@ -4922,7 +4921,6 @@ public void test_frameclause_subQueries_frmInSubQry_46() windowQueryTest(); } - @NotYetSupported(Modes.NPE) @DrillTest("lag_func/lag_Fn_82") @Test public void test_lag_func_lag_Fn_82() @@ -4930,7 +4928,6 @@ public void test_lag_func_lag_Fn_82() windowQueryTest(); } - @NotYetSupported(Modes.NPE) @DrillTest("last_val/lastValFn_5") @Test public void test_last_val_lastValFn_5() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java index 43f2faa3f0ce..de94a2649766 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java @@ -83,7 +83,6 @@ enum Modes COLUMN_NOT_FOUND(DruidException.class, "CalciteContextException.*Column.*not found in any table"), NULLS_FIRST_LAST(DruidException.class, "NULLS (FIRST|LAST)"), BIGINT_TO_DATE(DruidException.class, "BIGINT to type (DATE|TIME)"), - NPE(DruidException.class, "java.lang.NullPointerException"), AGGREGATION_NOT_SUPPORT_TYPE(DruidException.class, "Aggregation \\[(MIN|MAX)\\] does not support type \\[STRING\\]"), ALLDATA_CSV(DruidException.class, "allData.csv"), BIGINT_TIME_COMPARE(DruidException.class, "Cannot apply '.' to arguments of type"), From 87f898526e314a342f5b43d8601c6d72b0052f29 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Sat, 20 Apr 2024 07:32:59 +0530 Subject: [PATCH 3/5] use partial mapping always - can have even 0 or >1 targets --- .../apache/druid/msq/exec/MSQWindowTest.java | 32 +++++++++++++++++++ .../druid/sql/calcite/rel/Windowing.java | 23 +++++++------ 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index 74b04138a741..1ffa89ab2471 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -1724,6 +1724,38 @@ public void testSimpleWindowWithEmptyOverNoGroupBy(String contextName, Map context) + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("m1", ColumnType.FLOAT) + .add("cc", ColumnType.DOUBLE) + .add("cc_dup", ColumnType.DOUBLE) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL\n" + + "select __time, m1,SUM(m1) OVER() cc,SUM(m1) OVER() cc_dup from foo\n" + + "PARTITIONED BY ALL CLUSTERED BY m1") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{946684800000L, 1.0f, 21.0, 21.0}, + new Object[]{946771200000L, 2.0f, 21.0, 21.0}, + new Object[]{946857600000L, 3.0f, 21.0, 21.0}, + new Object[]{978307200000L, 4.0f, 21.0, 21.0}, + new Object[]{978393600000L, 5.0f, 21.0, 21.0}, + new Object[]{978480000000L, 6.0f, 21.0, 21.0} + ) + ) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) + .verifyResults(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testSimpleWindowWithJoins(String contextName, Map context) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index 93ca218cc36a..b08d14eb655c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import org.apache.calcite.linq4j.Ord; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelFieldCollation; @@ -34,6 +35,7 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexWindowBound; +import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -241,17 +243,18 @@ public static Windowing fromCalciteStuff( // Apply windowProject, if present. if (partialQuery.getWindowProject() != null) { - // We know windowProject is a mapping due to the isMapping() check in DruidRules. Check for null anyway, - // as defensive programming. + // We know windowProject is a mapping due to the isMapping() check in DruidRules. + // check anyway as defensive programming. + Preconditions.checkArgument(partialQuery.getWindowProject().isMapping()); + int maxFieldCount = 1 + partialQuery.getWindowProject() + .getProjects() + .stream() + .map(node -> ((RexInputRef) node).getIndex()) + .reduce(0, Integer::max); final Mappings.TargetMapping mapping = Preconditions.checkNotNull( - partialQuery.getWindowProject().getProjects().stream().map(RexNode::toString).collect(Collectors.toSet()).size() - < partialQuery.getWindowProject().getProjects().size() - ? Project.getPartialMapping( - partialQuery.getWindowProject().getRowType().getFieldCount(), - partialQuery.getWindowProject().getProjects() - ) - : partialQuery.getWindowProject().getMapping(), - "mapping for windowProject[%s]", partialQuery.getWindowProject() + Project.getPartialMapping(maxFieldCount, partialQuery.getWindowProject().getProjects()), + "mapping for windowProject[%s]", + partialQuery.getWindowProject() ); final List windowProjectOutputColumns = new ArrayList<>(); From c80eb59b2c5f99131d6dda696eb13ba503bd3aa2 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Sat, 20 Apr 2024 07:35:15 +0530 Subject: [PATCH 4/5] checkstyle --- .../main/java/org/apache/druid/sql/calcite/rel/Windowing.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index b08d14eb655c..08786808a342 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import org.apache.calcite.linq4j.Ord; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelFieldCollation; @@ -35,7 +34,6 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexWindowBound; -import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -73,7 +71,6 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; -import java.util.stream.Collectors; /** * Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}. From 5241793098d11dd5ab69d2c07e404446c9c7ade1 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 23 Apr 2024 16:21:56 +0530 Subject: [PATCH 5/5] partial mapping --- .../org/apache/druid/sql/calcite/rel/Windowing.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index 08786808a342..20c672ce924b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -243,13 +243,11 @@ public static Windowing fromCalciteStuff( // We know windowProject is a mapping due to the isMapping() check in DruidRules. // check anyway as defensive programming. Preconditions.checkArgument(partialQuery.getWindowProject().isMapping()); - int maxFieldCount = 1 + partialQuery.getWindowProject() - .getProjects() - .stream() - .map(node -> ((RexInputRef) node).getIndex()) - .reduce(0, Integer::max); final Mappings.TargetMapping mapping = Preconditions.checkNotNull( - Project.getPartialMapping(maxFieldCount, partialQuery.getWindowProject().getProjects()), + Project.getPartialMapping( + partialQuery.getWindowProject().getInput().getRowType().getFieldCount(), + partialQuery.getWindowProject().getProjects() + ), "mapping for windowProject[%s]", partialQuery.getWindowProject() );