From 838356b906e409749a962655f37a9e321a73cffb Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 3 Oct 2024 10:39:22 +0530 Subject: [PATCH 1/2] WindowOperatorQueryFrameProcessor: Fix frame writer capacity issues + adhere to FrameProcessor's contract (#17209) Previously, when the frame writer reached its capacity, the processor flushed the current frame and stopped writing, so the remaining output rows never reached the output channel. This PR fixes that by tracking the last rowId flushed to the output channel and triggering another iteration of runIncrementally() whenever the frame writer still has rows pending flush to the output channel. This approach respects FrameProcessor's contract, which requires that only a single frame be written to each output channel in any given iteration of runIncrementally(). --- .../WindowOperatorQueryFrameProcessor.java | 118 ++++++++++++------ ...WindowOperatorQueryFrameProcessorTest.java | 106 ++++++++++++++-- .../msq2.iq | 54 ++++++++ 3 files changed, 228 insertions(+), 50 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq2.iq diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 3dc62f3a60de..7289fb6bb113 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -81,7 +81,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private final FrameWriterFactory frameWriterFactory; private final FrameReader frameReader; private final int maxRowsMaterialized; - private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed private Cursor frameCursor = null; private Supplier rowSupplierFromFrameCursor; private ResultRow outputRow = null; @@ -97,6 +96,8 @@ 
public class WindowOperatorQueryFrameProcessor implements FrameProcessor private final ArrayList rowsToProcess; private int lastPartitionIndex = -1; + final AtomicInteger rowId = new AtomicInteger(0); + public WindowOperatorQueryFrameProcessor( WindowOperatorQuery query, ReadableFrameChannel inputChannel, @@ -152,7 +153,7 @@ public List outputChannels() } @Override - public ReturnOrAwait runIncrementally(IntSet readableInputs) + public ReturnOrAwait runIncrementally(IntSet readableInputs) throws IOException { /* There are 2 scenarios: @@ -213,32 +214,54 @@ Current approach with R&C and operators materialize a single R&C for processing. Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. We might think to reimplement them in the MSQ way so that we do not have to materialize so much data. */ + // If there are rows pending flush, flush them and run again before processing any more rows. + if (frameHasRowsPendingFlush()) { + flushAllRowsAndCols(); + return ReturnOrAwait.runAgain(); + } + if (partitionColumnNames.isEmpty()) { // Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY. if (inputChannel.canRead()) { final Frame frame = inputChannel.read(); convertRowFrameToRowsAndColumns(frame); return ReturnOrAwait.runAgain(); - } else if (inputChannel.isFinished()) { - runAllOpsOnMultipleRac(frameRowsAndCols); + } + + if (inputChannel.isFinished()) { + // If no rows are flushed yet, process all rows. + if (rowId.get() == 0) { + runAllOpsOnMultipleRac(frameRowsAndCols); + } + + // If there are still rows pending after operations, run again. 
+ if (frameHasRowsPendingFlush()) { + return ReturnOrAwait.runAgain(); + } return ReturnOrAwait.returnObject(Unit.instance()); - } else { - return ReturnOrAwait.awaitAll(inputChannels().size()); } + return ReturnOrAwait.awaitAll(inputChannels().size()); } // Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY if (frameCursor == null || frameCursor.isDone()) { if (readableInputs.isEmpty()) { return ReturnOrAwait.awaitAll(1); - } else if (inputChannel.canRead()) { + } + + if (inputChannel.canRead()) { final Frame frame = inputChannel.read(); frameCursor = FrameProcessors.makeCursor(frame, frameReader); makeRowSupplierFromFrameCursor(); } else if (inputChannel.isFinished()) { - // Handle any remaining data. - lastPartitionIndex = rowsToProcess.size() - 1; - processRowsUpToLastPartition(); + // If we have some rows pending processing, process them. + // We run it again as it's possible that frame writer's capacity got reached and some output rows are + // pending flush to the output channel. + if (!rowsToProcess.isEmpty()) { + lastPartitionIndex = rowsToProcess.size() - 1; + processRowsUpToLastPartition(); + return ReturnOrAwait.runAgain(); + } return ReturnOrAwait.returnObject(Unit.instance()); } else { return ReturnOrAwait.runAgain(); @@ -310,41 +333,30 @@ public Operator.Signal push(RowsAndColumns rac) public void completed() { try { - // resultRowsAndCols has reference to frameRowsAndCols - // due to the chain of calls across the ops - // so we can clear after writing to output - flushAllRowsAndCols(resultRowAndCols); - frameRowsAndCols.clear(); - + flushAllRowsAndCols(); } catch (IOException e) { throw new RuntimeException(e); } - finally { - frameRowsAndCols.clear(); - resultRowAndCols.clear(); - } } }); } /** - * @param resultRowAndCols Flush the list of {@link RowsAndColumns} to a frame + * Flushes {@link #resultRowAndCols} to the frame starting from {@link #rowId}, upto the frame writer's capacity. 
* @throws IOException */ - private void flushAllRowsAndCols(ArrayList resultRowAndCols) throws IOException + private void flushAllRowsAndCols() throws IOException { RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols); - AtomicInteger rowId = new AtomicInteger(0); - createFrameWriterIfNeeded(rac, rowId); - writeRacToFrame(rac, rowId); + createFrameWriterIfNeeded(rac); + writeRacToFrame(rac); } /** * @param rac The frame writer to write this {@link RowsAndColumns} object - * @param rowId RowId to get the column selector factory from the {@link RowsAndColumns} object */ - private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId) + private void createFrameWriterIfNeeded(RowsAndColumns rac) { if (frameWriter == null) { final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac); @@ -352,32 +364,38 @@ private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId) final ColumnSelectorFactory frameWriterColumnSelectorFactoryWithVirtualColumns = frameWriterVirtualColumns.wrap(frameWriterColumnSelectorFactory); frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactoryWithVirtualColumns); - currentAllocatorCapacity = frameWriterFactory.allocatorCapacity(); } } /** * @param rac {@link RowsAndColumns} to be written to frame - * @param rowId Counter to keep track of how many rows are added * @throws IOException */ - public void writeRacToFrame(RowsAndColumns rac, AtomicInteger rowId) throws IOException + public void writeRacToFrame(RowsAndColumns rac) throws IOException { final int numRows = rac.numRows(); - rowId.set(0); while (rowId.get() < numRows) { - final boolean didAddToFrame = frameWriter.addSelection(); - if (didAddToFrame) { + if (frameWriter.addSelection()) { + incrementBoostColumn(); rowId.incrementAndGet(); - partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1); - } else if (frameWriter.getNumRows() == 0) { - throw new 
FrameRowTooLargeException(currentAllocatorCapacity); - } else { + } else if (frameWriter.getNumRows() > 0) { flushFrameWriter(); - return; + createFrameWriterIfNeeded(rac); + + if (frameWriter.addSelection()) { + incrementBoostColumn(); + rowId.incrementAndGet(); + return; + } else { + throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity()); + } + } else { + throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity()); } } + flushFrameWriter(); + clearRACBuffers(); } @Override @@ -510,4 +528,28 @@ private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow) )); } } + + /** + * Increments the value of the partition boosting column. It should be called once the row value has been written + * to the frame + */ + private void incrementBoostColumn() + { + partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1); + } + + /** + * @return true if frame has rows pending flush to the output channel, false otherwise. + */ + private boolean frameHasRowsPendingFlush() + { + return frameWriter != null && frameWriter.getNumRows() > 0; + } + + private void clearRACBuffers() + { + frameRowsAndCols.clear(); + resultRowAndCols.clear(); + rowId.set(0); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java index 9d64fffe23e2..e5d191bbb5c3 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java @@ -66,6 +66,99 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBase { + private static final List> INPUT_ROWS = ImmutableList.of( + ImmutableMap.of("added", 1L, "cityName", "city1"), + 
ImmutableMap.of("added", 1L, "cityName", "city2"), + ImmutableMap.of("added", 2L, "cityName", "city3"), + ImmutableMap.of("added", 2L, "cityName", "city4"), + ImmutableMap.of("added", 2L, "cityName", "city5"), + ImmutableMap.of("added", 3L, "cityName", "city6"), + ImmutableMap.of("added", 3L, "cityName", "city7") + ); + + @Test + public void testFrameWriterReachingCapacity() throws IOException + { + // This test validates that all output rows are flushed to the output channel even if frame writer's + // capacity is reached, by subsequent iterations of runIncrementally. + final ReadableInput factChannel = buildWindowTestInputChannel(); + + RowSignature inputSignature = RowSignature.builder() + .add("cityName", ColumnType.STRING) + .add("added", ColumnType.LONG) + .build(); + + FrameReader frameReader = FrameReader.create(inputSignature); + + RowSignature outputSignature = RowSignature.builder() + .addAll(inputSignature) + .add("w0", ColumnType.LONG) + .build(); + + final WindowOperatorQuery query = new WindowOperatorQuery( + new QueryDataSource( + Druids.newScanQueryBuilder() + .dataSource(new TableDataSource("test")) + .intervals(new LegacySegmentSpec(Intervals.ETERNITY)) + .columns("cityName", "added") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(new HashMap<>()) + .build()), + new LegacySegmentSpec(Intervals.ETERNITY), + new HashMap<>(), + outputSignature, + ImmutableList.of( + new WindowOperatorFactory(new WindowRowNumberProcessor("w0")) + ), + ImmutableList.of() + ); + + final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory( + FrameWriters.makeRowBasedFrameWriterFactory( + new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()), + outputSignature, + Collections.emptyList(), + false + ), + INPUT_ROWS.size() / 4 // This forces frameWriter's capacity to be reached. 
+ ); + + final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal(); + final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor( + query, + factChannel.getChannel(), + outputChannel.writable(), + frameWriterFactory, + frameReader, + new ObjectMapper(), + ImmutableList.of( + new WindowOperatorFactory(new WindowRowNumberProcessor("w0")) + ), + inputSignature, + 100, + ImmutableList.of("added") + ); + + exec.runFully(processor, null); + + final Sequence> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel( + outputChannel.readable(), + FrameReader.create(outputSignature) + ); + + List> outputRows = rowsFromProcessor.toList(); + Assert.assertEquals(INPUT_ROWS.size(), outputRows.size()); + + for (int i = 0; i < INPUT_ROWS.size(); i++) { + Map inputRow = INPUT_ROWS.get(i); + List outputRow = outputRows.get(i); + + Assert.assertEquals("cityName should match", inputRow.get("cityName"), outputRow.get(0)); + Assert.assertEquals("added should match", inputRow.get("added"), outputRow.get(1)); + Assert.assertEquals("row_number() should be correct", (long) i + 1, outputRow.get(2)); + } + } + @Test public void testBatchingOfPartitionByKeys_singleBatch() throws Exception { @@ -195,18 +288,7 @@ private ReadableInput buildWindowTestInputChannel() throws IOException .add("cityName", ColumnType.STRING) .add("added", ColumnType.LONG) .build(); - - List> rows = ImmutableList.of( - ImmutableMap.of("added", 1L, "cityName", "city1"), - ImmutableMap.of("added", 1L, "cityName", "city2"), - ImmutableMap.of("added", 2L, "cityName", "city3"), - ImmutableMap.of("added", 2L, "cityName", "city4"), - ImmutableMap.of("added", 2L, "cityName", "city5"), - ImmutableMap.of("added", 3L, "cityName", "city6"), - ImmutableMap.of("added", 3L, "cityName", "city7") - ); - - return makeChannelFromRows(rows, inputSignature, Collections.emptyList()); + return makeChannelFromRows(INPUT_ROWS, inputSignature, Collections.emptyList()); } private 
ReadableInput makeChannelFromRows( diff --git a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq2.iq b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq2.iq new file mode 100644 index 000000000000..730003556159 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq2.iq @@ -0,0 +1,54 @@ +!set plannerStrategy DECOUPLED +!use druidtest://?componentSupplier=DrillWindowQueryMSQComponentSupplier +!set outputformat mysql + +# This test validates that all output rows are flushed to the output channel even if frame writer's capacity is reached. + +select count(*) as actualNumRows +from ( + select countryName, cityName, channel, added, delta, row_number() over() as rowNumber + from wikipedia + group by countryName, cityName, channel, added, delta +); ++---------------+ +| actualNumRows | ++---------------+ +| 11631 | ++---------------+ +(1 row) + +!ok + +# Validate that all rows are outputted by window WindowOperatorQueryFrameProcessor layer for empty over() clause scenario. + +select count(*) as numRows, max(rowNumber) as maxRowNumber +from ( + select countryName, cityName, channel, added, delta, row_number() over() as rowNumber + from wikipedia + group by countryName, cityName, channel, added, delta +); ++---------+--------------+ +| numRows | maxRowNumber | ++---------+--------------+ +| 11631 | 11631 | ++---------+--------------+ +(1 row) + +!ok + +# Validate that all rows are outputted by window WindowOperatorQueryFrameProcessor layer for non-empty over() clause scenario. 
+ +select rowNumber, count(rowNumber) as numRows +from ( + select countryName, cityName, channel, added, delta, row_number() over(partition by countryName, cityName, channel, added, delta) as rowNumber + from wikipedia + group by countryName, cityName, channel, added, delta +) group by rowNumber; ++-----------+---------+ +| rowNumber | numRows | ++-----------+---------+ +| 1 | 11631 | ++-----------+---------+ +(1 row) + +!ok From 9a520be3485d6c6ffd66fcff003e9b677b663fc7 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 1 Oct 2024 18:43:39 +0530 Subject: [PATCH 2/2] WindowOperatorQueryFrameProcessor: Avoid unnecessary re-runs of runIncrementally() --- .../WindowOperatorQueryFrameProcessor.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 7289fb6bb113..2ca2466ada8d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -276,17 +276,19 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) { // Add current row to the same batch of rows for processing. rowsToProcess.add(currentRow); - if (rowsToProcess.size() > maxRowsMaterialized) { - // We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch. 
- processRowsUpToLastPartition(); - } - ensureMaxRowsInAWindowConstraint(rowsToProcess.size()); } else { lastPartitionIndex = rowsToProcess.size() - 1; outputRow = currentRow.copy(); - return ReturnOrAwait.runAgain(); + rowsToProcess.add(currentRow); } frameCursor.advance(); + + if (rowsToProcess.size() > maxRowsMaterialized) { + // We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch. + processRowsUpToLastPartition(); + ensureMaxRowsInAWindowConstraint(rowsToProcess.size()); + return ReturnOrAwait.runAgain(); + } } return ReturnOrAwait.runAgain(); }