@@ -81,7 +81,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final FrameWriterFactory frameWriterFactory;
private final FrameReader frameReader;
private final int maxRowsMaterialized;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
private ResultRow outputRow = null;
@@ -97,6 +96,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final ArrayList<ResultRow> rowsToProcess;
private int lastPartitionIndex = -1;

final AtomicInteger rowId = new AtomicInteger(0);

public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
ReadableFrameChannel inputChannel,
@@ -152,7 +153,7 @@ public List<WritableFrameChannel> outputChannels()
}

@Override
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
{
/*
There are 2 scenarios:
@@ -213,32 +214,54 @@ Current approach with R&C and operators materializes a single R&C for processing.
Most of the window operations, like SUM(), RANK(), RANGE() etc., can be computed in 2 passes over the data. We might consider reimplementing them in the MSQ way so that we do not have to materialize so much data.
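For illustration only (hypothetical queries, not taken from this patch), the two scenarios look like:
  Scenario 1: SELECT cityName, ROW_NUMBER() OVER () FROM wiki
  Scenario 2: SELECT cityName, ROW_NUMBER() OVER (PARTITION BY cityName) FROM wiki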
*/

// If there are rows pending flush, flush them and run again before processing any more rows.
if (frameHasRowsPendingFlush()) {
flushAllRowsAndCols();
return ReturnOrAwait.runAgain();
}

if (partitionColumnNames.isEmpty()) {
// Scenario 1: Query has at least one window function with an OVER() clause without a PARTITION BY.
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
convertRowFrameToRowsAndColumns(frame);
return ReturnOrAwait.runAgain();
} else if (inputChannel.isFinished()) {
runAllOpsOnMultipleRac(frameRowsAndCols);
}

if (inputChannel.isFinished()) {
// If no rows are flushed yet, process all rows.
if (rowId.get() == 0) {
runAllOpsOnMultipleRac(frameRowsAndCols);
}

// If there are still rows pending after operations, run again.
if (frameHasRowsPendingFlush()) {
return ReturnOrAwait.runAgain();
}
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.awaitAll(inputChannels().size());
}
return ReturnOrAwait.awaitAll(inputChannels().size());
}

// Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY
if (frameCursor == null || frameCursor.isDone()) {
if (readableInputs.isEmpty()) {
return ReturnOrAwait.awaitAll(1);
} else if (inputChannel.canRead()) {
}

if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
makeRowSupplierFromFrameCursor();
} else if (inputChannel.isFinished()) {
// Handle any remaining data.
lastPartitionIndex = rowsToProcess.size() - 1;
processRowsUpToLastPartition();
// If we have some rows pending processing, process them.
// We run again, as it's possible that the frame writer's capacity was reached and some output rows are
// still pending flush to the output channel.
if (!rowsToProcess.isEmpty()) {
lastPartitionIndex = rowsToProcess.size() - 1;
processRowsUpToLastPartition();
return ReturnOrAwait.runAgain();
}
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.runAgain();
@@ -253,17 +276,19 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// Add current row to the same batch of rows for processing.
rowsToProcess.add(currentRow);
if (rowsToProcess.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processRowsUpToLastPartition();
}
ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
} else {
lastPartitionIndex = rowsToProcess.size() - 1;
outputRow = currentRow.copy();
return ReturnOrAwait.runAgain();
rowsToProcess.add(currentRow);
}
frameCursor.advance();

if (rowsToProcess.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processRowsUpToLastPartition();
ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
return ReturnOrAwait.runAgain();
}
}
return ReturnOrAwait.runAgain();
}
@@ -310,74 +335,69 @@ public Operator.Signal push(RowsAndColumns rac)
public void completed()
{
try {
// resultRowsAndCols has reference to frameRowsAndCols
// due to the chain of calls across the ops
// so we can clear after writing to output
flushAllRowsAndCols(resultRowAndCols);
frameRowsAndCols.clear();

flushAllRowsAndCols();
}
catch (IOException e) {
throw new RuntimeException(e);
}
finally {
frameRowsAndCols.clear();
resultRowAndCols.clear();
}
}
});
}

/**
* @param resultRowAndCols Flush the list of {@link RowsAndColumns} to a frame
* Flushes {@link #resultRowAndCols} to the frame starting from {@link #rowId}, up to the frame writer's capacity.
* @throws IOException
*/
private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) throws IOException
private void flushAllRowsAndCols() throws IOException
{
RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols);
AtomicInteger rowId = new AtomicInteger(0);
createFrameWriterIfNeeded(rac, rowId);
writeRacToFrame(rac, rowId);
createFrameWriterIfNeeded(rac);
writeRacToFrame(rac);
}

/**
* @param rac The {@link RowsAndColumns} object to create a frame writer for, if one doesn't already exist
* @param rowId RowId to get the column selector factory from the {@link RowsAndColumns} object
*/
private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId)
private void createFrameWriterIfNeeded(RowsAndColumns rac)
{
if (frameWriter == null) {
final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac);
final ColumnSelectorFactory frameWriterColumnSelectorFactory = csfm.make(rowId);
final ColumnSelectorFactory frameWriterColumnSelectorFactoryWithVirtualColumns =
frameWriterVirtualColumns.wrap(frameWriterColumnSelectorFactory);
frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactoryWithVirtualColumns);
currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
}
}

/**
* @param rac {@link RowsAndColumns} to be written to frame
* @param rowId Counter to keep track of how many rows are added
* @throws IOException
*/
public void writeRacToFrame(RowsAndColumns rac, AtomicInteger rowId) throws IOException
public void writeRacToFrame(RowsAndColumns rac) throws IOException
{
final int numRows = rac.numRows();
rowId.set(0);
while (rowId.get() < numRows) {
final boolean didAddToFrame = frameWriter.addSelection();
if (didAddToFrame) {
if (frameWriter.addSelection()) {
incrementBoostColumn();
rowId.incrementAndGet();
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1);
} else if (frameWriter.getNumRows() == 0) {
throw new FrameRowTooLargeException(currentAllocatorCapacity);
} else {
} else if (frameWriter.getNumRows() > 0) {
flushFrameWriter();
return;
createFrameWriterIfNeeded(rac);
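// Retry the row that did not fit into the flushed frame: it should fit into the fresh, empty writer.
// On success we return, and the remaining rows are picked up from rowId on a later invocation;
// a second failure on an empty writer means the single row exceeds the allocator capacity.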

if (frameWriter.addSelection()) {
incrementBoostColumn();
rowId.incrementAndGet();
return;
} else {
throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
}
} else {
throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
}
}

flushFrameWriter();
clearRACBuffers();
}

@Override
@@ -510,4 +530,28 @@ private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow)
));
}
}

/**
* Increments the value of the partition boosting column. It should be called once the row value has been written
* to the frame.
*/
private void incrementBoostColumn()
{
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1);
}

/**
* @return true if frame has rows pending flush to the output channel, false otherwise.
*/
private boolean frameHasRowsPendingFlush()
{
return frameWriter != null && frameWriter.getNumRows() > 0;
}

private void clearRACBuffers()
{
frameRowsAndCols.clear();
resultRowAndCols.clear();
rowId.set(0);
}
}
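
For context on the ReturnOrAwait.runAgain() pattern this patch leans on: the executor keeps re-invoking runIncrementally() until the processor returns a value, which is what lets a partial flush resume later. A minimal sketch of that contract, using hypothetical stand-in types rather than Druid's actual FrameProcessor and executor APIs:

import java.io.IOException;

// Hypothetical stand-ins for ReturnOrAwait and FrameProcessor; illustration only.
enum Signal { RUN_AGAIN, DONE }

interface IncrementalProcessor
{
  Signal runIncrementally() throws IOException;
}

final class SketchDriver
{
  static void runFully(IncrementalProcessor processor) throws IOException
  {
    // Re-invoke until completion: a processor that could flush only part of its
    // pending rows (because the frame writer filled up) returns RUN_AGAIN and
    // resumes from its persisted rowId on the next invocation.
    while (processor.runIncrementally() == Signal.RUN_AGAIN) {
      // Each invocation may emit at most one frame to the output channel.
    }
  }
}

This is also why rowId is a field reset only in clearRACBuffers(): the write position has to survive across invocations of runIncrementally().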
@@ -66,6 +66,99 @@

public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBase
{
private static final List<Map<String, Object>> INPUT_ROWS = ImmutableList.of(
ImmutableMap.of("added", 1L, "cityName", "city1"),
ImmutableMap.of("added", 1L, "cityName", "city2"),
ImmutableMap.of("added", 2L, "cityName", "city3"),
ImmutableMap.of("added", 2L, "cityName", "city4"),
ImmutableMap.of("added", 2L, "cityName", "city5"),
ImmutableMap.of("added", 3L, "cityName", "city6"),
ImmutableMap.of("added", 3L, "cityName", "city7")
);

@Test
public void testFrameWriterReachingCapacity() throws IOException
{
// This test validates that, even when the frame writer's capacity is reached, all output rows are eventually
// flushed to the output channel by subsequent iterations of runIncrementally.
final ReadableInput factChannel = buildWindowTestInputChannel();

RowSignature inputSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();

FrameReader frameReader = FrameReader.create(inputSignature);

RowSignature outputSignature = RowSignature.builder()
.addAll(inputSignature)
.add("w0", ColumnType.LONG)
.build();

final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(
Druids.newScanQueryBuilder()
.dataSource(new TableDataSource("test"))
.intervals(new LegacySegmentSpec(Intervals.ETERNITY))
.columns("cityName", "added")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(new HashMap<>())
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
new HashMap<>(),
outputSignature,
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
ImmutableList.of()
);

final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
outputSignature,
Collections.emptyList(),
false
),
INPUT_ROWS.size() / 4 // Caps each frame at 7 / 4 = 1 row (integer division), forcing the frame writer's capacity to be reached.
);

final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
factChannel.getChannel(),
outputChannel.writable(),
frameWriterFactory,
frameReader,
new ObjectMapper(),
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
inputSignature,
100,
ImmutableList.of("added")
);

exec.runFully(processor, null);

final Sequence<List<Object>> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel(
outputChannel.readable(),
FrameReader.create(outputSignature)
);

List<List<Object>> outputRows = rowsFromProcessor.toList();
Assert.assertEquals(INPUT_ROWS.size(), outputRows.size());

for (int i = 0; i < INPUT_ROWS.size(); i++) {
Map<String, Object> inputRow = INPUT_ROWS.get(i);
List<Object> outputRow = outputRows.get(i);

Assert.assertEquals("cityName should match", inputRow.get("cityName"), outputRow.get(0));
Assert.assertEquals("added should match", inputRow.get("added"), outputRow.get(1));
Assert.assertEquals("row_number() should be correct", (long) i + 1, outputRow.get(2));
}
}

@Test
public void testBatchingOfPartitionByKeys_singleBatch() throws Exception
{
@@ -195,18 +288,7 @@ private ReadableInput buildWindowTestInputChannel() throws IOException
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();

List<Map<String, Object>> rows = ImmutableList.of(
ImmutableMap.of("added", 1L, "cityName", "city1"),
ImmutableMap.of("added", 1L, "cityName", "city2"),
ImmutableMap.of("added", 2L, "cityName", "city3"),
ImmutableMap.of("added", 2L, "cityName", "city4"),
ImmutableMap.of("added", 2L, "cityName", "city5"),
ImmutableMap.of("added", 3L, "cityName", "city6"),
ImmutableMap.of("added", 3L, "cityName", "city7")
);

return makeChannelFromRows(rows, inputSignature, Collections.emptyList());
return makeChannelFromRows(INPUT_ROWS, inputSignature, Collections.emptyList());
}

private ReadableInput makeChannelFromRows(