Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
Expand Down Expand Up @@ -140,19 +139,20 @@ public void setup() throws IOException
)
)
);
JoinFilterPreAnalysis preAnalysisLookupStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesLookupStringKey),
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
JoinFilterPreAnalysisGroup preAnalysisGroupLookupStringKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0,
false
)
);

hashJoinLookupStringKeySegment = new HashJoinSegment(
baseSegment,
joinableClausesLookupStringKey,
preAnalysisLookupStringKey
preAnalysisGroupLookupStringKey
);

List<JoinableClause> joinableClausesLookupLongKey = ImmutableList.of(
Expand All @@ -167,19 +167,20 @@ public void setup() throws IOException
)
)
);
JoinFilterPreAnalysis preAnalysisLookupLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesLookupLongKey),
VirtualColumns.EMPTY,
null,
false,
false,
false,
0

JoinFilterPreAnalysisGroup preAnalysisGroupLookupLongKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0,
false
)
);
hashJoinLookupLongKeySegment = new HashJoinSegment(
baseSegment,
joinableClausesLookupLongKey,
preAnalysisLookupLongKey
preAnalysisGroupLookupLongKey
);

List<JoinableClause> joinableClausesIndexedTableStringKey = ImmutableList.of(
Expand All @@ -194,19 +195,20 @@ public void setup() throws IOException
)
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesIndexedTableStringKey),
VirtualColumns.EMPTY,
null,
false,
false,
false,
0

JoinFilterPreAnalysisGroup preAnalysisGroupIndexedStringKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0,
false
)
);
hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
baseSegment,
joinableClausesIndexedTableStringKey,
preAnalysisIndexedTableStringKey
preAnalysisGroupIndexedStringKey
);

List<JoinableClause> joinableClausesIndexedTableLonggKey = ImmutableList.of(
Expand All @@ -221,19 +223,19 @@ public void setup() throws IOException
)
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesIndexedTableLonggKey),
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
JoinFilterPreAnalysisGroup preAnalysisGroupIndexedLongKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0,
false
)
);
hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
baseSegment,
joinableClausesIndexedTableLonggKey,
preAnalysisIndexedTableLongKey
preAnalysisGroupIndexedLongKey
);

final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap();
Expand Down
22 changes: 22 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class QueryContexts
public static final String JOIN_FILTER_REWRITE_ENABLE_KEY = "enableJoinFilterRewrite";
public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize";
public static final String JOIN_FILTER_REWRITE_USE_OLD_REWRITE_MODE = "joinFilterRewriteUseOldRewriteMode";
public static final String USE_FILTER_CNF_KEY = "useFilterCNF";

public static final boolean DEFAULT_BY_SEGMENT = false;
Expand All @@ -68,6 +69,7 @@ public class QueryContexts
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final boolean DEFAULT_JOIN_FILTER_REWRITE_USE_OLD_REWRITE_MODE = false;
public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000;
public static final boolean DEFAULT_USE_FILTER_CNF = false;

Expand Down Expand Up @@ -264,6 +266,26 @@ public static <T> boolean getEnableJoinFilterRewrite(Query<T> query)
return parseBoolean(query, JOIN_FILTER_REWRITE_ENABLE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE);
}

/**
* This is an undocumented option provided as a transition tool:
*
* The join filter rewrites originally performed the pre-analysis phase prior to any per-segment processing,
* analyzing only the filter in the top-level of the query.
*
* This did not work for nested queries (see https://github.com/apache/druid/pull/9978), so the rewrite pre-analysis
* was moved into the cursor creation of the {@link org.apache.druid.segment.join.HashJoinSegmentStorageAdapter}.
* This design requires synchronization across multiple segment processing threads; the old rewrite mode
* is kept temporarily available in case issues arise with the new mode, and the user does not run queries with the
* affected nested shape.
*/
public static <T> boolean getUseJoinFilterRewriteOldRewriteMode(Query<T> query)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to detect this and use the different modes automatically since it sounds like the old mode is perhaps better if there are no subqueries involved?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think that could be worth looking into later on

{
return parseBoolean(
query,
JOIN_FILTER_REWRITE_USE_OLD_REWRITE_MODE,
DEFAULT_JOIN_FILTER_REWRITE_USE_OLD_REWRITE_MODE
);
}

public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
package org.apache.druid.query.filter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.filter.AndFilter;
import org.apache.druid.segment.filter.Filters;

Expand All @@ -43,6 +45,9 @@ public class AndDimFilter implements DimFilter

private final List<DimFilter> fields;

@JsonIgnore
private Integer fieldsHashCode;

@JsonCreator
public AndDimFilter(
@JsonProperty("fields") List<DimFilter> fields
Expand All @@ -67,7 +72,7 @@ public List<DimFilter> getFields()
@Override
public byte[] getCacheKey()
{
  // Derive the cache key from the (cached) hash of the child filters instead of
  // concatenating every child's full cache key; this avoids recursively rebuilding
  // child keys for deeply nested AND trees.
  return new CacheKeyBuilder(DimFilterUtils.AND_CACHE_ID).appendInt(hashCode()).build();
}

@Override
Expand Down Expand Up @@ -142,7 +147,10 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return fields != null ? fields.hashCode() : 0;
if (fieldsHashCode == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be threadsafe? Might want to put behind a Supplier.memoize instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this and elsewhere to use Supplier.memoize

fieldsHashCode = fields != null ? fields.hashCode() : 0;
}
return fieldsHashCode;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.collect.RangeSet;
import org.apache.druid.timeline.partition.ShardSpec;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -57,29 +56,6 @@ public class DimFilterUtils

public static final byte STRING_SEPARATOR = (byte) 0xFF;

/**
 * Builds a composite cache key for a list of filters: the id byte followed by each
 * child filter's cache key in list order. A single-element list short-circuits to
 * that filter's own key (no id byte is prepended in that case).
 */
static byte[] computeCacheKey(byte cacheIdKey, List<DimFilter> filters)
{
  if (filters.size() == 1) {
    return filters.get(0).getCacheKey();
  }

  // Collect each child's key first so the exact buffer size is known up front.
  final byte[][] childKeys = new byte[filters.size()][];
  int totalSize = 0;
  for (int i = 0; i < childKeys.length; i++) {
    childKeys[i] = filters.get(i).getCacheKey();
    totalSize += childKeys[i].length;
  }

  final ByteBuffer buffer = ByteBuffer.allocate(1 + totalSize);
  buffer.put(cacheIdKey);
  for (byte[] childKey : childKeys) {
    buffer.put(childKey);
  }
  return buffer.array();
}

/**
* Filter the given iterable of objects by removing any object whose ShardSpec, obtained from the converter function,
* does not fit in the RangeSet of the dimFilter {@link DimFilter#getDimensionRangeSet(String)}. The returned set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public class InDimFilter implements DimFilter
@JsonIgnore
private byte[] cacheKey;

@JsonIgnore
private final Supplier<Integer> valuesHashCode;

@JsonCreator
public InDimFilter(
@JsonProperty("dimension") String dimension,
Expand All @@ -107,6 +110,7 @@ public InDimFilter(
this.longPredicateSupplier = getLongPredicateSupplier();
this.floatPredicateSupplier = getFloatPredicateSupplier();
this.doublePredicateSupplier = getDoublePredicateSupplier();
this.valuesHashCode = Suppliers.memoize(() -> computeValuesHashCode(values));
}

/**
Expand Down Expand Up @@ -150,22 +154,12 @@ public FilterTuning getFilterTuning()
public byte[] getCacheKey()
{
if (cacheKey == null) {
final List<String> sortedValues = new ArrayList<>(values);
sortedValues.sort(Comparator.nullsFirst(Ordering.natural()));
final Hasher hasher = Hashing.sha256().newHasher();
for (String v : sortedValues) {
if (v == null) {
hasher.putInt(0);
} else {
hasher.putString(v, StandardCharsets.UTF_8);
}
}
cacheKey = new CacheKeyBuilder(DimFilterUtils.IN_CACHE_ID)
.appendString(dimension)
.appendByte(DimFilterUtils.STRING_SEPARATOR)
.appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey())
.appendByte(DimFilterUtils.STRING_SEPARATOR)
.appendByteArray(hasher.hash().asBytes())
.appendInt(valuesHashCode.get())
.build();
}
return cacheKey;
Expand Down Expand Up @@ -237,7 +231,8 @@ public Filter toFilter()
floatPredicateSupplier,
doublePredicateSupplier,
extractionFn,
filterTuning
filterTuning,
valuesHashCode
);
}

Expand Down Expand Up @@ -290,8 +285,9 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}

InDimFilter that = (InDimFilter) o;
return values.equals(that.values) &&
return valuesHashCode.get().equals(that.valuesHashCode.get()) &&
dimension.equals(that.dimension) &&
Objects.equals(extractionFn, that.extractionFn) &&
Objects.equals(filterTuning, that.filterTuning);
Expand All @@ -300,7 +296,22 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hash(values, dimension, extractionFn, filterTuning);
return Objects.hash(valuesHashCode.get(), dimension, extractionFn, filterTuning);
}

/**
 * Computes an order-insensitive hash of a value set: values are sorted with nulls
 * first, then folded through a SHA-256 hasher, so logically-equal sets produce the
 * same hash regardless of iteration order. A null value is encoded as the int 0.
 */
public static int computeValuesHashCode(Set<String> values)
{
  final List<String> sorted = new ArrayList<>(values);
  sorted.sort(Comparator.nullsFirst(Comparator.naturalOrder()));

  final Hasher hasher = Hashing.sha256().newHasher();
  for (String value : sorted) {
    if (value == null) {
      hasher.putInt(0);
    } else {
      hasher.putString(value, StandardCharsets.UTF_8);
    }
  }
  return hasher.hash().asInt();
}

private DruidLongPredicate createLongPredicate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
package org.apache.druid.query.filter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;

Expand All @@ -44,6 +46,9 @@ public class OrDimFilter implements DimFilter

private final List<DimFilter> fields;

@JsonIgnore
private Integer fieldsHashCode;

@JsonCreator
public OrDimFilter(@JsonProperty("fields") List<DimFilter> fields)
{
Expand Down Expand Up @@ -75,7 +80,7 @@ public List<DimFilter> getFields()
@Override
public byte[] getCacheKey()
{
  // Derive the cache key from the (cached) hash of the child filters instead of
  // concatenating every child's full cache key; this avoids recursively rebuilding
  // child keys for deeply nested OR trees.
  return new CacheKeyBuilder(DimFilterUtils.OR_CACHE_ID).appendInt(hashCode()).build();
}

@Override
Expand Down Expand Up @@ -148,7 +153,10 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
  // Lazily compute and cache the hash: filter trees can be large, and hashCode()
  // is invoked repeatedly (e.g. when building cache keys). The unsynchronized
  // check-then-set is a benign race — concurrent callers recompute the same value.
  if (fieldsHashCode == null) {
    fieldsHashCode = fields != null ? fields.hashCode() : 0;
  }
  return fieldsHashCode;
}

@Override
Expand Down
Loading