Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
Expand Down Expand Up @@ -141,19 +140,20 @@ public void setup() throws IOException
)
)
);
JoinFilterPreAnalysis preAnalysisLookupStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesLookupStringKey),
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
JoinFilterPreAnalysisGroup preAnalysisGroupLookupStringKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0
),
true
);

hashJoinLookupStringKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupStringKey,
preAnalysisLookupStringKey
preAnalysisGroupLookupStringKey
);

List<JoinableClause> joinableClausesLookupLongKey = ImmutableList.of(
Expand All @@ -168,19 +168,20 @@ public void setup() throws IOException
)
)
);
JoinFilterPreAnalysis preAnalysisLookupLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesLookupLongKey),
VirtualColumns.EMPTY,
null,
false,
false,
false,
0

JoinFilterPreAnalysisGroup preAnalysisGroupLookupLongKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0
),
true
);
hashJoinLookupLongKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupLongKey,
preAnalysisLookupLongKey
preAnalysisGroupLookupLongKey
);

List<JoinableClause> joinableClausesIndexedTableStringKey = ImmutableList.of(
Expand All @@ -195,19 +196,20 @@ public void setup() throws IOException
)
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesIndexedTableStringKey),
VirtualColumns.EMPTY,
null,
false,
false,
false,
0

JoinFilterPreAnalysisGroup preAnalysisGroupIndexedStringKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0
),
true
);
hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableStringKey,
preAnalysisIndexedTableStringKey
preAnalysisGroupIndexedStringKey
);

List<JoinableClause> joinableClausesIndexedTableLonggKey = ImmutableList.of(
Expand All @@ -222,19 +224,19 @@ public void setup() throws IOException
)
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesIndexedTableLonggKey),
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
JoinFilterPreAnalysisGroup preAnalysisGroupIndexedLongKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0
),
true
);
hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableLonggKey,
preAnalysisIndexedTableLongKey
preAnalysisGroupIndexedLongKey
);

final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ private List<List<String>> verifySubtotalsSpec(
return subtotalsSpec;
}

@Override
@JsonProperty
@Override
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ private Integer validateAndGetMaxSegmentPartitionsOrderedInMemory()
return maxSegmentPartitionsOrderedInMemory;
}

@Override
@JsonProperty
@Override
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ public String getType()
return Query.TIMESERIES;
}

@Override
@JsonProperty
@Override
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ public String getType()
return TOPN;
}

@Override
@JsonProperty
@Override
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

Expand All @@ -44,23 +44,24 @@ public class HashJoinSegment implements SegmentReference
{
private final SegmentReference baseSegment;
private final List<JoinableClause> clauses;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;
private final JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup;

/**
* @param baseSegment The left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param joinFilterPreAnalysis Pre-analysis computed by {@link org.apache.druid.segment.join.filter.JoinFilterAnalyzer#computeJoinFilterPreAnalysis}
* @param joinFilterPreAnalysisGroup Pre-analysis group that holds all of the JoinFilterPreAnalysis results within
* the scope of a query
*/
public HashJoinSegment(
SegmentReference baseSegment,
List<JoinableClause> clauses,
JoinFilterPreAnalysis joinFilterPreAnalysis
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup
)
{
this.baseSegment = baseSegment;
this.clauses = clauses;
this.joinFilterPreAnalysis = joinFilterPreAnalysis;
this.joinFilterPreAnalysisGroup = joinFilterPreAnalysisGroup;

// Verify 'clauses' is nonempty (otherwise it's a waste to create this object, and the caller should know)
if (clauses.isEmpty()) {
Expand Down Expand Up @@ -93,7 +94,7 @@ public QueryableIndex asQueryableIndex()
@Override
public StorageAdapter asStorageAdapter()
{
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, joinFilterPreAnalysis);
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, joinFilterPreAnalysisGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
Expand All @@ -38,6 +37,7 @@
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinFilterSplit;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand All @@ -47,30 +47,30 @@
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

public class HashJoinSegmentStorageAdapter implements StorageAdapter
{
private final StorageAdapter baseAdapter;
private final List<JoinableClause> clauses;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;
private final JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup;

/**
* @param baseAdapter A StorageAdapter for the left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param joinFilterPreAnalysisGroup Pre-analysis group that holds all of the JoinFilterPreAnalysis results within
* the scope of a query
*/
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
List<JoinableClause> clauses,
final JoinFilterPreAnalysis joinFilterPreAnalysis
final JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup
)
{
this.baseAdapter = baseAdapter;
this.clauses = clauses;
this.joinFilterPreAnalysis = joinFilterPreAnalysis;
this.joinFilterPreAnalysisGroup = joinFilterPreAnalysisGroup;
}

@Override
Expand Down Expand Up @@ -209,13 +209,16 @@ public Sequence<Cursor> makeCursors(
@Nullable final QueryMetrics<?> queryMetrics
)
{
if (!Objects.equals(joinFilterPreAnalysis.getOriginalFilter(), filter)) {
throw new ISE(
"Filter provided to cursor [%s] does not match join pre-analysis filter [%s]",
JoinFilterPreAnalysis jfpa;
if (joinFilterPreAnalysisGroup.isSingleLevelMode()) {
jfpa = joinFilterPreAnalysisGroup.getPreAnalysisForSingleLevelMode();
} else {
jfpa = joinFilterPreAnalysisGroup.getAnalysis(
filter,
joinFilterPreAnalysis.getOriginalFilter()
virtualColumns
);
}

final List<VirtualColumn> preJoinVirtualColumns = new ArrayList<>();
final List<VirtualColumn> postJoinVirtualColumns = new ArrayList<>();

Expand All @@ -225,7 +228,7 @@ public Sequence<Cursor> makeCursors(
postJoinVirtualColumns
);

JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(jfpa);
preJoinVirtualColumns.addAll(joinFilterSplit.getPushDownVirtualColumns());

// Soon, we will need a way to push filters past a join when possible. This could potentially be done right here
Expand Down
Loading