From 27e0960ca386fa54dc96ac7846273336ba63be02 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 9 May 2023 15:31:31 -0700 Subject: [PATCH 01/10] Fixes https://github.com/apache/druid/issues/14213 by ensuring we do not convert to an AND of 2 IN filters. We now get selector filters, AND them, and OR those ANDs --- .../segment/join/JoinableFactoryWrapper.java | 32 ++++++++++++-- .../sql/calcite/CalciteJoinQueryTest.java | 43 +++++++++++++++++++ 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java index 134c9da48d77..80a67f487ce6 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java @@ -28,12 +28,15 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.filter.Filter; import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.filter.FalseFilter; import org.apache.druid.segment.filter.Filters; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -148,6 +151,7 @@ static JoinClauseToFilterConversion convertJoinToFilter( && clause.getCondition().getNonEquiConditions().isEmpty() && clause.getCondition().getEquiConditions().size() > 0) { final List filters = new ArrayList<>(); + final Map> filterMap = new HashMap<>(); int numValues = maxNumFilterValues; // if the right side columns are required, the clause cannot be fully converted boolean joinClauseFullyConverted = requiredColumns.stream().noneMatch(clause::includesColumn); @@ -178,15 +182,35 @@ static JoinClauseToFilterConversion convertJoinToFilter( } numValues -= 
columnValuesWithUniqueFlag.getColumnValues().size(); - filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues()))); + int c = 0; + for (String val : columnValuesWithUniqueFlag.getColumnValues()) { + Filter f = Filters.toFilter(new SelectorDimFilter(leftColumn, val, null, null)); + List filterList = filterMap.get(c); + if (filterList == null) { + filterList = new ArrayList(); + filterList.add(f); + filterMap.put(c, filterList); + } else { + filterList.add(f); + } + c++; + } + if (clause.getCondition().getEquiConditions().size() == 1) { + filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues()))); + } if (!columnValuesWithUniqueFlag.isAllUnique()) { joinClauseFullyConverted = false; } } - - return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted); + if (clause.getCondition().getEquiConditions().size() == 1) { + return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted); + } else { + for (Integer i : filterMap.keySet()) { + filters.add(Filters.and(filterMap.get(i))); + } + return new JoinClauseToFilterConversion(Filters.maybeOr(filters).orElse(null), joinClauseFullyConverted); + } } - return new JoinClauseToFilterConversion(null, false); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index da5abc9582a2..fb7e721f8795 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -5530,4 +5530,47 @@ private List sortIfSortBased(final List results, final int.. 
return results; } } + + @Test + public void testJoinsWithTwoConditions() + { + Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + testQuery( + "SELECT t1.__time, t1.m1\n" + + "FROM foo t1\n" + + "JOIN (SELECT m1, MAX(__time) as latest_time FROM foo WHERE m1 IN (1,2) GROUP BY m1) t2\n" + + "ON t1.m1 = t2.m1 AND t1.__time = t2.latest_time\n", + context, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + GroupByQuery.builder() + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDataSource(new TableDataSource(CalciteTests.DATASOURCE1)) + .setDimFilter(in("m1", ImmutableList.of("1", "2"), null)) + .setDimensions(new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT)) + .setAggregatorSpecs(aggregators(new LongMaxAggregatorFactory("a0", "__time"))) + .setContext(context) + .build() + ), + "j0.", + "((\"m1\" == \"j0.d0\") && (\"__time\" == \"j0.a0\"))", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "m1") + .context(context) + .build() + ), + ImmutableList.of( + new Object[]{946684800000L, 1.0f}, + new Object[]{946771200000L, 2.0f} + ) + ); + } } From 8155e84e02881460e802bd3a8307d034da9c5324 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 9 May 2023 17:47:03 -0700 Subject: [PATCH 02/10] Adding another test case --- .../sql/calcite/CalciteJoinQueryTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index fb7e721f8795..1f45aebc8b52 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -5573,4 +5573,55 @@ public void testJoinsWithTwoConditions() ) ); } + + @Test + public void 
testJoinsWithThreeConditions() + { + Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + testQuery( + "SELECT t1.__time, t1.m1, t1.m2\n" + + "FROM foo t1\n" + + "JOIN (SELECT m1, m2, MAX(__time) as latest_time FROM foo WHERE m1 IN (1,2) AND m2 IN (1,2) GROUP by m1,m2) t2\n" + + "ON t1.m1 = t2.m1 AND t1.m2 = t2.m2 AND t1.__time = t2.latest_time\n", + context, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + GroupByQuery.builder() + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDataSource(new TableDataSource(CalciteTests.DATASOURCE1)) + .setDimFilter( + and( + in("m1", ImmutableList.of("1", "2"), null), + in("m2", ImmutableList.of("1", "2"), null) + ) + ) + .setDimensions( + new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT), + new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE) + ) + .setAggregatorSpecs(aggregators(new LongMaxAggregatorFactory("a0", "__time"))) + .setContext(context) + .build() + ), + "j0.", + "((\"m1\" == \"j0.d0\") && (\"m2\" == \"j0.d1\") && (\"__time\" == \"j0.a0\"))", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "m1", "m2") + .context(context) + .build() + ), + ImmutableList.of( + new Object[]{946684800000L, 1.0f, 1.0}, + new Object[]{946771200000L, 2.0f, 2.0} + ) + ); + } } From f8fa4fce5195b83a4d84091811e7ee391205fc98 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 9 May 2023 19:38:59 -0700 Subject: [PATCH 03/10] Native test for changes --- .../join/JoinableFactoryWrapperTest.java | 87 ++++++++++++++++++- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java index 1ff3664aba4a..bd9bf0a623cd 100644 --- 
a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java @@ -35,7 +35,10 @@ import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.filter.AndFilter; import org.apache.druid.segment.filter.FalseFilter; +import org.apache.druid.segment.filter.OrFilter; +import org.apache.druid.segment.filter.SelectorFilter; import org.apache.druid.segment.join.lookup.LookupJoinable; import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.IndexedTableJoinable; @@ -75,15 +78,31 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest private static final InlineDataSource INDEXED_TABLE_DS = InlineDataSource.fromIterable( ImmutableList.of( + new Object[]{"El Salvador"}, + new Object[]{"India"}, new Object[]{"Mexico"}, new Object[]{"Norway"}, - new Object[]{"El Salvador"}, - new Object[]{"United States"}, new Object[]{"United States"} + ), RowSignature.builder().add("country", ColumnType.STRING).build() ); + private static final InlineDataSource INDEXED_TABLE_DS_THREE_COLS = InlineDataSource.fromIterable( + ImmutableList.of( + new Object[]{"El Salvador", 1, 1.0}, + new Object[]{"India", 2, 2.0}, + new Object[]{"Mexico", 3, 3.0}, + new Object[]{"Norway", 4, 4.0}, + new Object[]{"United States", 5, 5.0} + ), + RowSignature.builder() + .add("country", ColumnType.STRING) + .add("m1", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .build() + ); + private static final InlineDataSource NULL_INDEXED_TABLE_DS = InlineDataSource.fromIterable( ImmutableList.of( new Object[]{null} @@ -99,6 +118,14 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest DateTimes.nowUtc().toString() ); + private static final IndexedTable TEST_INDEXED_TABLE_THREE_COLS = new RowBasedIndexedTable<>( 
+ INDEXED_TABLE_DS_THREE_COLS.getRowsAsList(), + INDEXED_TABLE_DS_THREE_COLS.rowAdapter(), + INDEXED_TABLE_DS_THREE_COLS.getRowSignature(), + ImmutableSet.of("country", "m1", "m2"), + DateTimes.nowUtc().toString() + ); + private static final IndexedTable TEST_NULL_INDEXED_TABLE = new RowBasedIndexedTable<>( NULL_INDEXED_TABLE_DS.getRowsAsList(), NULL_INDEXED_TABLE_DS.rowAdapter(), @@ -625,4 +652,60 @@ public void test_convertJoinsToFilters_dontConvertJoinsDependedOnByPreviousJoins conversion ); } + + @Test + public void test_convertJoinsToPartialFiltersMultipleCondtions() + { + JoinableClause joinableClause = new JoinableClause( + "j.", + new IndexedTableJoinable(TEST_INDEXED_TABLE_THREE_COLS), + JoinType.INNER, + JoinConditionAnalysis.forExpression("x == \"j.country\" && y == \"j.m1\"", "j.", ExprMacroTable.nil()) + ); + final Pair, List> conversion = JoinableFactoryWrapper.convertJoinsToFilters( + ImmutableList.of(joinableClause), + ImmutableSet.of("x", "y"), + Integer.MAX_VALUE + ); + + Assert.assertEquals( + Pair.of( + ImmutableList.of(new OrFilter(ImmutableList.of( + new AndFilter( + ImmutableList.of( + new SelectorFilter("x", "El Salvador"), + new SelectorFilter("y", "1") + ) + ), + new AndFilter( + ImmutableList.of( + new SelectorFilter("x", "India"), + new SelectorFilter("y", "2") + ) + ), + new AndFilter( + ImmutableList.of( + new SelectorFilter("x", "Mexico"), + new SelectorFilter("y", "3") + ) + ), + new AndFilter( + ImmutableList.of( + new SelectorFilter("x", "Norway"), + new SelectorFilter("y", "4") + ) + ), + new AndFilter( + ImmutableList.of( + new SelectorFilter("x", "United States"), + new SelectorFilter("y", "5") + ) + ) + ))), + ImmutableList.of() + // the joinable clause remains intact since we've duplicates in country column + ), + conversion + ); + } } From ab8afad8ed7cb6bd437682ad36db5df629125891 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 9 May 2023 19:42:29 -0700 Subject: [PATCH 04/10] moving back an unwanted change in older 
table --- .../apache/druid/segment/join/JoinableFactoryWrapperTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java index bd9bf0a623cd..0f7657d7cee0 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java @@ -78,10 +78,10 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest private static final InlineDataSource INDEXED_TABLE_DS = InlineDataSource.fromIterable( ImmutableList.of( - new Object[]{"El Salvador"}, - new Object[]{"India"}, new Object[]{"Mexico"}, new Object[]{"Norway"}, + new Object[]{"El Salvador"}, + new Object[]{"United States"}, new Object[]{"United States"} ), From 869b5c0668ca93f795c9ebaf7420e40c934c0e61 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 9 May 2023 20:27:53 -0700 Subject: [PATCH 05/10] Introducing a linked hashset for reading values in order --- .../segment/join/table/IndexedTableJoinable.java | 8 +++++++- .../segment/join/JoinableFactoryWrapperTest.java | 13 ++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java index 27933235f97d..ddf894ca953c 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -105,7 +106,7 @@ public 
ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, fina try (final IndexedTable.Reader reader = table.columnReader(columnPosition)) { // Sorted set to encourage "in" filters that result from this method to do dictionary lookups in order. // The hopes are that this will improve locality and therefore improve performance. - final Set allValues = createValuesSet(); + final Set allValues = createOrderedValuesSet(); boolean allUnique = true; for (int i = 0; i < table.numRows(); i++) { @@ -204,4 +205,9 @@ private static Set createValuesSet() { return new TreeSet<>(Comparators.naturalNullsFirst()); } + + private static Set createOrderedValuesSet() + { + return new LinkedHashSet<>(); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java index 0f7657d7cee0..a8f8a68fafb4 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java @@ -83,7 +83,6 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest new Object[]{"El Salvador"}, new Object[]{"United States"}, new Object[]{"United States"} - ), RowSignature.builder().add("country", ColumnType.STRING).build() ); @@ -91,10 +90,10 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest private static final InlineDataSource INDEXED_TABLE_DS_THREE_COLS = InlineDataSource.fromIterable( ImmutableList.of( new Object[]{"El Salvador", 1, 1.0}, - new Object[]{"India", 2, 2.0}, - new Object[]{"Mexico", 3, 3.0}, + new Object[]{"Mexico", 2, 2.0}, + new Object[]{"United States", 3, 3.0}, new Object[]{"Norway", 4, 4.0}, - new Object[]{"United States", 5, 5.0} + new Object[]{"India", 5, 5.0} ), RowSignature.builder() .add("country", ColumnType.STRING) @@ -679,13 +678,13 @@ public void 
test_convertJoinsToPartialFiltersMultipleCondtions() ), new AndFilter( ImmutableList.of( - new SelectorFilter("x", "India"), + new SelectorFilter("x", "Mexico"), new SelectorFilter("y", "2") ) ), new AndFilter( ImmutableList.of( - new SelectorFilter("x", "Mexico"), + new SelectorFilter("x", "United States"), new SelectorFilter("y", "3") ) ), @@ -697,7 +696,7 @@ public void test_convertJoinsToPartialFiltersMultipleCondtions() ), new AndFilter( ImmutableList.of( - new SelectorFilter("x", "United States"), + new SelectorFilter("x", "India"), new SelectorFilter("y", "5") ) ) From e8941e613653cc44a8d5ca37ffbe874859b2bb0f Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 10 May 2023 10:06:48 -0700 Subject: [PATCH 06/10] Spotbug fix moving from keyset to entryset for iterating over the map --- .../segment/join/JoinableFactoryWrapper.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java index 80a67f487ce6..c1eff804e214 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java @@ -123,7 +123,7 @@ public static Pair, List> convertJoinsToFilters( } /** - * Converts a join clause into an "in" filter if possible. + * Converts a join clause into appropriate filter(s) if possible. *

* The requirements are: *

@@ -151,6 +151,7 @@ static JoinClauseToFilterConversion convertJoinToFilter( && clause.getCondition().getNonEquiConditions().isEmpty() && clause.getCondition().getEquiConditions().size() > 0) { final List filters = new ArrayList<>(); + // Creating a map that will store for each index the filters that needs to be AND ed final Map> filterMap = new HashMap<>(); int numValues = maxNumFilterValues; // if the right side columns are required, the clause cannot be fully converted @@ -182,16 +183,18 @@ static JoinClauseToFilterConversion convertJoinToFilter( } numValues -= columnValuesWithUniqueFlag.getColumnValues().size(); + // For each column value which are received in order we increment the index + // and add it in the appropriate index in the map int c = 0; for (String val : columnValuesWithUniqueFlag.getColumnValues()) { - Filter f = Filters.toFilter(new SelectorDimFilter(leftColumn, val, null, null)); + final Filter currentSelFilter = Filters.toFilter(new SelectorDimFilter(leftColumn, val, null, null)); List filterList = filterMap.get(c); if (filterList == null) { filterList = new ArrayList(); - filterList.add(f); + filterList.add(currentSelFilter); filterMap.put(c, filterList); } else { - filterList.add(f); + filterList.add(currentSelFilter); } c++; } @@ -205,8 +208,8 @@ static JoinClauseToFilterConversion convertJoinToFilter( if (clause.getCondition().getEquiConditions().size() == 1) { return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted); } else { - for (Integer i : filterMap.keySet()) { - filters.add(Filters.and(filterMap.get(i))); + for (Map.Entry> entry : filterMap.entrySet()) { + filters.add(Filters.and(entry.getValue())); } return new JoinClauseToFilterConversion(Filters.maybeOr(filters).orElse(null), joinClauseFullyConverted); } From 017fc876c1ae55376cc508bd5e515bd7c0b2c668 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 10 May 2023 11:56:55 -0700 Subject: [PATCH 07/10] Addressing review comments, 
now join to filter optimization triggers only when there is a single equijoin --- .../segment/join/JoinableFactoryWrapper.java | 37 ++++------------ .../join/table/IndexedTableJoinable.java | 11 ++--- .../join/JoinableFactoryWrapperTest.java | 43 +++---------------- 3 files changed, 16 insertions(+), 75 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java index c1eff804e214..192b30d941fe 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java @@ -28,15 +28,12 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.filter.Filter; import org.apache.druid.query.filter.InDimFilter; -import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.filter.FalseFilter; import org.apache.druid.segment.filter.Filters; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -147,12 +144,16 @@ static JoinClauseToFilterConversion convertJoinToFilter( final Set rightPrefixes ) { + // This optimization kicks in when there is exactly 1 equijoin + // The reason being that getNonNullColumnValues uses a TreeSet for handling duplicates + // and does not return values in order. + // In case TreeSet if replaced by a LinkedHashSet, duplicates are not handled correctly + // and resulting ordering is improper. 
+ // Considering these joins are converted to filters only when there is 1 equijoin if (clause.getJoinType() == JoinType.INNER && clause.getCondition().getNonEquiConditions().isEmpty() - && clause.getCondition().getEquiConditions().size() > 0) { + && clause.getCondition().getEquiConditions().size() == 1) { final List filters = new ArrayList<>(); - // Creating a map that will store for each index the filters that needs to be AND ed - final Map> filterMap = new HashMap<>(); int numValues = maxNumFilterValues; // if the right side columns are required, the clause cannot be fully converted boolean joinClauseFullyConverted = requiredColumns.stream().noneMatch(clause::includesColumn); @@ -183,21 +184,6 @@ static JoinClauseToFilterConversion convertJoinToFilter( } numValues -= columnValuesWithUniqueFlag.getColumnValues().size(); - // For each column value which are received in order we increment the index - // and add it in the appropriate index in the map - int c = 0; - for (String val : columnValuesWithUniqueFlag.getColumnValues()) { - final Filter currentSelFilter = Filters.toFilter(new SelectorDimFilter(leftColumn, val, null, null)); - List filterList = filterMap.get(c); - if (filterList == null) { - filterList = new ArrayList(); - filterList.add(currentSelFilter); - filterMap.put(c, filterList); - } else { - filterList.add(currentSelFilter); - } - c++; - } if (clause.getCondition().getEquiConditions().size() == 1) { filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues()))); } @@ -205,14 +191,7 @@ static JoinClauseToFilterConversion convertJoinToFilter( joinClauseFullyConverted = false; } } - if (clause.getCondition().getEquiConditions().size() == 1) { - return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted); - } else { - for (Map.Entry> entry : filterMap.entrySet()) { - filters.add(Filters.and(entry.getValue())); - } - return new 
JoinClauseToFilterConversion(Filters.maybeOr(filters).orElse(null), joinClauseFullyConverted); - } + return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted); } return new JoinClauseToFilterConversion(null, false); } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java index ddf894ca953c..beb0c9e5b667 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java @@ -35,7 +35,6 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; -import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -104,9 +103,10 @@ public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, fina } try (final IndexedTable.Reader reader = table.columnReader(columnPosition)) { - // Sorted set to encourage "in" filters that result from this method to do dictionary lookups in order. + // Use a SortedSet so InDimFilter doesn't need to create its own + // and to encourage "in" filters that result from this method to do dictionary lookups in order. // The hopes are that this will improve locality and therefore improve performance. 
- final Set allValues = createOrderedValuesSet(); + final Set allValues = createValuesSet(); boolean allUnique = true; for (int i = 0; i < table.numRows(); i++) { @@ -205,9 +205,4 @@ private static Set createValuesSet() { return new TreeSet<>(Comparators.naturalNullsFirst()); } - - private static Set createOrderedValuesSet() - { - return new LinkedHashSet<>(); - } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java index a8f8a68fafb4..8ff6d8461f8f 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java @@ -35,10 +35,7 @@ import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.filter.AndFilter; import org.apache.druid.segment.filter.FalseFilter; -import org.apache.druid.segment.filter.OrFilter; -import org.apache.druid.segment.filter.SelectorFilter; import org.apache.druid.segment.join.lookup.LookupJoinable; import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.IndexedTableJoinable; @@ -93,7 +90,8 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest new Object[]{"Mexico", 2, 2.0}, new Object[]{"United States", 3, 3.0}, new Object[]{"Norway", 4, 4.0}, - new Object[]{"India", 5, 5.0} + new Object[]{"India", 5, 5.0}, + new Object[]{"United States", 6, 3.0} ), RowSignature.builder() .add("country", ColumnType.STRING) @@ -667,42 +665,11 @@ public void test_convertJoinsToPartialFiltersMultipleCondtions() Integer.MAX_VALUE ); + // Optimization does not kick in as there are > 1 equijoins Assert.assertEquals( Pair.of( - ImmutableList.of(new OrFilter(ImmutableList.of( - new AndFilter( - ImmutableList.of( - new 
SelectorFilter("x", "El Salvador"), - new SelectorFilter("y", "1") - ) - ), - new AndFilter( - ImmutableList.of( - new SelectorFilter("x", "Mexico"), - new SelectorFilter("y", "2") - ) - ), - new AndFilter( - ImmutableList.of( - new SelectorFilter("x", "United States"), - new SelectorFilter("y", "3") - ) - ), - new AndFilter( - ImmutableList.of( - new SelectorFilter("x", "Norway"), - new SelectorFilter("y", "4") - ) - ), - new AndFilter( - ImmutableList.of( - new SelectorFilter("x", "India"), - new SelectorFilter("y", "5") - ) - ) - ))), - ImmutableList.of() - // the joinable clause remains intact since we've duplicates in country column + ImmutableList.of(), + ImmutableList.of(joinableClause) ), conversion ); From 80f9efbe6807f954c09f90c7b80e86ebe18979ef Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 10 May 2023 11:59:47 -0700 Subject: [PATCH 08/10] Removing a redundant code snippet --- .../org/apache/druid/segment/join/JoinableFactoryWrapper.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java index 192b30d941fe..cd2e5ca92cc5 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java @@ -184,9 +184,7 @@ static JoinClauseToFilterConversion convertJoinToFilter( } numValues -= columnValuesWithUniqueFlag.getColumnValues().size(); - if (clause.getCondition().getEquiConditions().size() == 1) { - filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues()))); - } + filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues()))); if (!columnValuesWithUniqueFlag.isAllUnique()) { joinClauseFullyConverted = false; } From 2c67ab08776a150bce56813be38fb8e947c70feb Mon Sep 17 
00:00:00 2001 From: Soumyava Das Date: Wed, 10 May 2023 19:44:46 -0700 Subject: [PATCH 09/10] Addressing review comments --- .../segment/join/JoinableFactoryWrapper.java | 65 +++++++++---------- .../join/table/IndexedTableJoinable.java | 2 - 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java index cd2e5ca92cc5..d3a1ec7b93a5 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java @@ -25,15 +25,17 @@ import com.google.common.collect.Multiset; import com.google.common.collect.Sets; import com.google.inject.Inject; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.filter.Filter; import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.segment.filter.FalseFilter; -import org.apache.druid.segment.filter.Filters; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -145,51 +147,42 @@ static JoinClauseToFilterConversion convertJoinToFilter( ) { // This optimization kicks in when there is exactly 1 equijoin - // The reason being that getNonNullColumnValues uses a TreeSet for handling duplicates - // and does not return values in order. - // In case TreeSet if replaced by a LinkedHashSet, duplicates are not handled correctly - // and resulting ordering is improper. 
- // Considering these joins are converted to filters only when there is 1 equijoin + final List equiConditions = clause.getCondition().getEquiConditions(); if (clause.getJoinType() == JoinType.INNER && clause.getCondition().getNonEquiConditions().isEmpty() - && clause.getCondition().getEquiConditions().size() == 1) { - final List filters = new ArrayList<>(); - int numValues = maxNumFilterValues; + && equiConditions.size() == 1) { // if the right side columns are required, the clause cannot be fully converted boolean joinClauseFullyConverted = requiredColumns.stream().noneMatch(clause::includesColumn); + final Equality condition = CollectionUtils.getOnlyElement( + equiConditions, + xse -> new IAE("Expected only one equi condition") + ); - for (final Equality condition : clause.getCondition().getEquiConditions()) { - final String leftColumn = condition.getLeftExpr().getBindingIfIdentifier(); + final String leftColumn = condition.getLeftExpr().getBindingIfIdentifier(); - if (leftColumn == null) { - return new JoinClauseToFilterConversion(null, false); - } - - // don't add a filter on any right side table columns. only filter on left base table is supported as of now. - if (rightPrefixes.stream().anyMatch(leftColumn::startsWith)) { - joinClauseFullyConverted = false; - continue; - } + if (leftColumn == null) { + return new JoinClauseToFilterConversion(null, false); + } - Joinable.ColumnValuesWithUniqueFlag columnValuesWithUniqueFlag = - clause.getJoinable().getNonNullColumnValues(condition.getRightColumn(), numValues); - // For an empty values set, isAllUnique flag will be true only if the column had no non-null values. - if (columnValuesWithUniqueFlag.getColumnValues().isEmpty()) { - if (columnValuesWithUniqueFlag.isAllUnique()) { - return new JoinClauseToFilterConversion(FalseFilter.instance(), true); - } else { - joinClauseFullyConverted = false; - } - continue; - } + // don't add a filter on any right side table columns. 
only filter on left base table is supported as of now. + if (rightPrefixes.stream().anyMatch(leftColumn::startsWith)) { + return new JoinClauseToFilterConversion(null, false); + } - numValues -= columnValuesWithUniqueFlag.getColumnValues().size(); - filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues()))); - if (!columnValuesWithUniqueFlag.isAllUnique()) { - joinClauseFullyConverted = false; + Joinable.ColumnValuesWithUniqueFlag columnValuesWithUniqueFlag = + clause.getJoinable().getNonNullColumnValues(condition.getRightColumn(), maxNumFilterValues); + // For an empty values set, isAllUnique flag will be true only if the column had no non-null values. + if (columnValuesWithUniqueFlag.getColumnValues().isEmpty()) { + if (columnValuesWithUniqueFlag.isAllUnique()) { + return new JoinClauseToFilterConversion(FalseFilter.instance(), true); } + return new JoinClauseToFilterConversion(null, false); + } + final Filter onlyFilter = new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues()); + if (!columnValuesWithUniqueFlag.isAllUnique()) { + joinClauseFullyConverted = false; } - return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted); + return new JoinClauseToFilterConversion(Optional.of(onlyFilter).orElse(null), joinClauseFullyConverted); } return new JoinClauseToFilterConversion(null, false); } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java index beb0c9e5b667..cf7ced874360 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java @@ -104,8 +104,6 @@ public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, fina try (final IndexedTable.Reader reader = 
table.columnReader(columnPosition)) { // Use a SortedSet so InDimFilter doesn't need to create its own - // and to encourage "in" filters that result from this method to do dictionary lookups in order. - // The hopes are that this will improve locality and therefore improve performance. final Set allValues = createValuesSet(); boolean allUnique = true; From b150ad9aabd28a69451bac81b8f0d895563cd220 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 10 May 2023 20:06:21 -0700 Subject: [PATCH 10/10] fixed a static check for using optional --- .../org/apache/druid/segment/join/JoinableFactoryWrapper.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java index d3a1ec7b93a5..94c98f8ae2b8 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java @@ -35,7 +35,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -182,7 +181,7 @@ static JoinClauseToFilterConversion convertJoinToFilter( if (!columnValuesWithUniqueFlag.isAllUnique()) { joinClauseFullyConverted = false; } - return new JoinClauseToFilterConversion(Optional.of(onlyFilter).orElse(null), joinClauseFullyConverted); + return new JoinClauseToFilterConversion(onlyFilter, joinClauseFullyConverted); } return new JoinClauseToFilterConversion(null, false); }