First file in the diff: the expression conversion utilities on the Doris FE side, which translate Doris predicates into Iceberg filter expressions.
@@ -56,8 +56,11 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.And;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Not;
+import org.apache.iceberg.expressions.Or;
 import org.apache.iceberg.expressions.Unbound;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
 import org.apache.iceberg.types.Type.TypeID;
@@ -254,6 +257,10 @@ public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
             Expression right = convertToIcebergExpr(compoundPredicate.getChild(1), schema);
             if (left != null && right != null) {
                 expression = Expressions.and(left, right);
+            } else if (left != null) {
+                return left;
+            } else if (right != null) {
+                return right;
             }
             break;
         }
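
Note on why keeping one side of an unconvertible AND is sound (reviewer gloss, not part of the patch): a pushed-down Iceberg filter only has to select a superset of the matching rows, because Doris still evaluates the original predicate on whatever the scan returns. Rows matching "a AND b" are a subset of rows matching "a" alone, so either convertible conjunct is a safe over-approximation. The same shortcut would be wrong for OR, which is why the checkConversion method below rejects a disjunction unless both sides survive. A minimal sketch of the rule, using Iceberg's expression builders (the predicate in the comment is hypothetical):

    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    class AndFallbackSketch {
        // Hypothetical case: dt = '2023-06-01' AND someUdf(city), where
        // only the first conjunct has an Iceberg translation.
        static Expression pushable(Expression left, Expression right) {
            if (left != null && right != null) {
                return Expressions.and(left, right);
            }
            // Over-approximate with whichever side converted; doing the
            // same for OR would silently drop matching rows.
            return left != null ? left : right;
        }
    }
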
@@ -356,6 +363,9 @@ public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
                 }
                 LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
                 Object value = extractDorisLiteral(nestedField.type(), literalExpr);
+                if (value == null) {
+                    return null;
+                }
                 valueList.add(value);
             }
             if (inExpr.isNotIn()) {
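
This guard matters because extractDorisLiteral returns null for literals that have no Iceberg counterpart (reviewer gloss). Previously a null could end up inside valueList and break the IN expression later; truncating the list is not an option either, since a plain IN over a shorter list would drop rows that match the omitted value. Abandoning the conversion and letting Doris evaluate the predicate itself is the only safe fallback. A compact sketch of the all-or-nothing rule, with a hypothetical stand-in converter:

    import java.util.ArrayList;
    import java.util.List;

    class InListGuardSketch {
        // Mirrors the loop above: one unconvertible element abandons the
        // whole IN conversion instead of pushing a partial value list.
        static List<Object> convertAll(List<Object> dorisLiterals) {
            List<Object> values = new ArrayList<>();
            for (Object literal : dorisLiterals) {
                Object value = toIcebergLiteral(literal); // may be null
                if (value == null) {
                    return null; // give up; Doris filters after the scan
                }
                values.add(value);
            }
            return values;
        }

        // Hypothetical converter: only strings and longs survive here.
        static Object toIcebergLiteral(Object literal) {
            return (literal instanceof String || literal instanceof Long) ? literal : null;
        }
    }
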
@@ -366,16 +376,63 @@
                 expression = Expressions.in(colName, valueList);
             }
         }
-        if (expression != null && expression instanceof Unbound) {
-            try {
-                ((Unbound<?, ?>) expression).bind(schema.asStruct(), true);
-                return expression;
-            } catch (Exception e) {
-                LOG.warn("Failed to check expression: " + e.getMessage());
-                return null;
-            }
-        }
-        return null;
+
+        return checkConversion(expression, schema);
     }
+
+    private static Expression checkConversion(Expression expression, Schema schema) {
+        if (expression == null) {
+            return null;
+        }
+        switch (expression.op()) {
+            case AND: {
+                And andExpr = (And) expression;
+                Expression left = checkConversion(andExpr.left(), schema);
+                Expression right = checkConversion(andExpr.right(), schema);
+                if (left != null && right != null) {
+                    return andExpr;
+                } else if (left != null) {
+                    return left;
+                } else if (right != null) {
+                    return right;
+                } else {
+                    return null;
+                }
+            }
+            case OR: {
+                Or orExpr = (Or) expression;
+                Expression left = checkConversion(orExpr.left(), schema);
+                Expression right = checkConversion(orExpr.right(), schema);
+                if (left == null || right == null) {
+                    return null;
+                } else {
+                    return orExpr;
+                }
+            }
+            case NOT: {
+                Not notExpr = (Not) expression;
+                Expression child = checkConversion(notExpr.child(), schema);
+                if (child == null) {
+                    return null;
+                } else {
+                    return notExpr;
+                }
+            }
+            case TRUE:
+            case FALSE:
+                return expression;
+            default:
+                if (!(expression instanceof Unbound)) {
+                    return null;
+                }
+                try {
+                    ((Unbound<?, ?>) expression).bind(schema.asStruct(), true);
+                    return expression;
+                } catch (Exception e) {
+                    LOG.debug("Failed to check expression: {}", e.getMessage());
+                    return null;
+                }
+        }
+    }

     private static Object extractDorisLiteral(org.apache.iceberg.types.Type icebergType, Expr expr) {
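
The heart of the new check is Unbound.bind, as used in the default branch above: binding an unbound expression against the table schema validates that the referenced column exists and that the literal can be converted to the column's type, throwing otherwise. A minimal, self-contained sketch of that behavior, assuming Iceberg's expressions API:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.expressions.Unbound;
    import org.apache.iceberg.types.Types;

    class BindCheckSketch {
        static boolean binds(Expression expr, Schema schema) {
            if (!(expr instanceof Unbound)) {
                return false;
            }
            try {
                // Throws (e.g. a ValidationException) if the column is
                // missing or the literal cannot fit the column's type.
                ((Unbound<?, ?>) expr).bind(schema.asStruct(), true);
                return true;
            } catch (Exception e) {
                return false; // conversion rejected; Doris filters instead
            }
        }

        public static void main(String[] args) {
            Schema schema = new Schema(
                    Types.NestedField.required(1, "id", Types.LongType.get()));
            System.out.println(binds(Expressions.equal("id", 1L), schema));      // true
            System.out.println(binds(Expressions.equal("dropped", 1L), schema)); // false
        }
    }
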
@@ -394,6 +451,7 @@ private static Object extractDorisLiteral(org.apache.iceberg.types.Type icebergType, Expr expr) {
             DateLiteral dateLiteral = (DateLiteral) expr;
             switch (icebergTypeID) {
                 case STRING:
+                case DATE:
                     return dateLiteral.getStringValue();
                 case TIMESTAMP:
                     return dateLiteral.unixTimestamp(TimeUtils.getTimeZone()) * MILLIS_TO_NANO_TIME;
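
The added DATE branch hands Iceberg the date literal's string form. That works because Iceberg string literals convert to DATE during binding by parsing the ISO-8601 text. A small sketch of that conversion, assuming Iceberg's Literal API:

    import org.apache.iceberg.expressions.Literal;
    import org.apache.iceberg.types.Types;

    class DateLiteralSketch {
        public static void main(String[] args) {
            // Iceberg stores dates as days since 1970-01-01; the string
            // literal is parsed when converted to DateType.
            Literal<CharSequence> raw = Literal.of("2023-06-01");
            Literal<Integer> date = raw.to(Types.DateType.get());
            System.out.println(date.value()); // 19509 days since the epoch
        }
    }
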
Second file in the diff: the IcebergScanNode plan node, which now records the predicates that were actually pushed into the Iceberg scan and reports them in EXPLAIN output.
@@ -41,6 +41,7 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
@@ -51,7 +52,8 @@
 import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;

-import avro.shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CombinedScanTask;
@@ -92,6 +94,7 @@ public class IcebergScanNode extends FileQueryScanNode {

     private IcebergSource source;
     private Table icebergTable;
+    private List<String> pushdownIcebergPredicates = Lists.newArrayList();

     /**
      * External file scan node for Query iceberg table
@@ -201,6 +204,7 @@ private List<Split> doGetSplits() throws UserException {
         }
         for (Expression predicate : expressions) {
             scan = scan.filter(predicate);
+            this.pushdownIcebergPredicates.add(predicate.toString());
         }

         // get splits
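
Two details worth calling out here (reviewer gloss): Iceberg scans are immutable, so filter() returns a new TableScan that must be reassigned, and the predicate.toString() recorded here is exactly what the EXPLAIN override below prints. A minimal sketch of the filter pattern, with an illustrative predicate:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.expressions.Expressions;

    class ScanFilterSketch {
        static TableScan filtered(Table table) {
            TableScan scan = table.newScan();
            // filter() does not mutate scan; ignoring its return value
            // would push down no predicate at all.
            scan = scan.filter(Expressions.greaterThanOrEqual("dt", "2023-06-01"));
            return scan;
        }
    }
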
@@ -446,4 +450,17 @@ protected void toThrift(TPlanNode planNode) {
             }
         }
     }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        if (pushdownIcebergPredicates.isEmpty()) {
+            return super.getNodeExplainString(prefix, detailLevel);
+        }
+        StringBuilder sb = new StringBuilder();
+        for (String predicate : pushdownIcebergPredicates) {
+            sb.append(prefix).append(prefix).append(predicate).append("\n");
+        }
+        return super.getNodeExplainString(prefix, detailLevel)
+                + String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb);
+    }
 }
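
Based on the format string above, the extra section in EXPLAIN output should look roughly like the following. This rendering is illustrative only: the indentation comes from the prefix argument, and Iceberg prints unbound predicates in its ref(...) form, so the exact text depends on the query.

    icebergPredicatePushdown=
        ref(name="dt") >= "2023-06-01"
        ref(name="city") == "beijing"
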