Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
Expand All @@ -36,6 +39,7 @@
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import java.io.Closeable;
Expand Down Expand Up @@ -157,17 +161,41 @@ private static class ScanResultValueFramesIterator implements Iterator<FrameSign
Cursor currentCursor = null;

/**
* Row signature of the current row
* Rows in the List form. The {@link #currentCursor} is a wrapper over these rows
*/
RowSignature currentRowSignature = null;
List<Object[]> currentRows = null;

/**
* Row index pointing to the current row in {@link #currentRows}. This is the exact same row that the {@link #currentCursor}
* is also pointing at. Therefore {@link #currentRows} + {@link #currentCursor} represent the same information as presented
* by {@link #currentCursor}.
*/
int currentRowIndex = -1;

/**
* Full row signature of the ScanResultValue, used to extract the rows out of it.
*/
RowSignature currentInputRowSignature = null;

/**
* Row signature of the ScanResultValue, with columns having unknown (null) types trimmed out. This is used to write
* the rows onto the frame. There's an implicit assumption (that we verify), that columns with null typed only
* contain null values, because the underlying segment didn't have the column.
*/
RowSignature currentOutputRowSignature = null;

/**
* Columns of the currentRows with missing type information. As we materialize the rows onto the frames, we also
* verify that these columns only contain null values.
*/
IntList nullTypedColumns = null;

public ScanResultValueFramesIterator(
Sequence<ScanResultValue> resultSequence,
MemoryAllocatorFactory memoryAllocatorFactory,
boolean useNestedForUnknownTypes,
RowSignature defaultRowSignature,
Function<RowSignature, Function<?, Object[]>> resultFormatMapper
final Sequence<ScanResultValue> resultSequence,
final MemoryAllocatorFactory memoryAllocatorFactory,
final boolean useNestedForUnknownTypes,
final RowSignature defaultRowSignature,
final Function<RowSignature, Function<?, Object[]>> resultFormatMapper
)
{
this.memoryAllocatorFactory = memoryAllocatorFactory;
Expand Down Expand Up @@ -200,26 +228,35 @@ public FrameSignaturePair next()
// start all the processing
populateCursor();
boolean firstRowWritten = false;
// While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature
// with which we have written the frames
final RowSignature writtenSignature = currentRowSignature;
FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(

final FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
FrameType.COLUMNAR,
memoryAllocatorFactory,
currentRowSignature,
currentOutputRowSignature,
Collections.emptyList()
);
Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory(
() -> currentCursor,
currentRowSignature
))) {
final Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(
new SettableCursorColumnSelectorFactory(() -> currentCursor, currentInputRowSignature))) {
while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row
if (!frameWriter.addSelection()) { // Add the cursor's row to the frame, till the frame is full
break;
}

// Check that the columns with the null types are actually null before advancing
final Object[] currentRow = currentRows.get(currentRowIndex);
for (Integer columnNumber : nullTypedColumns) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: I wonder why use a fastutil IntList - if it gets iterated with a foreach ; plain get?
this could be moved into some method like validateRow - that will naturally do a CSE of the currentRows.get(currentRowIndex) so that it will be only evaluated once

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to use FastUtil IntList as such. I just thought it might be faster to create than an arraylist.

this could be moved into some method like validateRow - that will naturally do a CSE of the currentRows.get(currentRowIndex) so that it will be only evaluated once

It is getting evaluated once here right? Unless I misinterpreted your comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was just a note; this loop is validating one row; but to access that it has to do a function call currentRows.get(currentRowIndex) ; which became part of the loop body - moving it into a method could make it clear that it works on a row - and it will naturally remove the currentRows.get(currentRowIndex) as that's the row :)

if (currentRow[columnNumber] != null) {
throw DruidException.defensive(
"Expected a null value for column [%s]",
frameWriterFactory.signature().getColumnName(columnNumber)
);
}
}

firstRowWritten = true;
currentCursor.advance();
currentRowIndex++;
}

if (!firstRowWritten) {
Expand All @@ -228,7 +265,9 @@ public FrameSignaturePair next()
frame = Frame.wrap(frameWriter.toByteArray());
}

return new FrameSignaturePair(frame, writtenSignature);
// While calling populateCursor() repeatedly, currentRowSignature might change. Therefore, we store the signature
// with which we have written the frames
return new FrameSignaturePair(frame, frameWriterFactory.signature());
}

/**
Expand All @@ -244,7 +283,7 @@ private boolean done()

/**
* This is the most important method of this iterator. This determines if two consecutive scan result values can
* be batched or not, populates the value of the {@link #currentCursor} and {@link #currentRowSignature},
* be batched or not, populates the value of the {@link #currentCursor} and {@link #currentInputRowSignature},
* during the course of the iterator, and facilitates the {@link #next()}
* <p>
* Multiple calls to populateCursor, without advancing the {@link #currentCursor} is idempotent. This allows successive
Expand All @@ -257,7 +296,9 @@ private boolean done()
* if (hasNext()) was true before calling the method -
* 1. {@link #currentCursor} - Points to the cursor with non-empty value (i.e. isDone()) is false, and the cursor points
* to the next row present in the sequence of the scan result values. This row would get materialized to frame
* 2. {@link #currentRowSignature} - Row signature of the row.
* 2. {@link #currentInputRowSignature} - Row signature of the row
* 3. {@link #currentRows} - Points to the group of rows underlying the currentCursor
* 4. {@link #currentRowIndex} - Reset to 0 if we modified the cursor, else untouched
* <p>
* Return value -
* if (hasNext()) is false before calling the method - returns false
Expand All @@ -275,25 +316,42 @@ private boolean populateCursor()

// At this point, we know that we need to move to the next non-empty cursor, AND it exists, because
// done() is not false
ScanResultValue scanResultValue = resultSequenceIterator.next();
final ScanResultValue scanResultValue = resultSequenceIterator.next();

final RowSignature rowSignature = scanResultValue.getRowSignature() != null
? scanResultValue.getRowSignature()
: defaultRowSignature;
RowSignature modifiedRowSignature = useNestedForUnknownTypes

final RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;

// currentRowSignature at this time points to the previous row's signature
final boolean compatible = modifiedRowSignature != null
&& modifiedRowSignature.equals(currentRowSignature);
final IntList currentNullTypedColumns = new IntArrayList();
final RowSignature.Builder modifiedTrimmedRowSignatureBuilder = RowSignature.builder();

for (int i = 0; i < modifiedRowSignature.size(); ++i) {
ColumnType columnType = modifiedRowSignature.getColumnType(i).orElse(null);
if (columnType == null) {
currentNullTypedColumns.add(i);
} else {
modifiedTrimmedRowSignatureBuilder.add(modifiedRowSignature.getColumnName(i), columnType);
}
}

final RowSignature modifiedTrimmedRowSignature = modifiedTrimmedRowSignatureBuilder.build();

// currentRowSignature at this time points to the previous row's signature. We look at the trimmed signature
// because that is the one used to write onto the frames, and if two rows have same trimmed signature, we can
// write both the rows onto the same frame
final boolean compatible = modifiedTrimmedRowSignature.equals(currentOutputRowSignature);

final List rows = (List) scanResultValue.getEvents();
final Iterable<Object[]> formattedRows = Lists.newArrayList(Iterables.transform(
final List<Object[]> formattedRows = Lists.newArrayList(Iterables.transform(
rows,
(Function) resultFormatMapper.apply(modifiedRowSignature)
));

Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable(
final Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable(
formattedRows,
modifiedRowSignature
);
Expand All @@ -306,7 +364,13 @@ private boolean populateCursor()
return populateCursor();
}

currentRowSignature = modifiedRowSignature;
currentInputRowSignature = modifiedRowSignature;
currentOutputRowSignature = modifiedTrimmedRowSignature;
nullTypedColumns = currentNullTypedColumns;
currentRows = formattedRows;
currentRowIndex = 0;


return compatible;
}
}
Expand Down
Loading