Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.filter.AndFilter;
Comment thread
pranavbhole marked this conversation as resolved.
import org.apache.druid.segment.filter.BoundFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.LikeFilter;
Expand Down Expand Up @@ -314,84 +315,29 @@ to generate filters to be passed to base cursor (filtersPushedDownToBaseCursor)
filtersPushedDownToBaseCursor -> null (as the filter cannot be re-written due to presence of virtual columns)
filtersForPostUnnestCursor -> d12 IN (a,b) or m1 < 10
*/
class FilterSplitter
{
final List<Filter> filtersPushedDownToBaseCursor = new ArrayList<>();
final List<Filter> filtersForPostUnnestCursor = new ArrayList<>();

void addPostFilterWithPreFilterIfRewritePossible(@Nullable final Filter filter, boolean skipPreFilters)
{
if (filter == null) {
return;
}
if (!skipPreFilters) {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(filter, inputColumn, inputColumnCapabilites);
if (newFilter != null) {
// Add the rewritten filter pre-unnest, so we get the benefit of any indexes, and so we avoid unnesting
// any rows that do not match this filter at all.
filtersPushedDownToBaseCursor.add(newFilter);
}
}
// Add original filter post-unnest no matter what: we need to filter out any extraneous unnested values.
filtersForPostUnnestCursor.add(filter);
}

void addPreFilter(@Nullable final Filter filter)
{
if (filter == null) {
return;
}

final Set<String> requiredColumns = filter.getRequiredColumns();

// Run filter post-unnest if it refers to any virtual columns. This is a conservative judgement call
// that perhaps forces the code to use a ValueMatcher where an index would've been available,
// which can have real performance implications. This is an interim choice made to value correctness
// over performance. When we need to optimize this performance, we should be able to
// create a VirtualColumnDatasource that contains all the virtual columns, in which case the query
// itself would stop carrying them and everything should be able to be pushed down.
if (queryVirtualColumns.getVirtualColumns().length > 0) {
for (String column : requiredColumns) {
if (queryVirtualColumns.exists(column)) {
filtersForPostUnnestCursor.add(filter);
return;
}
}
}
filtersPushedDownToBaseCursor.add(filter);

}
}

final FilterSplitter filterSplitter = new FilterSplitter();
final FilterSplitter filterSplitter = new FilterSplitter(inputColumn, inputColumnCapabilites, queryVirtualColumns);

if (queryFilter != null) {
List<Filter> preFilterList = new ArrayList<>();
final int origFilterSize;
if (queryFilter.getRequiredColumns().contains(outputColumnName)) {
// outside filter contains unnested column
// requires check for OR
if (queryFilter instanceof OrFilter) {
origFilterSize = ((OrFilter) queryFilter).getFilters().size();
for (Filter filter : ((OrFilter) queryFilter).getFilters()) {
if (filter.getRequiredColumns().contains(outputColumnName)) {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(
filter,
inputColumn,
inputColumnCapabilites
);
if (newFilter != null) {
preFilterList.add(newFilter);
}
} else {
preFilterList.add(filter);
// requires check for OR and And filters, disqualify rewrite for non-unnest filters
if (queryFilter instanceof BooleanFilter) {
boolean isTopLevelAndFilter = queryFilter instanceof AndFilter;
List<Filter> preFilterList = recursiveRewriteOnUnnestFilters(
(BooleanFilter) queryFilter,
inputColumn,
inputColumnCapabilites,
filterSplitter,
isTopLevelAndFilter
);
// If rewite on entire query filter is successful then add entire filter to preFilter else skip and only add to post filter.
if (filterSplitter.getPreFilterCount() == filterSplitter.getOriginalFilterCount()) {
if (queryFilter instanceof AndFilter) {
filterSplitter.addPreFilter(new AndFilter(preFilterList));
} else if (queryFilter instanceof OrFilter) {
filterSplitter.addPreFilter(new OrFilter(preFilterList));
}
}
if (preFilterList.size() == origFilterSize) {
// there has been successful rewrites
final OrFilter preOrFilter = new OrFilter(preFilterList);
filterSplitter.addPreFilter(preOrFilter);
}
// add the entire query filter to unnest filter to be used in Value matcher
filterSplitter.addPostFilterWithPreFilterIfRewritePossible(queryFilter, true);
} else {
Expand All @@ -412,7 +358,173 @@ void addPreFilter(@Nullable final Filter filter)
);
}

class FilterSplitter
{
private String inputColumn;
private ColumnCapabilities inputColumnCapabilites;
private VirtualColumns queryVirtualColumns;

private int originalFilterCount = 0;
private int preFilterCount = 0;

public FilterSplitter(
String inputColumn,
ColumnCapabilities inputColumnCapabilites,
VirtualColumns queryVirtualColumns
)
{
this.inputColumn = inputColumn;
this.inputColumnCapabilites = inputColumnCapabilites;
this.queryVirtualColumns = queryVirtualColumns;
}

final List<Filter> filtersPushedDownToBaseCursor = new ArrayList<>();
final List<Filter> filtersForPostUnnestCursor = new ArrayList<>();

void addPostFilterWithPreFilterIfRewritePossible(@Nullable final Filter filter, boolean skipPreFilters)
{
if (filter == null) {
return;
}
if (!skipPreFilters) {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(filter, inputColumn, inputColumnCapabilites);
if (newFilter != null) {
// Add the rewritten filter pre-unnest, so we get the benefit of any indexes, and so we avoid unnesting
// any rows that do not match this filter at all.
filtersPushedDownToBaseCursor.add(newFilter);
}
}
// Add original filter post-unnest no matter what: we need to filter out any extraneous unnested values.
filtersForPostUnnestCursor.add(filter);
}

void addPreFilter(@Nullable final Filter filter)
{
if (filter == null) {
return;
}

final Set<String> requiredColumns = filter.getRequiredColumns();

// Run filter post-unnest if it refers to any virtual columns. This is a conservative judgement call
// that perhaps forces the code to use a ValueMatcher where an index would've been available,
// which can have real performance implications. This is an interim choice made to value correctness
// over performance. When we need to optimize this performance, we should be able to
// create a VirtualColumnDatasource that contains all the virtual columns, in which case the query
// itself would stop carrying them and everything should be able to be pushed down.
if (queryVirtualColumns.getVirtualColumns().length > 0) {
for (String column : requiredColumns) {
if (queryVirtualColumns.exists(column)) {
filtersForPostUnnestCursor.add(filter);
return;
}
}
}
filtersPushedDownToBaseCursor.add(filter);

}

public void addToOriginalFilterCount(int c)
{
originalFilterCount += c;
}

public void addToPreFilterCount(int c)
{
preFilterCount += c;
}

public int getOriginalFilterCount()
{
return originalFilterCount;
}

public int getPreFilterCount()
{
return preFilterCount;
}
}

/**
* handles the nested rewrite for unnest columns in recursive way,
* it loops through all and/or filters and rewrite only required filters in the child and add it to preFilter if qualified
* or else skip adding it to preFilters.
* RULES:
* 1. Add to preFilters only when top level filter is AND.
* for example: a=1 and (b=2 or c=2) , In this case a=1 can be added as preFilters but we can not add b=2 as preFilters.
* 2. If Top level is OR filter then we can either choose to add entire top level OR filter to preFilter or skip it all together.
* for example: a=1 or (b=2 and c=2)
* 3. Filters on unnest column which is derived from Array or any other Expression can not be pushe down to base.
* for example: a=1 and vc=3 , lets say vc is ExpressionVirtualColumn, and vc=3 can not be push down to base even if top level is AND filter.
* A. Unnesting a single dimension e.g. select * from foo, UNNEST(MV_TO_ARRAY(dim3)) as u(d3)
* B. Unnesting an expression from multiple columns e.g. select * from foo, UNNEST(ARRAY[dim1,dim2]) as u(d12)
* In case A, d3 is a direct reference to dim3 so any filter using d3 can be rewritten using dim3 and added to pre filter
* while in case B, due to presence of the expression virtual column expressionVirtualColumn("j0.unnest", "array(\"dim1\",\"dim2\")", ColumnType.STRING_ARRAY)
* the filters on d12 cannot be pushed to the pre filters
*
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments, for future developers can we make it a bit clear with why in case 1, the entire filter (b=2 OR c=2) cannot be pushed down to the preFilter. Probably we can add an example saying that either b2 or c2 references a column that is not a part of the input for the case of unnest.

Something like, in unnest we come across 2 cases:

  1. unnesting a single dimension e.g. select * from foo, UNNEST(MV_TO_ARRAY(dim3)) as u(d3)
  2. Unnesting an expression from multiple columns e.g. select * from foo, UNNEST(ARRAY[dim1,dim2]) as u(d12)

In case 1, d3 is a direct reference to dim3 so any filter using d3 can be rewritten using dim3 and added to pre filter while in case2, due to presence of the expression virtual column expressionVirtualColumn("j0.unnest", "array(\"dim1\",\"dim2\")", ColumnType.STRING_ARRAY) the filters on d12 cannot be pushed to the pre filters

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the comments

* @param queryFilter query filter passed to makeCursors
* @param inputColumn input column to unnest if it's a direct access; otherwise null
* @param inputColumnCapabilites input column capabilities if known; otherwise null
*/
private List<Filter> recursiveRewriteOnUnnestFilters(
BooleanFilter queryFilter,
final String inputColumn,
final ColumnCapabilities inputColumnCapabilites,
final FilterSplitter filterSplitter,
final boolean isTopLevelAndFilter
)
{
final List<Filter> preFilterList = new ArrayList<>();
for (Filter filter : queryFilter.getFilters()) {
if (filter.getRequiredColumns().contains(outputColumnName)) {
if (filter instanceof AndFilter) {
preFilterList.add(new AndFilter(recursiveRewriteOnUnnestFilters(
(BooleanFilter) filter,
inputColumn,
inputColumnCapabilites,
filterSplitter,
isTopLevelAndFilter
)));
} else if (filter instanceof OrFilter) {
// in case of Or Fiters, we set isTopLevelAndFilter to false that prevents pushing down any child filters to base
List<Filter> orChildFilters = recursiveRewriteOnUnnestFilters(
(BooleanFilter) filter,
inputColumn,
inputColumnCapabilites,
filterSplitter,
false
);
preFilterList.add(new OrFilter(orChildFilters));
} else {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(
filter,
inputColumn,
inputColumnCapabilites
);
if (newFilter != null) {
// this is making sure that we are not pushing the unnest columns filters to base filter without rewriting.
preFilterList.add(newFilter);
filterSplitter.addToPreFilterCount(1);
}
/*
Push down the filters to base only if top level is And Filter
we can not push down if top level filter is OR or unnestColumn is derived expression like arrays
*/
if (isTopLevelAndFilter && getUnnestInputIfDirectAccess(unnestColumn) != null) {
filterSplitter.addPreFilter(newFilter != null ? newFilter : filter);
}
filterSplitter.addToOriginalFilterCount(1);
Copy link
Copy Markdown
Contributor

@somu-imply somu-imply Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the filter getting added to the preFilter list twice or counted twice with addToPreFilterCount ? It already got added to preFilterList once in line 503. This would cause the count to go up and when you make the comparison of count on line 330, it would give the wrong count.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
} else {
preFilterList.add(filter);
// for filters on non unnest columns, we still need to count the nested filters if any as we are not traversing it in this method
int filterCount = Filters.countNumberOfFilters(filter);
filterSplitter.addToOriginalFilterCount(filterCount);
filterSplitter.addToPreFilterCount(filterCount);
}
}
return preFilterList;
}
/**
* Returns the input of {@link #unnestColumn}, if it's a direct access; otherwise returns null.
*/
Expand Down Expand Up @@ -465,7 +577,6 @@ static boolean filterMapsOverMultiValueStrings(final Filter filter)
return false;
}
}

return true;
} else if (filter instanceof NotFilter) {
return filterMapsOverMultiValueStrings(((NotFilter) filter).getBaseFilter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.query.DefaultBitmapResultFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.filter.BooleanFilter;
import org.apache.druid.query.filter.ColumnIndexSelector;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.DruidPredicateFactory;
Expand Down Expand Up @@ -422,4 +423,19 @@ private static LinkedHashSet<Filter> flattenOrChildren(final Collection<Filter>

return retVal;
}

public static int countNumberOfFilters(@Nullable Filter filter)
{
if (filter == null) {
return 0;
}
if (filter instanceof BooleanFilter) {
return ((BooleanFilter) filter).getFilters()
.stream()
.map(f -> countNumberOfFilters(f))
.mapToInt(Integer::intValue)
.sum();
}
return 1;
}
}
Loading