diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index a0572a91b4df..4a8960ef70d7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -256,17 +256,19 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) { // Add current row to the same batch of rows for processing. rowsToProcess.add(currentRow); - if (rowsToProcess.size() > maxRowsMaterialized) { - // We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch. - processRowsUpToLastPartition(); - } - ensureMaxRowsInAWindowConstraint(rowsToProcess.size()); } else { lastPartitionIndex = rowsToProcess.size() - 1; outputRow = currentRow.copy(); - return ReturnOrAwait.runAgain(); + rowsToProcess.add(currentRow); } frameCursor.advance(); + + if (rowsToProcess.size() > maxRowsMaterialized) { + // We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch. + processRowsUpToLastPartition(); + ensureMaxRowsInAWindowConstraint(rowsToProcess.size()); + return ReturnOrAwait.runAgain(); + } } return ReturnOrAwait.runAgain(); }