Flink: Support filter pushdown in IcebergTableSource #1893
Conversation
@openinx could you help me review the PR when you have time? Thanks.
Thanks @zhangjun0x01 for contributing, I will review this patch today or tomorrow.
Force-pushed from 2fe26ee to e05b6e9
```java
public static Expression convert(org.apache.flink.table.expressions.Expression flinkExpression) {
  if (!(flinkExpression instanceof CallExpression)) {
    return null;
```
I'm not familiar with Flink; I wonder whether returning null here and in other places is really a valid use case. Should we throw instead?
Iceberg supports the following expressions:
https://iceberg.apache.org/api/#expressions
For expressions supported by Flink but not by Iceberg, I did not convert them, because they cannot be used for an Iceberg table scan.
If it is an unsupported expression, there is no need to do filter pushdown; I think we should not throw an exception.
I see. Do we want to return Optional<Expression> here then? That signals that the returned value could be absent, so when we add the converted expression to the list we can decide not to add empty results, and we don't have to do a null check when calling toString.
Thanks for your suggestion; I updated it to Optional, and I added a test case for an expression that is not pushed down, which returns Optional.empty().
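For context, a minimal sketch of the Optional-based contract under discussion (the surrounding collection code and names are illustrative, not from the PR):

```java
// Callers collect only the filters that convert successfully; Optional.empty()
// signals "cannot push down", so no null checks are needed downstream.
// Lists is Guava's relocated com.google.common.collect.Lists.
List<org.apache.iceberg.expressions.Expression> icebergFilters = Lists.newArrayList();
for (ResolvedExpression filter : flinkFilters) {
  FlinkFilters.convert(filter).ifPresent(icebergFilters::add);
}
```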
```java
FieldReferenceExpression field = (FieldReferenceExpression) args.get(0);
List<ResolvedExpression> values = args.subList(1, args.size());

List<Object> expressions = values.stream().filter(
```
Nit: I think these are input values, not expressions
```java
}

private static boolean literalOnRight(List<ResolvedExpression> args) {
  return args.get(0) instanceof FieldReferenceExpression && args.get(1) instanceof ValueLiteralExpression ? true :
```
Nit: no need for `? true : false`; the boolean expression can be returned directly.
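That is, a sketch of the simplified method:

```java
private static boolean literalOnRight(List<ResolvedExpression> args) {
  // the comparison already yields a boolean, so return it as-is
  return args.get(0) instanceof FieldReferenceExpression &&
      args.get(1) instanceof ValueLiteralExpression;
}
```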
```java
private FlinkFilters() {
}

private static final Map<BuiltInFunctionDefinition, Operation> FILTERS = ImmutableMap
```
Seems that this is mapping Flink operations to Iceberg operations for the following switch statement? If that's the case we probably don't really need it, and could directly switch based on the input flinkExpression.getFunctionDefinition()?
Also, there's a recent change that requires rewriting NaN in equals/notEquals to isNaN/notNaN, as Iceberg's equals no longer accepts NaN as a literal, so we will have to rewrite here as well. Here is a similar change done in SparkFilters.
> Seems that this is mapping Flink operations to Iceberg operations for the following switch statement? If that's the case we probably don't really need it, and could directly switch based on the input flinkExpression.getFunctionDefinition()?

flinkExpression.getFunctionDefinition() returns an implementation class of FunctionDefinition, which cannot be used directly in a switch statement, so we added a mapping, similar to SparkFilters.
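For readers following along, a sketch of the mapping approach being described (entries abbreviated; Operation is assumed to be Iceberg's Expression.Operation):

```java
// Map Flink's built-in function definitions to Iceberg expression operations,
// so convert(...) can switch on an enum instead of on implementation classes.
private static final Map<BuiltInFunctionDefinition, Operation> FILTERS = ImmutableMap
    .<BuiltInFunctionDefinition, Operation>builder()
    .put(BuiltInFunctionDefinitions.EQUALS, Operation.EQ)
    .put(BuiltInFunctionDefinitions.NOT_EQUALS, Operation.NOT_EQ)
    .put(BuiltInFunctionDefinitions.GREATER_THAN, Operation.GT)
    .put(BuiltInFunctionDefinitions.LESS_THAN, Operation.LT)
    .put(BuiltInFunctionDefinitions.IS_NULL, Operation.IS_NULL)
    .put(BuiltInFunctionDefinitions.AND, Operation.AND)
    .put(BuiltInFunctionDefinitions.OR, Operation.OR)
    .put(BuiltInFunctionDefinitions.NOT, Operation.NOT)
    .build();

// Inside convert(...): look up the Iceberg operation, then switch on it.
Operation op = FILTERS.get(call.getFunctionDefinition());
```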
```java
if (filters != null) {
  explain += String.format(", FilterPushDown,the filters :%s",
      filters.stream().map(filter -> filter.toString()).collect(Collectors.joining(",")));
```
I think returning null in the filters class will cause an NPE here as well.
nit: it's simpler to rewrite this as:

```java
explain += String.format(", FilterPushDown,the filters :%s", Joiner.on(",").join(filters));
```

Force-pushed from e05b6e9 to 7f7cbf5
@yyanyy thanks for your review, I updated all of them.
```java
Object value = valueLiteralExpression.getValueAs(clazz).get();

BuiltInFunctionDefinition functionDefinition = (BuiltInFunctionDefinition) call.getFunctionDefinition();
if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
```
I think we may want to rewrite NOT_EQUALS to notNaN as well, since notEquals in Iceberg also doesn't accept NaN as a literal; I think SparkFilters doesn't do that because there's no NotEqualTo filter in Spark.
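A sketch of the rewrite being requested here (using NaNUtil, which the class already imports):

```java
if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
  // Iceberg's equal() rejects NaN literals, so rewrite "field = NaN" to isNaN(field)
  return NaNUtil.isNaN(value)
      ? Optional.of(Expressions.isNaN(name))
      : Optional.of(Expressions.equal(name, value));
} else if (functionDefinition.equals(BuiltInFunctionDefinitions.NOT_EQUALS)) {
  // likewise rewrite "field <> NaN" to notNaN(field)
  return NaNUtil.isNaN(value)
      ? Optional.of(Expressions.notNaN(name))
      : Optional.of(Expressions.notEqual(name, value));
}
```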
Force-pushed from 658b61d to 629c5af
```java
switch (op) {
  case IS_NULL:
    FieldReferenceExpression isNullFilter = (FieldReferenceExpression) call.getResolvedChildren().get(0);
    return Optional.of(Expressions.isNull(isNullFilter.getName()));
```
How does FieldReferenceExpression.getName() reference nested fields?
I tested it. For example, we have a table whose Flink DDL is:

```sql
CREATE TABLE iceberg_nested_test (
  id VARCHAR,
  title VARCHAR,
  properties ROW(`foo` VARCHAR)
) WITH (
  'connector'='iceberg'
);
```

If the query is `select * from iceberg_nested_test where properties is null`, Flink supports filter pushdown, and the field name is `properties`. If the query is `select * from iceberg_nested_test where properties.foo is null`, Flink does not support filter pushdown, and the code never enters the IcebergTableSource#applyPredicate method.
Okay, sounds fine that Flink doesn't currently support predicate pushdown on nested fields. @openinx, any plans to change this?
Yes, Flink does not support nested field pushdown now. We will need to file an issue in the Apache Flink repo to address it.
I created a Flink issue to track this: https://issues.apache.org/jira/browse/FLINK-20767
```java
}

String name = fieldReferenceExpression.getName();
Class clazz = valueLiteralExpression.getOutputDataType().getConversionClass();
```
Class is parameterized, so this should be Class<?>
```java
String name = fieldReferenceExpression.getName();
Class clazz = valueLiteralExpression.getOutputDataType().getConversionClass();
Object value = valueLiteralExpression.getValueAs(clazz).get();
```
ValueLiteralExpression allows the value to be null, in which case `get` here will throw an exception. How is this avoided? Does the parser reject `col = null` expressions?
@openinx may be able to help here, too.
Actually, a few lines down there is an assertion that the value isn't null. This looks like a bug to me.
I tested it. If the SQL is `select * from mytable where data = null`, Flink does not push the filter down, and we do not get any data.
If the SQL is `select * from mytable where data is null`, it works normally and enters the IS_NULL branch of the switch.
I think it would be good to fix this case rather than making the assumption that Flink won't push the = null filter. Handling null will be good for maintainability.
I agree with @rdblue that we should handle null in this function rather than assuming Flink won't push down the null.
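A sketch of explicit null handling (the helper name is illustrative, not from the PR):

```java
// Treat a null literal as unconvertible instead of calling Optional.get() on it,
// so a pushed-down "col = null" filter simply fails to convert and is not applied.
private static Optional<Object> convertLiteral(ValueLiteralExpression expression) {
  if (expression.isNull()) {
    return Optional.empty();
  }

  Class<?> clazz = expression.getOutputDataType().getConversionClass();
  return expression.getValueAs(clazz).map(o -> (Object) o);
}
```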
```java
expression -> {
  if (expression instanceof ValueLiteralExpression) {
    return !((ValueLiteralExpression) flinkExpression).isNull();
  }
```
This should not discard anything that is not a ValueLiteralExpression. Instead, if there is a non-literal this should either throw IllegalArgumentException or return Optional.empty to signal that the expression cannot be converted.
```java
case IN:
  List<ResolvedExpression> args = call.getResolvedChildren();
  FieldReferenceExpression field = (FieldReferenceExpression) args.get(0);
  List<ResolvedExpression> values = args.subList(1, args.size());
```
I think this should be converted to List<ValueLiteralExpression> to simplify value conversion.
IN, BETWEEN, and NOT_BETWEEN are automatically converted by Flink, so we will not enter the IN block.
Should we delete the IN branch?
If we're sure that Flink won't enter the IN block, then I think we should remove this block. Please add a comment saying IN will be converted to multiple ORs.
I removed the IN block and added some comments.
```java
return !((ValueLiteralExpression) flinkExpression).isNull();
}

return false;
```
Null values can't be ignored. This should either return Optional.empty or throw IllegalArgumentException if there is a null value.
```java
return Optional.of(Expressions.in(field.getName(), inputValues));

case NOT:
  Optional<Expression> child = convert(call.getResolvedChildren().get(0));
```
There are several calls to getResolvedChildren().get(0). I think that should be converted to a method that validates there is only one child and also validates the type:
```java
private <T extends ResolvedExpression> Optional<T> getOnlyChild(CallExpression call, Class<T> expectedChildClass) {
  List<ResolvedExpression> children = call.getResolvedChildren();
  if (children.size() != 1) {
    return Optional.empty();
  }

  ResolvedExpression child = children.get(0);
  if (!expectedChildClass.isInstance(child)) {
    return Optional.empty();
  }

  return Optional.of(expectedChildClass.cast(child));
}
```

```java
case NOT:
  Optional<Expression> child = convert(call.getResolvedChildren().get(0));
  if (child.isPresent()) {
    return Optional.of(Expressions.not(child.get()));
```
This can be child.map(Expressions::not).
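In context, that looks like (sketch):

```java
case NOT:
  // Optional.map applies Expressions::not only when the child converted successfully
  return convert(call.getResolvedChildren().get(0)).map(Expressions::not);
```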
```java
}
}

return Optional.of(function.apply(name, value));
```
The literal value needs to be converted to Iceberg's internal representation before being passed to create an expression. Flink will return LocalDate, LocalTime, LocalDateTime, etc. from the getValueAs method, and it isn't clear whether the value stored in the literal is the correct representation for other types as well.
@openinx, could you help recommend how to do the conversion here?
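One possible shape for that conversion, sketched here with Iceberg's DateTimeUtil helpers (a suggestion, not the PR's code; the helper name is illustrative):

```java
// Convert Flink's java.time literals to Iceberg's internal representations:
// micros from epoch for timestamps and times, days from epoch for dates.
private static Object toIcebergLiteral(Object value) {
  if (value instanceof LocalDateTime) {
    return DateTimeUtil.microsFromTimestamp((LocalDateTime) value);
  } else if (value instanceof Instant) {
    return DateTimeUtil.microsFromInstant((Instant) value);
  } else if (value instanceof LocalTime) {
    return DateTimeUtil.microsFromTime((LocalTime) value);
  } else if (value instanceof LocalDate) {
    return DateTimeUtil.daysFromDate((LocalDate) value);
  }
  return value;
}
```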
```java
}

@Test
public void testFilterPushDown() {
```
Tests should be broken into individual methods that are each a test case. To share code, use @Before and @After and different test suites.
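A sketch of the suggested layout (class and method names are hypothetical):

```java
public class TestFlinkTableSource extends FlinkTestBase {

  @Before
  public void before() {
    // shared setup: create the test table and insert fixture rows
  }

  @After
  public void after() {
    // shared teardown: drop the test table
  }

  @Test
  public void testFilterPushDownEqual() {
    // one assertion target per test method
  }

  @Test
  public void testFilterPushDownIsNull() {
    // ...
  }
}
```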
Can you also add a test case that listens for a ScanEvent and validates that the expression was correctly passed to Iceberg?
I added a listener to validate the filter pushdown in TestFlinkTableSource.
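For reference, a sketch of such a listener (assuming Iceberg's Listeners/ScanEvent API; the field and variable names are illustrative):

```java
// Capture the most recent ScanEvent so tests can assert on the pushed filter.
private static volatile ScanEvent lastScanEvent = null;

static {
  Listeners.register(event -> lastScanEvent = event, ScanEvent.class);
}

// In a test, after executing the query:
Assert.assertEquals("Should push down the expected filter",
    expectedFilter.toString(), lastScanEvent.filter().toString());
```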
```java
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.NaNUtil;

public class FlinkFilters {
```
This class needs an extensive test suite that checks the conversion from expected Flink expressions, not just a test for the source.
The conversion needs to cover at least these cases:
- Equals with null
- Not equals with null
- In with null
- Not in with null
- Equals with NaN
- Not equals with NaN
- In with NaN
- Not in with NaN
- All inequalities with null
- All inequalities with NaN
- All expressions with a non-null and non-NaN value (preferably one string and one numeric)
- Each data type that is supported by Iceberg/Flink
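A sketch of one such case, assuming a resolve(...) helper that turns a Flink Table API expression into a ResolvedExpression (the helper is hypothetical):

```java
@Test
public void testEqualsNaN() {
  UnboundPredicate<?> expected = org.apache.iceberg.expressions.Expressions.isNaN("field1");

  // Flink side: field1 = NaN, built with org.apache.flink.table.api.Expressions.$ / lit
  Optional<Expression> actual = FlinkFilters.convert(resolve($("field1").isEqual(lit(Float.NaN))));

  Assert.assertTrue("Conversion should succeed", actual.isPresent());
  UnboundPredicate<?> converted = (UnboundPredicate<?>) actual.get();
  Assert.assertEquals("Should rewrite equals-NaN to isNaN", expected.op(), converted.op());
}
```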
I looked up the Flink docs and the source code and tested it; it seems that NaN and Infinity are not supported by Flink now.
The data types supported by Flink are listed here.
Thanks for working on this @zhangjun0x01! It looks like a great start to me, and I'd really like to get this working in Flink.
@rdblue, thank you very much for your review. I am very sorry that some situations were not well considered; I will be more careful next time, and I will update the PR later.
No problem, this is why we review!
I'm sorry that I did not review this PR in time (I was focusing on Flink CDC DataStream/SQL test cases and more optimization work for after the next 0.11.0 release); I will take a look tomorrow.
I'd like to get this into the 0.11.0 release, if possible. Thanks for working on this, @zhangjun0x01! It will be great to have this feature done.
Force-pushed from edad5ee to 7daffa6
@rdblue thanks very much for your review, I updated it.
Force-pushed from d33aa49 to 217d059
```java
org.apache.iceberg.expressions.Expressions.equal("field1", 1));

Assert.assertEquals("Predicate operation should match", expected.op(), not.op());
assertPredicatesMatch((UnboundPredicate<?>) expected.child(), (UnboundPredicate<?>) not.child());
```
These casts are unnecessary.
@zhangjun0x01, there are still a few things to fix in the tests, mostly minor, but I also found a major problem. The filter pushdown tests only need to run for one catalog and one file format, because the purpose of those tests is to validate the assumptions of the …
Force-pushed from 217d059 to c93eaa7
Force-pushed from c93eaa7 to 138fa46
I refactored it.
Thanks, @zhangjun0x01! It is great that this will be in the 0.11.0 release. Thanks for getting it done!
add filter push down for IcebergTableSource