Skip to content

projections part 1 - realtime segments#17117

Closed
clintropolis wants to merge 8 commits intoapache:masterfrom
clintropolis:projections-realtime
Closed

projections part 1 - realtime segments#17117
clintropolis wants to merge 8 commits intoapache:masterfrom
clintropolis:projections-realtime

Conversation

@clintropolis
Copy link
Copy Markdown
Member

@clintropolis clintropolis commented Sep 19, 2024

Description

This PR begins to introduce the concept of projections to Druid datasources, which are similar to materialized views but are built into a segment, and which can automatically be used during query execution if the projection fits the query. This PR only contains the logic to build and query them for realtime queries, and does not contain the ability to serialize and actually store them in persisted segments, so it is effectively a toy right now.

changes:

  • Adds ProjectionSpec interface, AggregateProjectionSpec implementation for defining rollup projections on druid datasources
  • Adds projections to DataSchema
  • Adds projection building and querying support to OnHeapIncrementalIndex

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

This PR begins to introduce the concept of projections to Druid datasources, which are similar to materialized views but are built into a segment, and which can automatically be used during query execution if the projection fits the query. This PR only contains the logic to build and query them for realtime queries, and does not contain the ability to serialize and actually store them in persisted segments, so it is effectively a toy right now.

changes:
* Adds ProjectionSpec interface, AggregateProjectionSpec implementation for defining rollup projections on druid datasources
* Adds projections to DataSchema
* Adds projection building and querying support to OnHeapIncrementalIndex
final Expr expr = expressionVirtualColumn.getParsedExpression().get();
if (expr instanceof TimestampFloorExprMacro.TimestampFloorExpr) {
final TimestampFloorExprMacro.TimestampFloorExpr gran = (TimestampFloorExprMacro.TimestampFloorExpr) expr;
if (gran.getArg().getBindingIfIdentifier() != null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if statement is pretty obtuse, no clue what it's checking for. It reads like "If this one thing is not null, return this other thing". Perhaps a comment for why? Or the TimestampFloorExpr concrete class could gain a method that knows how to return a granularity?

Comment on lines +30 to +34
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "aggregate", value = AggregateProjectionSpec.class)
})
public interface ProjectionSpec
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a stream-of-consciousness comment, but this interface has only one concrete implementation, right? Is there a reason it needs to be an interface? You could make it just a concrete class (and still force a type parameter to exist if you want) for now, until we have a second implementation that we want? Or, do you already have another implementation of this interface in mind that you are thinking we will use?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other interface I had in mind is projections which are not using a rollup facts holder, which came from some offline discussions I have had with @gianm, but I have not built this yet. The thinking was that since we've lifted the restriction that segments have to be time ordered, there seems likely to be some use cases for projections which filter, transform, or order the data differently than what the 'base' table does but without grouping the data. I suspect we would always want these to be filtered, or else represent them in persisted segments quite a bit differently (maybe just an index for ordering with no filtering).

The ordering stuff is also not fully realized yet so maybe the utility of these kinds of projections isn't quite apparent at this time. We were thinking that query engines should be able to take advantage of the cases where CursorHolder.getOrdering() is the same as the order specified by the CursorBuildSpec.getPreferredOrdering(), but right now its use is primarily limited to making cursors for ascending or descending time ordered data for the timeseries, timeboundary, and scan with time ordering queries (which are the only things which currently set preferredOrdering on the build spec today).

I'll back out the interface for now until we add other implementations.


public boolean isAggregate()
{
return !CollectionUtils.isNullOrEmpty(groupingColumns) || !CollectionUtils.isNullOrEmpty(aggregators);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this boolean hard to keep track of. After thinking about it a lot, understand it. It's essentially the same as "are there aggregators? if so, return true, if not, check if there are grouping columns, if there are grouping columns and no aggregators, then assume that we are just naively dedupping with a grouping and moving on"

As I thought about if there was an easier to grok way of dealing with this, I think it might be easier to actually add a boolean isAggregate field that gets set to true when either groupingColumns or aggregators are set. This then just returns that. I think that'd be easier to understand as it's basically saying "if you set the groupingColumns or give me aggregators, then I'm an aggregate" which is essentially what we are trying to say. It also effectively memoizes the value, though I don't know that we will really build this over and over and over again...

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, it seems reasonable to just precompute it in the CursorBuildSpec constructor (and add javadocs too)

}
}

public static boolean compatibleOrdering(CursorBuildSpec buildSpec, List<OrderBy> ordering)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason that this isn't a method on CursorBuildSpec? That would seem pretty natural, I think?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved

Comment on lines +70 to +78
if (buildSpec.getPreferredOrdering().isEmpty()) {
return true;
}
// all columns must be present in ordering if the build spec specifies them
if (ordering.size() < buildSpec.getPreferredOrdering().size()) {
return false;
}
for (int i = 0; i < buildSpec.getPreferredOrdering().size(); i++) {
final OrderBy preferredOrderBy = buildSpec.getPreferredOrdering().get(i);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if buildSpec.getPreferredOrdering() were to ever do lazy computation of things, this is going to call that a lot of times and basically keep doing the same thing over and over again. Get it once and reuse it pls.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't imagine that it will ever be lazy since its just populated by a builder from the query engines, but this problem went away moving the function to CursorBuildSpec

Comment on lines +237 to +243
final DimensionDesc rewrite = new DimensionDesc(
i++,
parent.getName(),
parent.getHandler(),
parent.getIndexer()
);
descs.add(rewrite);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it called "rewrite"? Is it getting renamed or something? As I read the code, I feel like there's some game going on with naming of things, but I'm unsure of what it is. Is there something to be aware of with naming?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was a bad name, it should have been called child. It is a DimensionDesc that doesn't make a new handler or indexer, but instead just contains the contents of its parent DimensionDesc except with the position of the child (DimensionDesc.getIndex() which should probably be renamed DimensionDesc.getPosition()...)

Comment on lines +360 to +363
for (OnHeapAggregateProjection projection : aggregateProjections) {
projection.addToFacts(key, inputRowHolder.getRow(), parseExceptionMessages, totalSizeInBytes);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't thought through if this generates a meaningful semantic issue or not, but I had been imagining that the projections are updated after the primary rather than before. Was this placed first for a specific reason?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't really make a difference either way and works fine before or after. My initial prototype was just like.. inlined through OnHeapIncrementalIndex and i had the projection logic first because i wanted my debugger to get to the code asap so had it at the front of the method.

Comment on lines +545 to +547
if (buildSpec.getQueryContext().getBoolean(QueryContexts.CTX_NO_PROJECTION, false)) {
return null;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a weird place to check this. I think it's okay to check here, but I'd actually say that instead of returning null this should throw a defensive exception and the developer should change the code to stop calling this method if it's not supposed to use a projection. Is there a reason that it is hard for the calling code to know whether it's allowed to use projections or not?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it is likely the only ever direct caller of this method is going to be IncrementalIndexCursorFactory.makeCursorHolder, which seems also a reasonable place to check, I can move it there if you prefer. The main reason I did it here is because i wired up both context flags I added at the same time when I wrote it, since here is also where checking the context flag for forcing a specific projection be used by name is happening.

Comment on lines +1264 to +1269
for (VirtualColumn vc : schema.getVirtualColumns().getVirtualColumns()) {
if (vc.requiresColumn(ColumnHolder.TIME_COLUMN_NAME) && vc.requiredColumns().size() == 1) {
virtualGranularity = Granularities.fromVirtualColumn(vc);
break;
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are multiple time floors in the same projection, the rule is that the first one is the virtual granularity? That sounds correct as this is also the sort-order for the rows right? so the first one will take precedence. It might be worth it to explain that in a comment or move this to a static method that can have method-level javadoc or something like that.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was actually incorrect, we need to use the finest granularity time_floor so i have updated the logic to do that and added comments

}
}

public static class OnHeapAggregateProjection implements IncrementalIndexRowSelector
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A curiosity question: the name of this seems to be specific to a Projection. But, like, you could argue that even the "primary" could also be a projection of the incoming data, which could show up in the code as having one more projection for the primary along with the other defined projections. Instead, you've taken the approach of having a different Projection class than the main IncrementalIndex. Is there a specific reason why not modeling the "primary" as just-another-projection ended up being the favored approach?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, i was (and am still) considering this, especially since IncrementalIndex itself is kind of a strange abstraction in the first place and I think it could let us clean some stuff up. The main reason I have not tackled this yet is because it would require being able to represent no rollup projections, which is left out for now, the other was that it would be a bit more dramatic of a change for IncrementalIndex, and I was trying to be conservative on overall changes for this PR. I do think it would be a lot nicer though if it was just all projections, so I plan to keep thinking about this.

return isAggregate;
}

public boolean isCompatibleOrdering(List<OrderBy> ordering)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some description about what is compatible ordering?

@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return rowSelector.getColumnCapabilities(column);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if I asked this before but doesn't it need to look up in the rewrite columns first?

Comment on lines +1340 to +1355
final ImmutableList.Builder<OrderBy> listBuilder =
ImmutableList.builderWithExpectedSize(dimensions.size() + 1);
int i = 0;
if (i == foundTimePosition) {
listBuilder.add(OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME));
}
for (String dimName : dimensionsMap.keySet()) {
listBuilder.add(OrderBy.ascending(dimName));
i++;
if (i == foundTimePosition) {
listBuilder.add(OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME));
}
}
if (foundTimePosition < 0) {
listBuilder.add(OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the handling of time column is a bit difficult to follow. can you explain what is going on here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I understand it now. why do we need to add time col at the end?

// always have its time-like column in the grouping columns list, so its position in this array specifies -1
final int[] parentDimIndex = new int[schema.getGroupingColumns().size()];
Arrays.fill(parentDimIndex, -1);
int foundTimePosition = -1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
int foundTimePosition = -1;
int timePosition = -1;

Comment on lines +1572 to +1579
// special handle time granularity
final Granularity virtualGranularity = Granularities.fromVirtualColumn(buildSpecVirtualColumn);
if (virtualGranularity != null) {
return !virtualGranularity.isFinerThan(projectionGranularity);
} else {
// anything else with __time requires none granularity
return Granularities.NONE.equals(projectionGranularity);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does it work if requiredInputs are more than 1 and one of the required input is a time column?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants