Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void setup()
hashJoinSegment = closer.register(
new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
null,
clauses,
preAnalysis
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public void setup() throws IOException

hashJoinLookupStringKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
null,
joinableClausesLookupStringKey,
preAnalysisLookupStringKey
);
Expand Down Expand Up @@ -194,6 +195,7 @@ public void setup() throws IOException

hashJoinLookupLongKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
null,
joinableClausesLookupLongKey,
preAnalysisLookupLongKey
);
Expand Down Expand Up @@ -228,6 +230,7 @@ public void setup() throws IOException

hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
null,
joinableClausesIndexedTableStringKey,
preAnalysisIndexedStringKey
);
Expand Down Expand Up @@ -262,6 +265,7 @@ public void setup() throws IOException

hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
null,
joinableClausesIndexedTableLongKey,
preAnalysisIndexedLongKey
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinPrefixUtils;
import org.apache.druid.segment.join.JoinType;

import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -58,20 +60,30 @@ public class JoinDataSource implements DataSource
private final String rightPrefix;
private final JoinConditionAnalysis conditionAnalysis;
private final JoinType joinType;
// An optional filter on the left side if left is direct table access
@Nullable
private final DimFilter leftFilter;

private JoinDataSource(
DataSource left,
DataSource right,
String rightPrefix,
JoinConditionAnalysis conditionAnalysis,
JoinType joinType
JoinType joinType,
@Nullable DimFilter leftFilter
)
{
this.left = Preconditions.checkNotNull(left, "left");
this.right = Preconditions.checkNotNull(right, "right");
this.rightPrefix = JoinPrefixUtils.validatePrefix(rightPrefix);
this.conditionAnalysis = Preconditions.checkNotNull(conditionAnalysis, "conditionAnalysis");
this.joinType = Preconditions.checkNotNull(joinType, "joinType");
//TODO: Add support for union data sources
Preconditions.checkArgument(
leftFilter == null || left instanceof TableDataSource,
"left filter is only supported if left data source is direct table access"
);
this.leftFilter = leftFilter;
}

/**
Expand All @@ -84,6 +96,7 @@ public static JoinDataSource create(
@JsonProperty("rightPrefix") String rightPrefix,
@JsonProperty("condition") String condition,
@JsonProperty("joinType") JoinType joinType,
@Nullable @JsonProperty("leftFilter") DimFilter leftFilter,
@JacksonInject ExprMacroTable macroTable
)
{
Expand All @@ -96,7 +109,8 @@ public static JoinDataSource create(
StringUtils.nullToEmptyNonDruidDataString(rightPrefix),
macroTable
),
joinType
joinType,
leftFilter
);
}

Expand All @@ -108,10 +122,26 @@ public static JoinDataSource create(
final DataSource right,
final String rightPrefix,
final JoinConditionAnalysis conditionAnalysis,
final JoinType joinType
final JoinType joinType,
final DimFilter leftFilter
)
{
return new JoinDataSource(left, right, rightPrefix, conditionAnalysis, joinType, leftFilter);
}

/**
* Create a join dataSource from an existing {@link JoinConditionAnalysis}.
Comment thread
abhishekagarwal87 marked this conversation as resolved.
*/
public static JoinDataSource create(
final DataSource left,
final DataSource right,
final String rightPrefix,
final String condition,
final JoinType joinType,
final ExprMacroTable macroTable
)
{
return new JoinDataSource(left, right, rightPrefix, conditionAnalysis, joinType);
return create(left, right, rightPrefix, condition, joinType, null, macroTable);
}

@Override
Expand Down Expand Up @@ -158,6 +188,13 @@ public JoinType getJoinType()
return joinType;
}

@JsonProperty
@Nullable
public DimFilter getLeftFilter()
{
return leftFilter;
}

@Override
public List<DataSource> getChildren()
{
Expand All @@ -171,7 +208,14 @@ public DataSource withChildren(List<DataSource> children)
throw new IAE("Expected [2] children, got [%d]", children.size());
}

return new JoinDataSource(children.get(0), children.get(1), rightPrefix, conditionAnalysis, joinType);
return new JoinDataSource(
children.get(0),
children.get(1),
rightPrefix,
conditionAnalysis,
joinType,
leftFilter
);
}

@Override
Expand Down Expand Up @@ -206,13 +250,14 @@ public boolean equals(Object o)
Objects.equals(right, that.right) &&
Objects.equals(rightPrefix, that.rightPrefix) &&
Objects.equals(conditionAnalysis, that.conditionAnalysis) &&
Objects.equals(leftFilter, that.leftFilter) &&
joinType == that.joinType;
}

@Override
public int hashCode()
{
return Objects.hash(left, right, rightPrefix, conditionAnalysis, joinType);
return Objects.hash(left, right, rightPrefix, conditionAnalysis, joinType, leftFilter);
}

@Override
Expand All @@ -224,6 +269,7 @@ public String toString()
", rightPrefix='" + rightPrefix + '\'' +
", condition=" + conditionAnalysis +
", joinType=" + joinType +
", leftFilter=" + leftFilter +
'}';
}
}
6 changes: 5 additions & 1 deletion processing/src/main/java/org/apache/druid/query/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
Expand Down Expand Up @@ -192,15 +193,18 @@ public static <T> Query<T> withBaseDataSource(final Query<T> query, final DataSo
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());

DataSource current = newBaseDataSource;
DimFilter joinBaseFilter = analysis.getJoinBaseTableFilter().orElse(null);

for (final PreJoinableClause clause : analysis.getPreJoinableClauses()) {
current = JoinDataSource.create(
current,
clause.getDataSource(),
clause.getPrefix(),
clause.getCondition(),
clause.getJoinType()
clause.getJoinType(),
joinBaseFilter
);
joinBaseFilter = null;
}

retVal = query.withDataSource(current);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -80,12 +81,15 @@ public class DataSourceAnalysis
private final DataSource baseDataSource;
@Nullable
private final Query<?> baseQuery;
@Nullable
private final DimFilter joinBaseTableFilter;
private final List<PreJoinableClause> preJoinableClauses;

private DataSourceAnalysis(
DataSource dataSource,
DataSource baseDataSource,
@Nullable Query<?> baseQuery,
@Nullable DimFilter joinBaseTableFilter,
List<PreJoinableClause> preJoinableClauses
)
{
Expand All @@ -98,6 +102,7 @@ private DataSourceAnalysis(
this.dataSource = dataSource;
this.baseDataSource = baseDataSource;
this.baseQuery = baseQuery;
this.joinBaseTableFilter = joinBaseTableFilter;
this.preJoinableClauses = preJoinableClauses;
}

Expand All @@ -121,10 +126,10 @@ public static DataSourceAnalysis forDataSource(final DataSource dataSource)
}

if (current instanceof JoinDataSource) {
final Pair<DataSource, List<PreJoinableClause>> flattened = flattenJoin((JoinDataSource) current);
return new DataSourceAnalysis(dataSource, flattened.lhs, baseQuery, flattened.rhs);
final Pair<Pair<DataSource, DimFilter>, List<PreJoinableClause>> flattened = flattenJoin((JoinDataSource) current);
return new DataSourceAnalysis(dataSource, flattened.lhs.lhs, baseQuery, flattened.lhs.rhs, flattened.rhs);
} else {
return new DataSourceAnalysis(dataSource, current, baseQuery, Collections.emptyList());
return new DataSourceAnalysis(dataSource, current, baseQuery, null, Collections.emptyList());
}
}

Expand All @@ -134,14 +139,19 @@ public static DataSourceAnalysis forDataSource(final DataSource dataSource)
*
* @throws IllegalArgumentException if dataSource cannot be fully flattened.
*/
private static Pair<DataSource, List<PreJoinableClause>> flattenJoin(final JoinDataSource dataSource)
private static Pair<Pair<DataSource, DimFilter>, List<PreJoinableClause>> flattenJoin(final JoinDataSource dataSource)
{
DataSource current = dataSource;
DimFilter currentDimFilter = null;
final List<PreJoinableClause> preJoinableClauses = new ArrayList<>();

while (current instanceof JoinDataSource) {
final JoinDataSource joinDataSource = (JoinDataSource) current;
current = joinDataSource.getLeft();
if (currentDimFilter != null) {
throw new IAE("Left filters are only allowed when left child is direct table access");
}
currentDimFilter = joinDataSource.getLeftFilter();
preJoinableClauses.add(
new PreJoinableClause(
joinDataSource.getRightPrefix(),
Expand All @@ -156,7 +166,7 @@ private static Pair<DataSource, List<PreJoinableClause>> flattenJoin(final JoinD
// going-up order. So reverse them.
Collections.reverse(preJoinableClauses);

return Pair.of(current, preJoinableClauses);
return Pair.of(Pair.of(current, currentDimFilter), preJoinableClauses);
Comment thread
abhishekagarwal87 marked this conversation as resolved.
}

/**
Expand Down Expand Up @@ -214,11 +224,20 @@ public Optional<Query<?>> getBaseQuery()
return Optional.ofNullable(baseQuery);
}

/**
* If the original data source is a join data source and there is a DimFilter on the base table data source,
* that DimFilter is returned here
*/
public Optional<DimFilter> getJoinBaseTableFilter()
{
return Optional.ofNullable(joinBaseTableFilter);
}

/**
* Returns the {@link QuerySegmentSpec} that is associated with the base datasource, if any. This only happens
* when there is an outer query datasource. In this case, the base querySegmentSpec is the one associated with the
* innermost subquery.
*
* <p>
* This {@link QuerySegmentSpec} is taken from the query returned by {@link #getBaseQuery()}.
*
* @return the query segment spec associated with the base datasource if {@link #isQuery()} is true, else empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
Expand All @@ -43,6 +44,7 @@
public class HashJoinSegment implements SegmentReference
{
private final SegmentReference baseSegment;
private final Filter baseFilter;
private final List<JoinableClause> clauses;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;

Expand All @@ -54,11 +56,13 @@ public class HashJoinSegment implements SegmentReference
*/
public HashJoinSegment(
SegmentReference baseSegment,
@Nullable Filter baseFilter,
List<JoinableClause> clauses,
JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
this.baseSegment = baseSegment;
this.baseFilter = baseFilter;
this.clauses = clauses;
this.joinFilterPreAnalysis = joinFilterPreAnalysis;

Expand Down Expand Up @@ -93,7 +97,12 @@ public QueryableIndex asQueryableIndex()
@Override
public StorageAdapter asStorageAdapter()
{
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, joinFilterPreAnalysis);
return new HashJoinSegmentStorageAdapter(
baseSegment.asStorageAdapter(),
baseFilter,
clauses,
joinFilterPreAnalysis
);
}

@Override
Expand Down
Loading