Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.segment.filter.FalseFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.util.ArrayList;
Expand Down Expand Up @@ -120,7 +121,7 @@ public static Pair<List<Filter>, List<JoinableClause>> convertJoinsToFilters(
}

/**
* Converts a join clause into an "in" filter if possible.
* Converts a join clause into appropriate filter(s) if possible.
* <p>
* The requirements are:
* <p>
Expand All @@ -144,49 +145,44 @@ static JoinClauseToFilterConversion convertJoinToFilter(
final Set<String> rightPrefixes
)
{
// This optimization kicks in when there is exactly 1 equijoin
final List<Equality> equiConditions = clause.getCondition().getEquiConditions();
if (clause.getJoinType() == JoinType.INNER
&& clause.getCondition().getNonEquiConditions().isEmpty()
&& clause.getCondition().getEquiConditions().size() > 0) {
final List<Filter> filters = new ArrayList<>();
int numValues = maxNumFilterValues;
&& equiConditions.size() == 1) {
// if the right side columns are required, the clause cannot be fully converted
boolean joinClauseFullyConverted = requiredColumns.stream().noneMatch(clause::includesColumn);
final Equality condition = CollectionUtils.getOnlyElement(
equiConditions,
xse -> new IAE("Expected only one equi condition")
);

for (final Equality condition : clause.getCondition().getEquiConditions()) {
final String leftColumn = condition.getLeftExpr().getBindingIfIdentifier();
final String leftColumn = condition.getLeftExpr().getBindingIfIdentifier();

if (leftColumn == null) {
return new JoinClauseToFilterConversion(null, false);
}

// don't add a filter on any right side table columns. only filter on left base table is supported as of now.
if (rightPrefixes.stream().anyMatch(leftColumn::startsWith)) {
joinClauseFullyConverted = false;
continue;
}
if (leftColumn == null) {
return new JoinClauseToFilterConversion(null, false);
}

Joinable.ColumnValuesWithUniqueFlag columnValuesWithUniqueFlag =
clause.getJoinable().getNonNullColumnValues(condition.getRightColumn(), numValues);
// For an empty values set, isAllUnique flag will be true only if the column had no non-null values.
if (columnValuesWithUniqueFlag.getColumnValues().isEmpty()) {
if (columnValuesWithUniqueFlag.isAllUnique()) {
return new JoinClauseToFilterConversion(FalseFilter.instance(), true);
} else {
joinClauseFullyConverted = false;
}
continue;
}
// don't add a filter on any right side table columns. only filter on left base table is supported as of now.
if (rightPrefixes.stream().anyMatch(leftColumn::startsWith)) {
return new JoinClauseToFilterConversion(null, false);
}

numValues -= columnValuesWithUniqueFlag.getColumnValues().size();
filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues())));
if (!columnValuesWithUniqueFlag.isAllUnique()) {
joinClauseFullyConverted = false;
Joinable.ColumnValuesWithUniqueFlag columnValuesWithUniqueFlag =
clause.getJoinable().getNonNullColumnValues(condition.getRightColumn(), maxNumFilterValues);
// For an empty values set, isAllUnique flag will be true only if the column had no non-null values.
if (columnValuesWithUniqueFlag.getColumnValues().isEmpty()) {
if (columnValuesWithUniqueFlag.isAllUnique()) {
return new JoinClauseToFilterConversion(FalseFilter.instance(), true);
}
return new JoinClauseToFilterConversion(null, false);
}

return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted);
final Filter onlyFilter = new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues());
if (!columnValuesWithUniqueFlag.isAllUnique()) {
joinClauseFullyConverted = false;
}
return new JoinClauseToFilterConversion(onlyFilter, joinClauseFullyConverted);
}

return new JoinClauseToFilterConversion(null, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, fina
}

try (final IndexedTable.Reader reader = table.columnReader(columnPosition)) {
// Sorted set to encourage "in" filters that result from this method to do dictionary lookups in order.
// The hopes are that this will improve locality and therefore improve performance.
// Use a SortedSet so InDimFilter doesn't need to create its own
final Set<String> allValues = createValuesSet();
boolean allUnique = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
RowSignature.builder().add("country", ColumnType.STRING).build()
);

private static final InlineDataSource INDEXED_TABLE_DS_THREE_COLS = InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{"El Salvador", 1, 1.0},
new Object[]{"Mexico", 2, 2.0},
new Object[]{"United States", 3, 3.0},
new Object[]{"Norway", 4, 4.0},
new Object[]{"India", 5, 5.0},
new Object[]{"United States", 6, 3.0}
),
RowSignature.builder()
.add("country", ColumnType.STRING)
.add("m1", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.build()
);

private static final InlineDataSource NULL_INDEXED_TABLE_DS = InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{null}
Expand All @@ -99,6 +115,14 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
DateTimes.nowUtc().toString()
);

private static final IndexedTable TEST_INDEXED_TABLE_THREE_COLS = new RowBasedIndexedTable<>(
INDEXED_TABLE_DS_THREE_COLS.getRowsAsList(),
INDEXED_TABLE_DS_THREE_COLS.rowAdapter(),
INDEXED_TABLE_DS_THREE_COLS.getRowSignature(),
ImmutableSet.of("country", "m1", "m2"),
DateTimes.nowUtc().toString()
);

private static final IndexedTable TEST_NULL_INDEXED_TABLE = new RowBasedIndexedTable<>(
NULL_INDEXED_TABLE_DS.getRowsAsList(),
NULL_INDEXED_TABLE_DS.rowAdapter(),
Expand Down Expand Up @@ -625,4 +649,29 @@ public void test_convertJoinsToFilters_dontConvertJoinsDependedOnByPreviousJoins
conversion
);
}

@Test
public void test_convertJoinsToPartialFiltersMultipleCondtions()
{
JoinableClause joinableClause = new JoinableClause(
"j.",
new IndexedTableJoinable(TEST_INDEXED_TABLE_THREE_COLS),
JoinType.INNER,
JoinConditionAnalysis.forExpression("x == \"j.country\" && y == \"j.m1\"", "j.", ExprMacroTable.nil())
);
final Pair<List<Filter>, List<JoinableClause>> conversion = JoinableFactoryWrapper.convertJoinsToFilters(
ImmutableList.of(joinableClause),
ImmutableSet.of("x", "y"),
Integer.MAX_VALUE
);

// Optimization does not kick in as there are > 1 equijoins
Assert.assertEquals(
Pair.of(
ImmutableList.of(),
ImmutableList.of(joinableClause)
),
conversion
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5410,4 +5410,98 @@ public void testJoinWithAliasAndOrderByNoGroupBy()
)
);
}

@Test
public void testJoinsWithTwoConditions()
{
Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
testQuery(
"SELECT t1.__time, t1.m1\n"
+ "FROM foo t1\n"
+ "JOIN (SELECT m1, MAX(__time) as latest_time FROM foo WHERE m1 IN (1,2) GROUP BY m1) t2\n"
+ "ON t1.m1 = t2.m1 AND t1.__time = t2.latest_time\n",
context,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
GroupByQuery.builder()
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDataSource(new TableDataSource(CalciteTests.DATASOURCE1))
.setDimFilter(in("m1", ImmutableList.of("1", "2"), null))
.setDimensions(new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT))
.setAggregatorSpecs(aggregators(new LongMaxAggregatorFactory("a0", "__time")))
.setContext(context)
.build()
),
"j0.",
"((\"m1\" == \"j0.d0\") && (\"__time\" == \"j0.a0\"))",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "m1")
.context(context)
.build()
),
ImmutableList.of(
new Object[]{946684800000L, 1.0f},
new Object[]{946771200000L, 2.0f}
)
);
}

@Test
public void testJoinsWithThreeConditions()
{
Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
testQuery(
"SELECT t1.__time, t1.m1, t1.m2\n"
+ "FROM foo t1\n"
+ "JOIN (SELECT m1, m2, MAX(__time) as latest_time FROM foo WHERE m1 IN (1,2) AND m2 IN (1,2) GROUP by m1,m2) t2\n"
+ "ON t1.m1 = t2.m1 AND t1.m2 = t2.m2 AND t1.__time = t2.latest_time\n",
context,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
GroupByQuery.builder()
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDataSource(new TableDataSource(CalciteTests.DATASOURCE1))
.setDimFilter(
and(
in("m1", ImmutableList.of("1", "2"), null),
in("m2", ImmutableList.of("1", "2"), null)
)
)
.setDimensions(
new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT),
new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE)
)
.setAggregatorSpecs(aggregators(new LongMaxAggregatorFactory("a0", "__time")))
.setContext(context)
.build()
),
"j0.",
"((\"m1\" == \"j0.d0\") && (\"m2\" == \"j0.d1\") && (\"__time\" == \"j0.a0\"))",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "m1", "m2")
.context(context)
.build()
),
ImmutableList.of(
new Object[]{946684800000L, 1.0f, 1.0},
new Object[]{946771200000L, 2.0f, 2.0}
)
);
}
}