From 512ede1bc574d134b29a9ece581a277d841d9d2f Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Thu, 25 Jul 2024 21:25:50 +0800 Subject: [PATCH 1/5] fix --- .../source/PaimonPredicateConverter.java | 39 ++++++ .../paimon/source/PaimonScanNode.java | 23 +++- .../paimon/test_paimon_predict.groovy | 126 ++++++++++++++++++ 3 files changed, 182 insertions(+), 6 deletions(-) create mode 100644 regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java index 605bc1b321a428..9e46474898db4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java @@ -21,9 +21,11 @@ import org.apache.doris.analysis.CompoundPredicate; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.IsNullPredicate; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.Subquery; import org.apache.doris.thrift.TExprOpcode; import org.apache.paimon.data.BinaryString; @@ -85,11 +87,48 @@ private Predicate convertToPaimonExpr(Expr dorisExpr) { default: return null; } + } else if (dorisExpr instanceof InPredicate) { + return doInPredicate((InPredicate) dorisExpr); } else { return binaryExprDesc(dorisExpr); } } + private Predicate doInPredicate(InPredicate predicate) { + // InPredicate, only support a in (1,2,3) + if (predicate.contains(Subquery.class)) { + return null; + } + + SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0)); + if (slotRef == null) { + return null; + } + String colName = slotRef.getColumnName(); + int idx = fieldNames.indexOf(colName); + DataType dataType = paimonFieldTypes.get(idx); + List valueList = new ArrayList<>(); + for (int i = 1; i < predicate.getChildren().size(); i++) { + if (!(predicate.getChild(i) instanceof LiteralExpr)) { + return null; + } + LiteralExpr literalExpr = convertDorisExprToLiteralExpr(predicate.getChild(i)); + Object value = dataType.accept(new PaimonValueConverter(literalExpr)); + if (value == null) { + return null; + } + valueList.add(value); + } + + if (predicate.isNotIn()) { + // not in + return builder.notIn(idx, valueList); + } else { + // in + return builder.in(idx, valueList); + } + } + private Predicate binaryExprDesc(Expr dorisExpr) { TExprOpcode opcode = dorisExpr.getOpcode(); // Make sure the col slot is always first diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index aeecbd7eba2ede..aaddaac6d9d209 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -317,15 +317,26 @@ public Map getLocationProperties() throws MetaNotFoundException, @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - String result = super.getNodeExplainString(prefix, detailLevel) - + String.format("%spaimonNativeReadSplits=%d/%d\n", - prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum)); + StringBuilder sb = new StringBuilder(super.getNodeExplainString(prefix, detailLevel)); + sb.append(String.format("%spaimonNativeReadSplits=%d/%d\n", + prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum))); + + sb.append(prefix).append("spredicateFromPaimon:"); + if (predicates.isEmpty()) { + sb.append(" NONE\n"); + } else { + sb.append("\n"); + for (Predicate predicate : predicates) { + sb.append(prefix).append(prefix).append(prefix).append(predicate).append("\n"); + } + } + if (detailLevel == TExplainLevel.VERBOSE) { - result += prefix + "PaimonSplitStats: \n"; + sb.append(prefix).append("PaimonSplitStats: \n"); for (SplitStat splitStat : splitStats) { - result += String.format("%s %s\n", prefix, splitStat); + sb.append(String.format("%s %s\n", prefix, splitStat)); } } - return result; + return sb.toString(); } } diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy new file mode 100644 index 00000000000000..d004124f4c8c91 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_predict", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable paimon test") + } + + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String catalog_name = "test_paimon_predict" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type' = 'paimon', + 'warehouse' = 's3://warehouse/wh', + 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.path.style.access' = 'true' + ); + """ + sql """use `${catalog_name}`.`spark_paimon`""" + + explain { + sql("select * from predict_for_in") + contains("inputSplitNum=9") + } + + def explain_one_column = { col_name -> + + explain { + sql("select * from predict_for_in where ${col_name} in ('a')") + contains("inputSplitNum=3") + } + + explain { + sql("select * from predict_for_in where ${col_name} in ('b')") + contains("inputSplitNum=3") + } + + explain { + sql("select * from predict_for_in where ${col_name} in ('a','b')") + contains("inputSplitNum=6") + } + + explain { + sql("select * from predict_for_in where ${col_name} in ('a','x')") + contains("inputSplitNum=3") + } + + explain { + sql("select * from predict_for_in where ${col_name} in ('x','y')") + contains("inputSplitNum=0") + } + + explain { + sql("select * from predict_for_in where ${col_name} in ('a','b','c')") + contains("inputSplitNum=9") + } + + explain { + sql("select * from predict_for_in where ${col_name} in ('y','x','a','c')") + contains("inputSplitNum=6") + } + + explain { + sql("select * from predict_for_in where ${col_name} not in ('y','x','a','c')") + contains("inputSplitNum=3") + } + + explain { + sql("select * from predict_for_in where ${col_name} not in ('a')") + contains("inputSplitNum=6") + } + + explain { + sql("select * from predict_for_in where ${col_name} not in ('x')") + contains("inputSplitNum=9") + } + } + + explain_one_column('dt') + explain_one_column('hh') + + + sql """ drop catalog if exists ${catalog_name} """ +} + + +/* + +for spark: + +create table predict_for_in(id int, dt string, hh string) partitioned by(dt,hh); + +insert into predict_for_in values (1, 'a', 'a'); +insert into predict_for_in values (2, 'a', 'b'); +insert into predict_for_in values (3, 'a', 'c'); + +insert into predict_for_in values (4, 'b', 'a'); +insert into predict_for_in values (5, 'b', 'b'); +insert into predict_for_in values (6, 'b', 'c'); + +insert into predict_for_in values (7, 'c', 'a'); +insert into predict_for_in values (8, 'c', 'b'); +insert into predict_for_in values (9, 'c', 'c'); + +*/ + From ce9891fe0da5fe018d2bbf9a0b53cb3133ce88eb Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 26 Jul 2024 13:44:55 +0800 Subject: [PATCH 2/5] Update PaimonScanNode.java --- .../apache/doris/datasource/paimon/source/PaimonScanNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index aaddaac6d9d209..528ebe9c713b05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -327,7 +327,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { } else { sb.append("\n"); for (Predicate predicate : predicates) { - sb.append(prefix).append(prefix).append(prefix).append(predicate).append("\n"); + sb.append(prefix)).append(prefix).append(predicate).append("\n"); } } From 6b0f3f5092c5c8ea76111dcbe610e4b721de7a99 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 26 Jul 2024 14:45:06 +0800 Subject: [PATCH 3/5] fix --- .../apache/doris/datasource/paimon/source/PaimonScanNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 528ebe9c713b05..4c0af43fa7bc9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -327,7 +327,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { } else { sb.append("\n"); for (Predicate predicate : predicates) { - sb.append(prefix)).append(prefix).append(predicate).append("\n"); + sb.append(prefix).append(prefix).append(predicate).append("\n"); } } From 57d9351017e17282b0f836a681c3ba2faa4d149a Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 26 Jul 2024 18:05:00 +0800 Subject: [PATCH 4/5] fix --- .../suites/external_table_p0/paimon/test_paimon_predict.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy index d004124f4c8c91..6f07ae1db8e155 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy @@ -19,6 +19,7 @@ suite("test_paimon_predict", "p0,external,doris,external_docker,external_docker_ String enabled = context.config.otherConfigs.get("enablePaimonTest") if (enabled == null || !enabled.equalsIgnoreCase("true")) { logger.info("disable paimon test") + return } String minio_port = context.config.otherConfigs.get("iceberg_minio_port") From 4fbf0977dacdaddd2c635d0aedb5b2d3d0756f5a Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Sun, 28 Jul 2024 11:42:58 +0800 Subject: [PATCH 5/5] Update fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java Co-authored-by: Mingyu Chen --- .../apache/doris/datasource/paimon/source/PaimonScanNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 4c0af43fa7bc9d..45516fd2841a43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -321,7 +321,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { sb.append(String.format("%spaimonNativeReadSplits=%d/%d\n", prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum))); - sb.append(prefix).append("spredicateFromPaimon:"); + sb.append(prefix).append("predicatesFromPaimon:"); if (predicates.isEmpty()) { sb.append(" NONE\n"); } else {