Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
70c05bb
Add GlueingPartitioningOperator + Corresponding changes in window fun…
Akshat-Jain Sep 5, 2024
dad20ee
Create GluedRACsIterator
Akshat-Jain Sep 13, 2024
52131ea
Modify MSQ window function execution to allow sending RACs into the o…
Akshat-Jain Sep 18, 2024
ae2ab9e
Revert unnecessary changes
Akshat-Jain Sep 19, 2024
5d0594b
Address review comments - part 1
Akshat-Jain Sep 25, 2024
56c5942
Address review comments - part 2 - rename base classes to abstract
Akshat-Jain Sep 25, 2024
b80fed6
Address review comments - part 3 - add RowsAndColumnsBuilder class
Akshat-Jain Sep 25, 2024
b0f3db8
Merge with master
Akshat-Jain Oct 3, 2024
82e2bb1
Fix MSQWindowTest
Akshat-Jain Oct 3, 2024
7aba378
Merge with master
Akshat-Jain Oct 3, 2024
929f108
Remove commented code
Akshat-Jain Oct 3, 2024
6f14fd0
Merge with master
Akshat-Jain Oct 3, 2024
348b722
Make allocator capacity in LazilyDecoratedRowsAndColumns configurable…
Akshat-Jain Oct 7, 2024
d5eab25
Fix MSQWindowTest
Akshat-Jain Oct 7, 2024
988f3f1
Address review comments
Akshat-Jain Oct 9, 2024
56afb9c
Make the capacity field mandatory in LazilyDecoratedRowsAndColumns
Akshat-Jain Oct 9, 2024
e6167e5
Fix CodeQL
Akshat-Jain Oct 9, 2024
d377788
Make classes static
Akshat-Jain Oct 10, 2024
82a60cb
Remove separate method for keepItGoing logic
Akshat-Jain Oct 10, 2024
db19325
Address review comments
Akshat-Jain Oct 10, 2024
a865fdf
Remove firstPartitionHandled boolean, use currentIndex instead
Akshat-Jain Oct 10, 2024
3075b96
Change isGlueingNeeded() to compare first and last rows of ConcatRAC
Akshat-Jain Oct 10, 2024
071cfed
Pass allocatorCapacity as Long, convert to integer in the ldrc layer
Akshat-Jain Oct 10, 2024
f947fc7
Fix CodeQL
Akshat-Jain Oct 10, 2024
b725332
Address review comment: Change order of conditionals to simplify the …
Akshat-Jain Oct 10, 2024
00ce202
Create AbstractReceiver
Akshat-Jain Oct 10, 2024
bb8f9fe
Move push() method logic to AbstractReceiver
Akshat-Jain Oct 10, 2024
aa2e46f
Improve some access modifiers
Akshat-Jain Oct 10, 2024
b73f472
Address review comments
Akshat-Jain Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.OutputChannelFactory;
Expand Down Expand Up @@ -60,27 +59,17 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
private final WindowOperatorQuery query;
private final List<OperatorFactory> operatorList;
private final RowSignature stageRowSignature;
private final int maxRowsMaterializedInWindow;
private final List<String> partitionColumnNames;

@JsonCreator
public WindowOperatorQueryFrameProcessorFactory(
@JsonProperty("query") WindowOperatorQuery query,
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
@JsonProperty("partitionColumnNames") List<String> partitionColumnNames
@JsonProperty("stageRowSignature") RowSignature stageRowSignature
)
{
this.query = Preconditions.checkNotNull(query, "query");
this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator");
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;

if (partitionColumnNames == null) {
throw DruidException.defensive("List of partition column names encountered as null.");
}
this.partitionColumnNames = partitionColumnNames;
}

@JsonProperty("query")
Expand All @@ -95,24 +84,12 @@ public List<OperatorFactory> getOperators()
return operatorList;
}

@JsonProperty("partitionColumnNames")
public List<String> getPartitionColumnNames()
{
return partitionColumnNames;
}

@JsonProperty("stageRowSignature")
public RowSignature getSignature()
{
return stageRowSignature;
}

@JsonProperty("maxRowsMaterializedInWindow")
public int getMaxRowsMaterializedInWindow()
{
return maxRowsMaterializedInWindow;
}

@Override
public ProcessorsAndChannels<Object, Long> makeProcessors(
StageDefinition stageDefinition,
Expand Down Expand Up @@ -153,17 +130,15 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
readableInput -> {
final OutputChannel outputChannel =
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());

return new WindowOperatorQueryFrameProcessor(
query,
readableInput.getChannel(),
outputChannel.getWritableChannel(),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes),
readableInput.getChannelFrameReader(),
frameContext.jsonMapper(),
operatorList,
stageRowSignature,
maxRowsMaterializedInWindow,
partitionColumnNames
operatorList
);
}
);
Expand All @@ -190,16 +165,14 @@ public boolean equals(Object o)
return false;
}
WindowOperatorQueryFrameProcessorFactory that = (WindowOperatorQueryFrameProcessorFactory) o;
return maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
&& Objects.equals(query, that.query)
return Objects.equals(query, that.query)
&& Objects.equals(operatorList, that.operatorList)
&& Objects.equals(partitionColumnNames, that.partitionColumnNames)
&& Objects.equals(stageRowSignature, that.stageRowSignature);
}

@Override
public int hashCode()
{
return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, maxRowsMaterializedInWindow);
return Objects.hash(query, operatorList, stageRowSignature);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ public static String getMSQMode(final QueryContext queryContext)
);
}

public static int getMaxRowsMaterializedInWindow(final QueryContext queryContext)
{
return queryContext.getInt(
MAX_ROWS_MATERIALIZED_IN_WINDOW,
Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW
);
}

public static int getMaxConcurrentStagesWithDefault(
final QueryContext queryContext,
final int defaultMaxConcurrentStages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2219,65 +2219,107 @@ public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers()
2, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(13).bytes(1379).frames(1),
CounterSnapshotMatcher.with().rows(13).bytes(989).frames(1),
2, 0, "output"
)

// Stage 3, Worker 0
// Stage 3, Worker 1 (window stage)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(318).frames(1),
3, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(330).frames(1),
3, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(318).frames(1),
3, 0, "shuffle"
)

// Stage 3, Worker 1
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 3).bytes(0, 333).frames(0, 1),
CounterSnapshotMatcher.with().rows(0, 6).bytes(0, 461).frames(0, 1),
3, 1, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(345).frames(1),
CounterSnapshotMatcher.with().rows(0, 6).bytes(0, 641).frames(0, 1),
3, 1, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(333).frames(1),
CounterSnapshotMatcher.with().rows(1, 1, 2, 2).bytes(122, 132, 230, 235).frames(1, 1, 1, 1),
3, 1, "shuffle"
)

// Stage 3, Worker 2
// Stage 3, Worker 2 (window stage)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 3).bytes(0, 0, 352).frames(0, 0, 1),
CounterSnapshotMatcher.with().rows(0, 0, 1).bytes(0, 0, 114).frames(0, 0, 1),
3, 2, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(364).frames(1),
CounterSnapshotMatcher.with().rows(0, 0, 1).bytes(0, 0, 144).frames(0, 0, 1),
3, 2, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(352).frames(1),
CounterSnapshotMatcher.with().rows(1).bytes(140).frames(1),
3, 2, "shuffle"
)

// Stage 3, Worker 3
// Stage 3, Worker 3 (window stage)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 0, 4).bytes(0, 0, 0, 426).frames(0, 0, 0, 1),
CounterSnapshotMatcher.with().rows(0, 0, 0, 6).bytes(0, 0, 0, 482).frames(0, 0, 0, 1),
3, 3, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(442).frames(1),
CounterSnapshotMatcher.with().rows(0, 0, 0, 6).bytes(0, 0, 0, 662).frames(0, 0, 0, 1),
3, 3, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(426).frames(1),
CounterSnapshotMatcher.with().rows(1, 1, 2, 2).bytes(143, 137, 222, 238).frames(1, 1, 1, 1),
3, 3, "shuffle"
)

// Stage 4, Worker 0
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(337).frames(1),
4, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(349).frames(1),
4, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(337).frames(1),
4, 0, "shuffle"
)

// Stage 4, Worker 1
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 2).bytes(0, 235).frames(0, 1),
4, 1, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(2).bytes(243).frames(1),
4, 1, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(2).bytes(235).frames(1),
4, 1, "shuffle"
)

// Stage 4, Worker 2
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 4).bytes(0, 0, 418).frames(0, 0, 1),
4, 2, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(434).frames(1),
4, 2, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(418).frames(1),
4, 2, "shuffle"
)

// Stage 4, Worker 3
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 0, 4).bytes(0, 0, 0, 439).frames(0, 0, 0, 1),
4, 3, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(455).frames(1),
4, 3, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(439).frames(1),
4, 3, "shuffle"
)
.verifyResults();
}

Expand Down Expand Up @@ -2331,7 +2373,7 @@ public void testFailurePartitionByMVD_1()
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
"Encountered a multi value column. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
))
.verifyExecutionError();
}
Expand All @@ -2350,7 +2392,7 @@ public void testFailurePartitionByMVD_2()
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
"Encountered a multi value column. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
))
.verifyExecutionError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class WindowOperatorQueryFrameProcessorFactoryTest
public void testEqualsAndHashcode()
{
EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class)
.withNonnullFields("query", "operatorList", "stageRowSignature", "maxRowsMaterializedInWindow", "partitionColumnNames")
.withNonnullFields("query", "operatorList", "stageRowSignature")
.usingGetClass()
.verify();
}
Expand Down
Loading