From 1a62cef562e0b7b9049ae68fd206ceb41cff58e2 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 17 Sep 2024 10:12:05 +0530 Subject: [PATCH 1/5] try out --- .../query/SqlWindowFunctionsBenchmark.java | 16 ++++- .../columnar/StringFrameColumnReader.java | 69 ++++++++++++++++++- .../semantic/DefaultNaiveSortMaker.java | 8 ++- 3 files changed, 89 insertions(+), 4 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java index 7fa2d38d6868..98ab1862c6ec 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java @@ -157,7 +157,7 @@ public class SqlWindowFunctionsBenchmark @Override public int getNumMergeBuffers() { - return 3; + return 8; } }; @@ -336,7 +336,8 @@ public void querySql(String sql, Blackhole blackhole) { final Map context = ImmutableMap.of( PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.MAX_SUBQUERY_BYTES_KEY, "auto" + QueryContexts.MAX_SUBQUERY_BYTES_KEY, "disabled", + QueryContexts.MAX_SUBQUERY_ROWS_KEY, -1 ); try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, context)) { final PlannerResult plannerResult = planner.plan(); @@ -420,4 +421,15 @@ public void windowWithoutSorter(Blackhole blackhole) + "GROUP BY dimUniform, dimSequential"; querySql(sql, blackhole); } + + @Benchmark + public void windowWithGroupbyTime(Blackhole blackhole) + { + String sql = "SELECT " + + "SUM(dimSequentialHalfNull) + SUM(dimHyperUnique), " + + "LAG(SUM(dimSequentialHalfNull + dimHyperUnique)) OVER (PARTITION BY dimUniform ORDER BY dimSequential) " + + "FROM foo " + + "GROUP BY __time, dimUniform, dimSequential"; + querySql(sql, blackhole); + } } diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java index 2385c431e5b3..d0fccd28b222 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java @@ -191,7 +191,7 @@ private static long getStartOfStringDataSection( return getStartOfStringLengthSection(numRows, multiValue) + (long) Integer.BYTES * totalNumValues; } - private static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn + public static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn { private final Frame frame; private final Memory memory; @@ -619,5 +619,72 @@ private List getRowAsListUtf8(final int physicalRow) return Collections.singletonList((ByteBuffer) object); } } + + public int compare(int rowNum1, int rowNum2) { + int index1 = frame.physicalRow(rowNum1); + int index2 = frame.physicalRow(rowNum2); + + final long dataStart1; + final long dataEnd1 = + startOfStringDataSection + + memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index1); + + if (index1 == 0) { + dataStart1 = startOfStringDataSection; + } else { + dataStart1 = + startOfStringDataSection + + memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index1 - 1)); + } + + int dataLength = Ints.checkedCast(dataEnd1 - dataStart1); + + if ((dataLength == 0 && NullHandling.replaceWithDefault()) || + (dataLength == 1 && memory.getByte(dataStart1) == FrameWriterUtils.NULL_STRING_MARKER)) { + return -1; + } + + + final long dataStart2; + final long dataEnd2 = + startOfStringDataSection + + memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index2); + + if (index2 == 0) { + dataStart2 = startOfStringDataSection; + } else { + dataStart2 = + startOfStringDataSection + + memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index2 - 1)); + } + + dataLength = Ints.checkedCast(dataEnd2 - dataStart2); + + if ((dataLength == 0 && NullHandling.replaceWithDefault()) || + (dataLength == 1 && memory.getByte(dataStart2) == FrameWriterUtils.NULL_STRING_MARKER)) { + return 1; + } + + final byte[] stringData1 = new byte[(int) (dataEnd1 - dataStart1)]; + memory.getByteArray(dataStart1, stringData1, 0, (int) (dataEnd1 - dataStart1)); + + final byte[] stringData2 = new byte[(int) (dataEnd1 - dataStart1)]; + memory.getByteArray(dataStart1, stringData2, 0, (int) (dataEnd1 - dataStart1)); + + int length1 = stringData1.length; + int length2 = stringData2.length; + int minLength = Math.min(length1, length2); + + // Compare element by element + for (int i = 0; i < minLength; i++) { + int diff = Byte.compare(stringData1[i], stringData2[i]); + if (diff != 0) { + return diff; // Return the difference when mismatch occurs + } + } + + // If all elements up to minLength are equal, the shorter array is "less" + return Integer.compare(length1, length2); + } } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java index 6ce7c258ef17..9f4390e02416 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java @@ -20,6 +20,7 @@ package org.apache.druid.query.rowsandcols.semantic; import it.unimi.dsi.fastutil.Arrays; +import org.apache.druid.frame.read.columnar.StringFrameColumnReader; import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; import org.apache.druid.query.rowsandcols.EmptyRowsAndColumns; @@ -112,7 +113,12 @@ public RowsAndColumns complete() (k1, k2) -> { for (int i = 0; i < numColsToCompare; ++i) { final ColumnAccessor accessy = accessors[i]; - int val = accessy.compareRows(sortedPointers[k1], sortedPointers[k2]); + int val; + if (accessy instanceof StringFrameColumnReader.StringFrameColumn) { + val = ((StringFrameColumnReader.StringFrameColumn) accessy).compare(sortedPointers[k1], sortedPointers[k2]); + } else { + val = accessy.compareRows(sortedPointers[k1], sortedPointers[k2]); + } if (val != 0) { return val * direction[i]; } From acfea8b371caddb417208c045463f91decc553c4 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 17 Sep 2024 21:24:23 +0530 Subject: [PATCH 2/5] compare Strings without conversion --- .../query/SqlWindowFunctionsBenchmark.java | 12 ++- .../columnar/StringFrameColumnReader.java | 83 ++++--------------- .../semantic/DefaultNaiveSortMaker.java | 8 +- 3 files changed, 25 insertions(+), 78 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java index 98ab1862c6ec..0177f14ef21e 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java @@ -157,7 +157,13 @@ public class SqlWindowFunctionsBenchmark @Override public int getNumMergeBuffers() { - return 8; + return 3; + } + + @Override + public int intermediateComputeSizeBytes() + { + return 200_000_000; } }; @@ -336,8 +342,8 @@ public void querySql(String sql, Blackhole blackhole) { final Map context = ImmutableMap.of( PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.MAX_SUBQUERY_BYTES_KEY, "disabled", - QueryContexts.MAX_SUBQUERY_ROWS_KEY, -1 + QueryContexts.MAX_SUBQUERY_BYTES_KEY, "auto"//, + //QueryContexts.MAX_SUBQUERY_ROWS_KEY, -1 ); try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, context)) { final PlannerResult plannerResult = planner.plan(); diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java index d0fccd28b222..2b2526c2d59f 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java @@ -191,7 +191,7 @@ private static long getStartOfStringDataSection( return getStartOfStringLengthSection(numRows, multiValue) + (long) Integer.BYTES * totalNumValues; } - public static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn + private static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn { private final Frame frame; private final Memory memory; @@ -507,6 +507,20 @@ protected Comparator getComparator() return Comparator.nullsFirst(Comparator.comparing(o -> ((String) o))); } + @Override + public int compareRows(int rowNum1, int rowNum2) + { + ByteBuffer buffer1 = getStringUtf8(rowNum1); + ByteBuffer buffer2 = getStringUtf8(rowNum2); + + if (buffer1 == null) { + return -1; + } else if (buffer2 == null) { + return 1; + } + return buffer1.compareTo(buffer2); + } + /** * Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly * created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is @@ -619,72 +633,5 @@ private List getRowAsListUtf8(final int physicalRow) return Collections.singletonList((ByteBuffer) object); } } - - public int compare(int rowNum1, int rowNum2) { - int index1 = frame.physicalRow(rowNum1); - int index2 = frame.physicalRow(rowNum2); - - final long dataStart1; - final long dataEnd1 = - startOfStringDataSection + - memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index1); - - if (index1 == 0) { - dataStart1 = startOfStringDataSection; - } else { - dataStart1 = - startOfStringDataSection + - memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index1 - 1)); - } - - int dataLength = Ints.checkedCast(dataEnd1 - dataStart1); - - if ((dataLength == 0 && NullHandling.replaceWithDefault()) || - (dataLength == 1 && memory.getByte(dataStart1) == FrameWriterUtils.NULL_STRING_MARKER)) { - return -1; - } - - - final long dataStart2; - final long dataEnd2 = - startOfStringDataSection + - memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index2); - - if (index2 == 0) { - dataStart2 = startOfStringDataSection; - } else { - dataStart2 = - startOfStringDataSection + - memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index2 - 1)); - } - - dataLength = Ints.checkedCast(dataEnd2 - dataStart2); - - if ((dataLength == 0 && NullHandling.replaceWithDefault()) || - (dataLength == 1 && memory.getByte(dataStart2) == FrameWriterUtils.NULL_STRING_MARKER)) { - return 1; - } - - final byte[] stringData1 = new byte[(int) (dataEnd1 - dataStart1)]; - memory.getByteArray(dataStart1, stringData1, 0, (int) (dataEnd1 - dataStart1)); - - final byte[] stringData2 = new byte[(int) (dataEnd1 - dataStart1)]; - memory.getByteArray(dataStart1, stringData2, 0, (int) (dataEnd1 - dataStart1)); - - int length1 = stringData1.length; - int length2 = stringData2.length; - int minLength = Math.min(length1, length2); - - // Compare element by element - for (int i = 0; i < minLength; i++) { - int diff = Byte.compare(stringData1[i], stringData2[i]); - if (diff != 0) { - return diff; // Return the difference when mismatch occurs - } - } - - // If all elements up to minLength are equal, the shorter array is "less" - return Integer.compare(length1, length2); - } } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java index 9f4390e02416..6ce7c258ef17 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java @@ -20,7 +20,6 @@ package org.apache.druid.query.rowsandcols.semantic; import it.unimi.dsi.fastutil.Arrays; -import org.apache.druid.frame.read.columnar.StringFrameColumnReader; import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; import org.apache.druid.query.rowsandcols.EmptyRowsAndColumns; @@ -113,12 +112,7 @@ public RowsAndColumns complete() (k1, k2) -> { for (int i = 0; i < numColsToCompare; ++i) { final ColumnAccessor accessy = accessors[i]; - int val; - if (accessy instanceof StringFrameColumnReader.StringFrameColumn) { - val = ((StringFrameColumnReader.StringFrameColumn) accessy).compare(sortedPointers[k1], sortedPointers[k2]); - } else { - val = accessy.compareRows(sortedPointers[k1], sortedPointers[k2]); - } + int val = accessy.compareRows(sortedPointers[k1], sortedPointers[k2]); if (val != 0) { return val * direction[i]; } From 0045843d89f0191e56690510a4c1b3470237ebf6 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 17 Sep 2024 21:43:45 +0530 Subject: [PATCH 3/5] disable subqueryBytes --- .../druid/benchmark/query/SqlWindowFunctionsBenchmark.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java index 0177f14ef21e..977e8e8e42c4 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlWindowFunctionsBenchmark.java @@ -342,8 +342,8 @@ public void querySql(String sql, Blackhole blackhole) { final Map context = ImmutableMap.of( PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.MAX_SUBQUERY_BYTES_KEY, "auto"//, - //QueryContexts.MAX_SUBQUERY_ROWS_KEY, -1 + QueryContexts.MAX_SUBQUERY_BYTES_KEY, "disabled", + QueryContexts.MAX_SUBQUERY_ROWS_KEY, -1 ); try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, context)) { final PlannerResult plannerResult = planner.plan(); From 3c6b8d17770853ae85dbd9bd97bd18085e046e99 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 18 Sep 2024 12:05:14 +0530 Subject: [PATCH 4/5] fix --- .../druid/frame/read/columnar/StringFrameColumnReader.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java index 2b2526c2d59f..c58c6e4419f6 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java @@ -513,7 +513,9 @@ public int compareRows(int rowNum1, int rowNum2) ByteBuffer buffer1 = getStringUtf8(rowNum1); ByteBuffer buffer2 = getStringUtf8(rowNum2); - if (buffer1 == null) { + if (buffer1 == null && buffer2 == null) { + return 0; + } else if (buffer1 == null) { return -1; } else if (buffer2 == null) { return 1; From 406d3a765754adc0c158b2b57b4c2ee11a1c3273 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 18 Sep 2024 12:26:05 +0530 Subject: [PATCH 5/5] simplify --- .../read/columnar/StringFrameColumnReader.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java index c58c6e4419f6..1bd008d5378b 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java @@ -20,6 +20,7 @@ package org.apache.druid.frame.read.columnar; import com.google.common.primitives.Ints; +import org.apache.commons.lang.ObjectUtils; import org.apache.datasketches.memory.Memory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; @@ -510,17 +511,7 @@ protected Comparator getComparator() @Override public int compareRows(int rowNum1, int rowNum2) { - ByteBuffer buffer1 = getStringUtf8(rowNum1); - ByteBuffer buffer2 = getStringUtf8(rowNum2); - - if (buffer1 == null && buffer2 == null) { - return 0; - } else if (buffer1 == null) { - return -1; - } else if (buffer2 == null) { - return 1; - } - return buffer1.compareTo(buffer2); + return ObjectUtils.compare(getStringUtf8(rowNum1), getStringUtf8(rowNum2)); } /**