
Add GlueingPartitioningOperator + Corresponding changes in window function layer to consume it for MSQ#17038

Merged
cryptoe merged 29 commits into apache:master from Akshat-Jain:glueing-partitioning-operator
Oct 17, 2024

Conversation


@Akshat-Jain Akshat-Jain commented Sep 12, 2024

Description

Currently, WindowOperatorQueryFrameProcessor uses the following operator pipeline for window processing: (MSQ shuffling) -> NaiveSortOperator -> NaivePartitioningOperator -> WindowOperator.

Because of this, we had the following issues:

  1. If we wanted to process a subset of rows (in the cases where we could), we had to have the comparison logic (on the PARTITION BY columns) in the WindowOperatorQueryFrameProcessor layer. This is not good because:
    1. The partitioning logic would be done again in the NaivePartitioningOperator, hence it's redundant.
    2. It's a lot of code, making it unnecessarily difficult to follow the logic.
  2. NaiveSortOperator is a synchronization barrier: it has to process all rows before it can send any data to the receiver.

This PR introduces 2 new operators:

  1. GlueingPartitioningOperator: It continuously receives data, and outputs batches of partitioned RACs. It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives, ensuring that partitions are handled correctly, even across multiple RACs. You can check GlueingPartitioningOperatorTest for some good examples of the "glueing" work.
  2. PartitionSortOperator: It sorts rows inside partitioned RACs, on the sort columns. The input RACs it receives are expected to be "complete / separate" partitions of data.
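The "glueing" idea described above can be sketched in isolation. This is a minimal illustration and not the PR's implementation: `GlueingSketch`, `push` and `completed` are hypothetical names, and each row is reduced to its partition key (an `Integer`) instead of a real RAC. It assumes the input is already clustered on the key, as the MSQ shuffle is expected to guarantee.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; this is not Druid's GlueingPartitioningOperator. */
public class GlueingSketch
{
  // Last (possibly incomplete) partition seen so far, held back until a different key arrives.
  private List<Integer> pending = new ArrayList<>();

  /** Accepts a batch of rows clustered by key and returns the partitions known to be complete. */
  public List<List<Integer>> push(List<Integer> batch)
  {
    List<List<Integer>> complete = new ArrayList<>();
    for (Integer key : batch) {
      if (!pending.isEmpty() && !pending.get(0).equals(key)) {
        // The key changed, so the held-back partition cannot grow any further.
        complete.add(pending);
        pending = new ArrayList<>();
      }
      // Same key as the held-back partition (or nothing held back): glue the row onto it.
      pending.add(key);
    }
    return complete;
  }

  /** Called once the input is exhausted; flushes the held-back partition, if any. */
  public List<List<Integer>> completed()
  {
    List<List<Integer>> rest = new ArrayList<>();
    if (!pending.isEmpty()) {
      rest.add(pending);
      pending = new ArrayList<>();
    }
    return rest;
  }
}
```

Pushing `[1, 1, 2]` and then `[2, 3]` emits `[[1, 1]]` and `[[2, 2]]` respectively, and `completed()` emits `[[3]]`: the partition for key 2 spans both batches and is only released once key 3 shows up.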

With this PR's changes, we are converting the operator pipeline from

(MSQ shuffling) -> NaiveSortOperator -> NaivePartitioningOperator -> WindowOperator

into

(MSQ shuffling) -> GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator

This allows WindowOperatorQueryFrameProcessor to send RACs of any number of rows into the operator pipeline, without having to do the partitioning on the PARTITION BY columns.
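To illustrate why sorting after partitioning removes the barrier, here is a small sketch under simplifying assumptions: a partition is a plain `List<Integer>` and the sort column is the value itself. `PartitionSortSketch` and `sortPartition` are hypothetical names, not classes from this PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration only; this is not Druid's PartitionSortOperator. */
public class PartitionSortSketch
{
  /**
   * Sorts one complete partition on its own. No other partition is consulted, so each
   * partition can be sorted and forwarded as soon as it arrives.
   */
  public static List<Integer> sortPartition(List<Integer> partition)
  {
    List<Integer> sorted = new ArrayList<>(partition); // copy, so the input stays untouched
    sorted.sort(Comparator.naturalOrder());
    return sorted;
  }
}
```

A global sort would need every row before emitting anything, whereas this per-partition sort only needs the rows of the partition at hand.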

Other notable changes done in the PR

  1. WindowOperatorQueryKit
    1. Add translation from NaiveSortOperator -> NaivePartitioningOperator -> WindowOperator into GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator.
    2. Changed logic of creation of window stages. Previously we were creating a single window stage if we had an empty over() clause. But we can't do that now since we rely on MSQ shuffling to cluster the data on the partitioning keys for us, as we don't have NaiveSortOperator in the chain now.

  2. WindowOperatorQueryFrameProcessor
    1. Modified the operator so that we don't create the operator chain again and again for new RACs.
    2. Modified the operator execution to not run it to completion, and control it manually, since we don't want the completed() to be called.
    3. Logic changes to blindly send rows to the operator pipeline whenever the number of buffered rows crosses a threshold.

  3. Refactoring work: Created base abstract classes for the following pairs of classes:
    1. NaiveSortOperator / PartitionSortOperator
    2. NaiveSortOperatorFactory / PartitionSortOperatorFactory
    3. NaivePartitioningOperator / GlueingPartitioningOperator
    4. NaivePartitioningOperatorFactory / GlueingPartitioningOperatorFactory

Key changed/added classes in this PR

  • GlueingPartitioningOperator and corresponding factory class
  • PartitionSortOperator and corresponding factory class
  • WindowOperatorQueryKit
  • WindowOperatorQueryFrameProcessor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions Bot added Area - Batch Ingestion Area - Querying Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Sep 12, 2024
Member

@kgyrtkirk kgyrtkirk left a comment

thank you for the PR @Akshat-Jain!

Comment thread sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java Outdated
Comment thread sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java Outdated

private boolean needToProcessBatch()
{
  return numRowsInFrameRowsAndCols >= maxRowsMaterialized / 2; // Can this be improved further?

Member

why divide by 2? that doesn't give any guarantee that it will be inside bounds
people could set it to half if needed - but I think it's easier to document clear things...

Contributor Author

We need some threshold to start pushing RACs into the operator pipeline.

We discussed that we should push N rowed RACs into the pipeline. But it's not trivial to create RACs of exact size N.

So I felt it would be better to just have a threshold: push RACs into the pipeline once they cross N rows. I chose N = maxRowsMaterialized / 2, but we can always discuss better values for this.

"that doesn't give any guarantee that it will be inside bounds"

I think it does 🤔

convertRowFrameToRowsAndColumns() method enforces the maxRowsMaterialized constraint: ensureMaxRowsInAWindowConstraint(frameRowsAndCols.size() + ldrc.numRows()), hence it won't allow us to accumulate more than maxRowsMaterialized rows.

Thoughts? Let me know if I'm missing something. Thanks!
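The interplay between the soft threshold and the hard cap discussed here can be sketched as follows. This is an assumption-laden illustration, not the processor's code: `BatchThresholdSketch`, `addFrame` and `flush` are hypothetical names, rows are only counted, and the exact comparison used by `ensureMaxRowsInAWindowConstraint` is not shown in this thread, so the `>` below is a guess.

```java
/** Hypothetical illustration only; this is not WindowOperatorQueryFrameProcessor. */
public class BatchThresholdSketch
{
  private final int maxRowsMaterialized;
  private int bufferedRows = 0;

  public BatchThresholdSketch(int maxRowsMaterialized)
  {
    this.maxRowsMaterialized = maxRowsMaterialized;
  }

  /** Buffers a frame's rows, rejecting the frame if the hard cap would be exceeded. */
  public void addFrame(int numRows)
  {
    // Assumed form of the hard-cap check: it runs before the rows are added.
    if (bufferedRows + numRows > maxRowsMaterialized) {
      throw new IllegalStateException("would materialize more than " + maxRowsMaterialized + " rows");
    }
    bufferedRows += numRows;
  }

  /** Soft threshold: ask for a flush once at least half of the cap is buffered. */
  public boolean needToProcessBatch()
  {
    return bufferedRows >= maxRowsMaterialized / 2;
  }

  /** Hands the buffered rows to the pipeline and resets the buffer; returns the number flushed. */
  public int flush()
  {
    int flushed = bufferedRows;
    bufferedRows = 0;
    return flushed;
  }
}
```

Under these assumptions the buffer never holds more than `maxRowsMaterialized` rows, but a single frame larger than the remaining headroom still fails instead of being split, which may be the gap the reviewer has in mind.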

  stageRowSignature = finalWindowStageRowSignature;
  nextShuffleSpec = finalWindowStageShuffleSpec;
} else {
  nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount);

Member

please open a separate PR and fix the stagebuilder rather than hacking it backwards from here

@Akshat-Jain Akshat-Jain marked this pull request as ready for review September 25, 2024 08:17

protected abstract Iterator<RowsAndColumns> getIteratorForRAC(RowsAndColumns rac);

protected abstract void handleKeepItGoing(AtomicReference<Signal> signalRef, Iterator<RowsAndColumns> iterator, Receiver receiver);

Member

I don't really see how these methods make the implementing classes simpler... they don't really hide away much complexity...


Iterator<RowsAndColumns> partitionsIter = getIteratorForRAC(rac);

AtomicReference<Signal> keepItGoing = new AtomicReference<>(Signal.GO);

Member

why use an AtomicReference? the handleKeepItGoing method is void; why not use the return value?

Contributor Author

It forces us to return null in GlueingPartitioningOperator (as we need to return something):

  @Override
  protected Signal handleKeepItGoing(Iterator<RowsAndColumns> iterator, Receiver receiver)
  {
    RowsAndColumns rowsAndColumns = iterator.next();
    if (iterator.hasNext()) {
      return receiver.push(rowsAndColumns);
    } else {
      previousRac = rowsAndColumns;
      return null;
    }
  }

And then we have to handle the null specifically, as we don't want to update the signal in the case of null, since handlePush() needs to return the last non-null signal.

    Signal keepItGoing = Signal.GO;
    while (keepItGoing == Signal.GO && partitionsIter.hasNext()) {
      Signal signal = handleKeepItGoing(partitionsIter, receiver);
      if (signal != null) {
        keepItGoing = signal;
      }
    }

This didn't seem clean to me, so I ended up going with the AtomicRef approach.

Member

then maybe it could return an Optional<Signal>?
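For reference, a rough sketch of what the Optional-returning variant might look like. Everything here is a stand-in: `OptionalSignalSketch`, `handleNext`, `pushAll` and the nested `Signal` enum are hypothetical, partitions are plain strings, and the receiver is modelled as a `Function`. It only shows that an empty Optional can replace the null check from the snippet above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical illustration only; these are not the PR's classes. */
public class OptionalSignalSketch
{
  /** Stand-in for the operator framework's signal type. */
  public enum Signal { GO, PAUSE, STOP }

  // Last partition of the current batch, held back for glueing with the next batch.
  private String heldBack;

  /** Pushes the next partition unless it is the last one, which is held back instead. */
  Optional<Signal> handleNext(Iterator<String> iterator, Function<String, Signal> receiver)
  {
    String partition = iterator.next();
    if (iterator.hasNext()) {
      return Optional.of(receiver.apply(partition));
    }
    heldBack = partition;
    return Optional.empty(); // nothing was pushed, so there is no new signal
  }

  /** Drains the iterator while the receiver says GO; returns the last signal actually received. */
  public Signal pushAll(Iterator<String> iterator, Function<String, Signal> receiver)
  {
    Signal keepItGoing = Signal.GO;
    while (keepItGoing == Signal.GO && iterator.hasNext()) {
      // An empty Optional leaves the previous signal in place.
      keepItGoing = handleNext(iterator, receiver).orElse(keepItGoing);
    }
    return keepItGoing;
  }

  public String heldBack()
  {
    return heldBack;
  }
}
```

The loop body shrinks to one line, at the cost of allocating an Optional per partition.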

Contributor Author

This isn't relevant anymore as I had to move this logic into Glueing/NaivePartitioningOperator layer (away from Abstract layer).

I tried to keep handlePush() and handleKeepItGoing() in Abstract layer, but since we are using static classes now, we can't override the implementation of a static method, so I had to remove these methods and move the logic to the individual classes.

@@ -110,28 +79,7 @@ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
@Override
public Signal push(RowsAndColumns rac)

Member

make the Receiver a static inner class; it should have the Iterator as its field.

handlePush naturally wants to be a method of it

  • the full if (cont.iter != null) part should also go into the abstract;
  • put the full body of the while into a method in the abstract
  • the glueing should override that method and before calling the super() it could check if there are more elements and save that and use that when the Receiver is built

Contributor Author

A static receiver wouldn't have access to methods like ensureMaxRowsMaterializedConstraint() which are also called from non-static methods.

Also, can you elaborate on why we should make the receiver static? Even if we had a StaticReceiver, we would still have to do new StaticReceiver() with the required state.

For example, to add a static receiver in NaivePartitioningOperator, I added the following StaticReceiver class and then replaced new Receiver() {} with new StaticReceiver(receiver, iterHolder):

  static class StaticReceiver implements Receiver
  {
    private final Receiver delegate;
    private final AtomicReference<Iterator<RowsAndColumns>> iterHolder;

    public StaticReceiver(
        Receiver delegate,
        AtomicReference<Iterator<RowsAndColumns>> iterHolder
    ) {
      this.delegate = delegate;
      this.iterHolder = iterHolder;
    }

    @Override
    public Signal push(RowsAndColumns rac)
    {
      return handlePush(rac, delegate, iterHolder);
    }

    @Override
    public void completed()
    {
      if (iterHolder.get() == null) {
        delegate.completed();
      }
    }
  }

I think we're not on the same page about what the static receiver needs to look like. Can you share your thoughts on the above?

Member

doesn't seem like a dealbreaker...as a last resort ensureMaxRowsMaterializedConstraint can be static and pass the 2 integers for it?

Contributor Author

@kgyrtkirk The iterator needs to be created in the receiver's push() method.

Having a static receiver doesn't let us create the iterator there, since getIteratorForRAC() is not static.

getIteratorForRAC() can't be made static since it's supposed to be an abstract method, with different implementations in NaivePartitioningOperator and GlueingPartitioningOperator.

If I remove getIteratorForRAC() from the abstract class, and just have it as a regular non-overridden static method, then how to pass the iterator to the static receiver? I can't pass it in like super.push(rac, iteratorForRac) since it violates the method signature of push().

Also, trying to make everything static ends up requiring us to pass unnecessary variables as method params and to structure everything just to make the receiver static, which seems unnecessary and not clean to me. We would still have to do new StaticReceiver(field1, field2,...) every time instead of new Receiver().

Thoughts?

Contributor Author

Have made local changes to make receiver and the iterator class static. The code doesn't seem clean though, as I had to pass a bunch of params. Also trying to see what can be moved to Abstract class with those changes. (This is all on my local right now, just updating this info here)

Contributor Author

@kgyrtkirk Have pushed the change to the PR to make the relevant classes static. Appreciate if you could take a look again, thanks!

public class GlueingPartitioningOperator extends AbstractPartitioningOperator
{
  private final int maxRowsMaterialized;
  private RowsAndColumns previousRac;

Member

it would be better to not have this in Operator scope; I believe it should belong to the Receiver

Contributor Author

previousRac is also used in the continuation logic, so needs to be at the Operator scope.

Member

although it works - that's bad design


throw new NoSuchElementException();
}

if (!firstPartitionHandled) {

Member

note: you don't necessarily need this boolean; couldn't (previousRac != null) act like it?
or currentIndex==0 ?
having (previousRac != null) at a higher level could also cleanup some conditionals

Contributor Author

Changed to using currentIndex == 0

Member

please read my comment again

Contributor Author

Have made some change in the way the conditionals are structured: b725332

Please let me know if this aligns with the suggestion.

)
{
super(partitionColumns, child);
this.maxRowsMaterialized = maxRowsMaterialized;

Member

could this be null?

Contributor Author

It's int, so it can't be null.

Member

note: unboxing happens at this point as this.maxRowsMaterialized is an int; meanwhile maxRowsMaterialized is an Integer

Contributor Author

Makes sense, have added Preconditions.checkNotNull(maxRowsMaterialized, "maxRowsMaterialized cannot be null");, hope that works.
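The unboxing concern can be reproduced in a few lines. This sketch uses the JDK's `Objects.requireNonNull` as a stand-in for Guava's `Preconditions.checkNotNull` so that it has no dependencies; `UnboxingSketch` is a hypothetical class, not the operator from this PR.

```java
import java.util.Objects;

/** Hypothetical illustration only; this is not GlueingPartitioningOperator. */
public class UnboxingSketch
{
  private final int maxRowsMaterialized;

  public UnboxingSketch(Integer maxRowsMaterialized)
  {
    // Without this check, a null argument would still fail on the assignment below,
    // but with a NullPointerException raised by auto-unboxing rather than a named one.
    Objects.requireNonNull(maxRowsMaterialized, "maxRowsMaterialized cannot be null");
    this.maxRowsMaterialized = maxRowsMaterialized; // Integer -> int unboxing happens here
  }

  public int getMaxRowsMaterialized()
  {
    return maxRowsMaterialized;
  }
}
```

Passing `null` fails fast with the message "maxRowsMaterialized cannot be null", which points at the offending parameter by name.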

@Akshat-Jain Akshat-Jain requested a review from kgyrtkirk October 10, 2024 08:26
Comment on lines +238 to +250
private RowsAndColumns makeSimpleRac(int... values)
{
  return MapOfColumnsRowsAndColumns.fromMap(
      ImmutableMap.of("column", new IntArrayColumn(values))
  );
}

private RowsAndColumnsHelper expectedSimpleRac(int... values)
{
  return new RowsAndColumnsHelper()
      .expectColumn("column", values)
      .allColumnsRegistered();
}

Member

note: this is not critical; but if you copy these 10 lines into multiple files, that creates a maintenance burden after some iterations - it's better to try to reuse such things somehow

Contributor Author

Moved these methods to RowsAndColumnsHelper class. Hope that works.

Comment on lines +173 to +176
ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
if (groupPartitioner == null) {
  groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
}

Member

@kgyrtkirk kgyrtkirk Oct 16, 2024

note: use ClusteredGroupPartitioner.fromRAC

Contributor Author

Oh nice, wasn't aware of this. Have made the change in both partitioning operators. Thanks for pointing this out!


private final ArrayList<ResultRow> rowsToProcess;
private int lastPartitionIndex = -1;
private Operator op = null;

Member

can this field have a more verbose name?

Contributor Author

Renamed to operator. Any other naming suggestion? 😅

if (frameHasRowsPendingFlush()) {
return ReturnOrAwait.runAgain();
catch (IOException e) {
throw new RuntimeException(e);

Member

@kgyrtkirk kgyrtkirk Oct 16, 2024

note: why the need to catch this?
at line 136 it was okay to throw - why not okay here?

Contributor Author

Ah right, have made the change.

return null; // Signal that the operator has completed its work
}

// Return a non-null continuation object to indicate that we want to continue processing.

Member

I don't think this comment and the returned Closeable live up to the contract of the implemented method... but that didn't work before this change either.

Contributor Author

Since we have to re-use the same operator, I needed a way to keep it running..


Labels

Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Querying


5 participants