Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.filter.SelectorDimFilter;
Expand All @@ -49,6 +48,8 @@
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
Expand All @@ -69,6 +70,7 @@

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -125,76 +127,112 @@ public void setup() throws IOException

baseSegment = new QueryableIndexSegment(index, SegmentId.dummy("join"));

List<JoinableClause> joinableClausesLookupStringKey = ImmutableList.of(
new JoinableClause(
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryIsoCodeToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysisLookupStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClausesLookupStringKey,
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
);
hashJoinLookupStringKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
joinableClausesLookupStringKey,
preAnalysisLookupStringKey
);

List<JoinableClause> joinableClausesLookupLongKey = ImmutableList.of(
new JoinableClause(
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryIsoCodeToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%sk\"", prefix),
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryIsoCodeToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
ExprMacroTable.nil()
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
)
);
JoinFilterPreAnalysis preAnalysisLookupLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClausesLookupLongKey,
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
);

hashJoinLookupLongKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
joinableClausesLookupLongKey,
preAnalysisLookupLongKey
);

List<JoinableClause> joinableClausesIndexedTableStringKey = ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%scountryIsoCode\"", prefix),
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryNumberToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryNumber == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
ExprMacroTable.nil()
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClausesIndexedTableStringKey,
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
);

hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
joinableClausesIndexedTableStringKey,
preAnalysisIndexedTableStringKey
);

List<JoinableClause> joinableClausesIndexedTableLonggKey = ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryNumber == \"%scountryNumber\"", prefix),
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%scountryIsoCode\"", prefix),
prefix,
ExprMacroTable.nil()
)
ExprMacroTable.nil()
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClausesIndexedTableLonggKey,
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
);

hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryNumber == \"%scountryNumber\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
joinableClausesIndexedTableLonggKey,
preAnalysisIndexedTableLongKey
);

final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap();
Expand Down
5 changes: 5 additions & 0 deletions processing/src/main/java/org/apache/druid/query/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Interval;
Expand Down Expand Up @@ -158,4 +159,8 @@ default Query<T> withLane(String lane)
return withOverriddenContext(ImmutableMap.of(QueryContexts.LANE_KEY, lane));
}

default VirtualColumns getVirtualColumns()
{
return VirtualColumns.EMPTY;
}
}
24 changes: 21 additions & 3 deletions processing/src/main/java/org/apache/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public class QueryContexts
public static final String VECTORIZE_KEY = "vectorize";
public static final String VECTOR_SIZE_KEY = "vectorSize";
public static final String JOIN_FILTER_PUSH_DOWN_KEY = "enableJoinFilterPushDown";
public static final String JOIN_FILTER_REWRITE_KEY = "enableJoinFilterRewrite";
public static final String JOIN_FILTER_REWRITE_ENABLE_KEY = "enableJoinFilterRewrite";
public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize";

public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
Expand All @@ -61,7 +63,9 @@ public class QueryContexts
public static final long NO_TIMEOUT = 0;
public static final boolean DEFAULT_ENABLE_PARALLEL_MERGE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = false;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY = 10000;

@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
Expand Down Expand Up @@ -227,6 +231,19 @@ public static <T> int getParallelMergeParallelism(Query<T> query, int defaultVal
{
return parseInt(query, BROKER_PARALLELISM, defaultValue);
}
public static <T> boolean getEnableJoinFilterRewriteValueColumnFilters(Query<T> query)
{
return parseBoolean(
query,
JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY,
DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS
);
}

public static <T> long getJoinFilterRewriteMaxSize(Query<T> query)
{
return parseLong(query, JOIN_FILTER_REWRITE_MAX_SIZE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY);
}

public static <T> boolean getEnableJoinFilterPushDown(Query<T> query)
{
Expand All @@ -235,9 +252,10 @@ public static <T> boolean getEnableJoinFilterPushDown(Query<T> query)

public static <T> boolean getEnableJoinFilterRewrite(Query<T> query)
{
return parseBoolean(query, JOIN_FILTER_REWRITE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE);
return parseBoolean(query, JOIN_FILTER_REWRITE_ENABLE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE);
}


public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{
Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ private List<List<String>> verifySubtotalsSpec(
return subtotalsSpec;
}

@Override
@JsonProperty
public VirtualColumns getVirtualColumns()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ private Integer validateAndGetMaxSegmentPartitionsOrderedInMemory()
return maxSegmentPartitionsOrderedInMemory;
}

@Override
@JsonProperty
public VirtualColumns getVirtualColumns()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public String getType()
return Query.TIMESERIES;
}

@Override
@JsonProperty
public VirtualColumns getVirtualColumns()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public String getType()
return TOPN;
}

@Override
@JsonProperty
public VirtualColumns getVirtualColumns()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

Expand All @@ -40,27 +41,23 @@ public class HashJoinSegment extends AbstractSegment
{
private final Segment baseSegment;
private final List<JoinableClause> clauses;
private final boolean enableFilterPushDown;
private final boolean enableFilterRewrite;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;

/**
* @param baseSegment The left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param enableFilterPushDown Whether to enable filter push down optimizations to the base segment. In production
* this should generally be {@code QueryContexts.getEnableJoinFilterPushDown(query)}.
* @param baseSegment The left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param joinFilterPreAnalysis Pre-analysis computed by {@link org.apache.druid.segment.join.filter.JoinFilterAnalyzer#computeJoinFilterPreAnalysis}
*/
public HashJoinSegment(
Segment baseSegment,
List<JoinableClause> clauses,
boolean enableFilterPushDown,
boolean enableFilterRewrite
JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
this.baseSegment = baseSegment;
this.clauses = clauses;
this.enableFilterPushDown = enableFilterPushDown;
this.enableFilterRewrite = enableFilterRewrite;
this.joinFilterPreAnalysis = joinFilterPreAnalysis;

// Verify 'clauses' is nonempty (otherwise it's a waste to create this object, and the caller should know)
if (clauses.isEmpty()) {
Expand Down Expand Up @@ -93,7 +90,7 @@ public QueryableIndex asQueryableIndex()
@Override
public StorageAdapter asStorageAdapter()
{
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, enableFilterPushDown, enableFilterRewrite);
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, joinFilterPreAnalysis);
}

@Override
Expand Down
Loading