From 31f61f3953df5c44019389e7048ca9cb3d1f3b86 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Mon, 18 Mar 2024 12:25:38 -0700 Subject: [PATCH 1/3] Fixing corner case when only one of upper or lower range is unbounded in a window range --- .../DefaultFramedOnHeapAggregatable.java | 18 ++++++++++++++++++ .../window/window_unbounded_lower.sqlTest | 16 ++++++++++++++++ .../window/window_unbounded_upper.sqlTest | 16 ++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 sql/src/test/resources/calcite/tests/window/window_unbounded_lower.sqlTest create mode 100644 sql/src/test/resources/calcite/tests/window/window_unbounded_upper.sqlTest diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java index 8b34b62f06f0..622c0cc88273 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java @@ -454,6 +454,15 @@ private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[] rowIdProvider.incrementAndGet(); } + // Corner case when the numRows=1 and upperOffset=1 + // In such a case the priming of results is not needed + if (rowIdProvider.get() == numRows) { + for (int i = 0; i < aggs.length; i++) { + results[i][resultStorageIndex] = aggs[i].get(); + } + ++resultStorageIndex; + } + // Prime the results if (rowIdProvider.get() < numRows) { for (int i = 0; i < aggs.length; i++) { @@ -541,6 +550,15 @@ private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFa rowIdProvider.decrementAndGet(); } + // Corner case when the numRows=1 and lowerOffset=1 + // In such a case the priming of results is not needed + if (rowIdProvider.get() < 0) { + for (int i = 0; i < aggs.length; i++) { + results[i][resultStorageIndex] = aggs[i].get(); + } + --resultStorageIndex; + } + // Prime the results if (rowIdProvider.get() >= 0) { for (int i = 0; i < aggs.length; i++) { diff --git a/sql/src/test/resources/calcite/tests/window/window_unbounded_lower.sqlTest b/sql/src/test/resources/calcite/tests/window/window_unbounded_lower.sqlTest new file mode 100644 index 000000000000..3b9ea3bb6ac3 --- /dev/null +++ b/sql/src/test/resources/calcite/tests/window/window_unbounded_lower.sqlTest @@ -0,0 +1,16 @@ +type: "operatorValidation" + +sql: | + select countryName, cityName, + count(cityName) over w cnt + from wikipedia + where countryName in ('Austria', 'Republic of Korea') and (cityName in ('Horsching', 'Vienna', 'Seoul', 'Jeonju')) + group by countryName, cityName + window w as (partition by cityName order by countryName asc rows between unbounded preceding and 1 following) + + +expectedResults: + - ["Austria","Horsching",1] + - ["Republic of Korea","Jeonju",1] + - ["Republic of Korea","Seoul",1] + - ["Austria","Vienna",1] \ No newline at end of file diff --git a/sql/src/test/resources/calcite/tests/window/window_unbounded_upper.sqlTest b/sql/src/test/resources/calcite/tests/window/window_unbounded_upper.sqlTest new file mode 100644 index 000000000000..0da244578af8 --- /dev/null +++ b/sql/src/test/resources/calcite/tests/window/window_unbounded_upper.sqlTest @@ -0,0 +1,16 @@ +type: "operatorValidation" + +sql: | + select countryName, cityName, + count(cityName) over w cnt + from wikipedia + where countryName in ('Austria', 'Republic of Korea') and (cityName in ('Horsching', 'Vienna', 'Seoul', 'Jeonju')) + group by countryName, cityName + window w as (partition by cityName order by countryName asc rows between 1 preceding and unbounded following) + + +expectedResults: + - ["Austria","Horsching",1] + - ["Republic of Korea","Jeonju",1] + - ["Republic of Korea","Seoul",1] + - ["Austria","Vienna",1] \ No newline at end of file From 145b4042c3d69785cca147be30fd2537926e0f7c Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Mon, 8 Apr 2024 12:47:13 -0700 Subject: [PATCH 2/3] fixing a comment --- .../rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java index 622c0cc88273..1dc1c984bc85 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java @@ -454,7 +454,7 @@ private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[] rowIdProvider.incrementAndGet(); } - // Corner case when the numRows=1 and upperOffset=1 + // Corner case when the numRows=upperOffset // In such a case the priming of results is not needed if (rowIdProvider.get() == numRows) { for (int i = 0; i < aggs.length; i++) { @@ -550,7 +550,7 @@ private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFa rowIdProvider.decrementAndGet(); } - // Corner case when the numRows=1 and lowerOffset=1 + // Corner case when the numRows=loweOffset // In such a case the priming of results is not needed if (rowIdProvider.get() < 0) { for (int i = 0; i < aggs.length; i++) { From 1d89a8f58611527c0a4b93e898213b1cefffc78a Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 9 Apr 2024 12:28:32 -0700 Subject: [PATCH 3/3] Improve coverage --- .../FramedOnHeapAggregatableTest.java | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java index d00e12b3b660..f26a517f9a50 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java @@ -392,6 +392,8 @@ public void testUnboundedWindowedAggregation() .validate(results); } + + @Test public void testCumulativeAggregation() { @@ -460,6 +462,73 @@ public void testReverseCumulativeAggregation() .validate(results); } + @Test + public void testCumulativeAggregationUnbounded() + { + Map map = new LinkedHashMap<>(); + map.put("intCol", new IntArrayColumn(new int[]{0})); + map.put("doubleCol", new DoubleArrayColumn(new double[]{0})); + map.put("objectCol", new ObjectArrayColumn( + new String[]{"a"}, + ColumnType.STRING + ) + ); + + RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map)); + + FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); + + final RowsAndColumns results = agger.aggregateAll( + new WindowFrame(WindowFrame.PeerType.ROWS, true, 1, false, 1, null), + new AggregatorFactory[]{ + new LongMaxAggregatorFactory("cummMax", "intCol"), + new DoubleSumAggregatorFactory("cummSum", "doubleCol") + } + ); + + new RowsAndColumnsHelper() + .expectColumn("intCol", new int[]{0}) + .expectColumn("doubleCol", new double[]{0}) + .expectColumn("objectCol", new String[]{"a"}, ColumnType.STRING) + .expectColumn("cummMax", new long[]{0}) + .expectColumn("cummSum", new double[]{0}) + .allColumnsRegistered() + .validate(results); + } + + @Test + public void testReverseCumulativeAggregationUnbound() + { + Map map = new LinkedHashMap<>(); + map.put("intCol", new IntArrayColumn(new int[]{0})); + map.put("doubleCol", new DoubleArrayColumn(new double[]{0})); + map.put("objectCol", new ObjectArrayColumn( + new String[]{"a"}, + ColumnType.STRING + ) + ); + + RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map)); + + FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); + + final RowsAndColumns results = agger.aggregateAll( + new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, true, 0, null), + new AggregatorFactory[]{ + new LongMaxAggregatorFactory("cummMax", "intCol"), + new DoubleSumAggregatorFactory("cummSum", "doubleCol") + } + ); + + new RowsAndColumnsHelper() + .expectColumn("intCol", new int[]{0}) + .expectColumn("doubleCol", new double[]{0}) + .expectColumn("objectCol", new String[]{"a"}, ColumnType.STRING) + .expectColumn("cummMax", new long[]{0}) + .expectColumn("cummSum", new double[]{0}) + .allColumnsRegistered() + .validate(results); + } @Test