From 948cec41f188a5f9b5e134486ebd7fc8f4669366 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 15 Nov 2024 15:37:17 +0800 Subject: [PATCH 1/3] fix --- .../iceberg/source/IcebergScanNode.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 56dda7b4fe28b2..8c38f5fce6511c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -52,6 +52,7 @@ import com.google.common.collect.Lists; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; @@ -385,11 +386,21 @@ private long getCountFromSnapshot() { } Map summary = snapshot.summary(); - if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) { + if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) { + return -1; + } + + if (snapshot.operation().equals(DataOperations.REPLACE)) { + // prevent 'dangling delete' problem after `rewrite_data_files` + // ref: https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_position_delete_files + if (summary.get(IcebergUtils.TOTAL_POSITION_DELETES).equals("0")) { + return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)); + } else { + return -1; + } + } else { return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)) - Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES)); - } else { - return -1; } } From ac7b34762cac74c7010cd93ec2a6873ae8a2af1d Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 15 Nov 2024 17:26:00 +0800 Subject: [PATCH 2/3] fix --- .../iceberg/source/IcebergScanNode.java 
| 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 8c38f5fce6511c..310bae01510278 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -52,7 +52,6 @@ import com.google.common.collect.Lists; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataOperations; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; @@ -386,22 +385,11 @@ private long getCountFromSnapshot() { } Map summary = snapshot.summary(); - if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) { + if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0") + || !summary.get(IcebergUtils.TOTAL_POSITION_DELETES).equals("0")) { return -1; } - - if (snapshot.operation().equals(DataOperations.REPLACE)) { - // prevent 'dangling delete' problem after `rewrite_data_files` - // ref: https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_position_delete_files - if (summary.get(IcebergUtils.TOTAL_POSITION_DELETES).equals("0")) { - return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)); - } else { - return -1; - } - } else { - return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)) - - Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES)); - } + return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)); } @Override From 63400e8895d3acf3521d1384e1e9b0468310cddc Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 15 Nov 2024 17:49:40 +0800 Subject: [PATCH 3/3] add test --- .../docker-compose/iceberg/entrypoint.sh.tpl | 12 +++++++++-- .../{ => iceberg}/run01.sql | 0 .../{ => 
iceberg}/run02.sql | 0 .../{ => iceberg}/run03.sql | 0 .../{ => iceberg}/run04.sql | 0 .../{ => iceberg}/run05.sql | 0 .../iceberg/run06.sql | 21 +++++++++++++++++++ .../{run06.sql => paimon/run01.sql} | 0 .../iceberg/source/IcebergScanNode.java | 3 +++ .../iceberg/test_iceberg_optimize_count.out | 3 +++ .../test_iceberg_optimize_count.groovy | 13 +++++++++++- 11 files changed, 49 insertions(+), 3 deletions(-) rename docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/{ => iceberg}/run01.sql (100%) rename docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/{ => iceberg}/run02.sql (100%) rename docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/{ => iceberg}/run03.sql (100%) rename docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/{ => iceberg}/run04.sql (100%) rename docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/{ => iceberg}/run05.sql (100%) create mode 100644 docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run06.sql rename docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/{run06.sql => paimon/run01.sql} (100%) diff --git a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl index 1af170ff91ba7d..a4b27bdd6c0eec 100644 --- a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl +++ b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl @@ -25,9 +25,17 @@ start-thriftserver.sh --driver-java-options "-Dderby.system.home=/tmp/derby" -ls /mnt/scripts/create_preinstalled_scripts/*.sql | xargs -n 1 -I {} bash -c ' +ls /mnt/scripts/create_preinstalled_scripts/iceberg/*.sql | xargs -n 1 -I {} bash -c ' START_TIME=$(date +%s) - spark-sql --master spark://doris--spark-iceberg:7077 -f {} + spark-sql --master spark://doris--spark-iceberg:7077 --conf 
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -f {} + END_TIME=$(date +%s) + EXECUTION_TIME=$((END_TIME - START_TIME)) + echo "Script: {} executed in $EXECUTION_TIME seconds" +' + +ls /mnt/scripts/create_preinstalled_scripts/paimon/*.sql | xargs -n 1 -I {} bash -c ' + START_TIME=$(date +%s) + spark-sql --master spark://doris--spark-iceberg:7077 --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions -f {} END_TIME=$(date +%s) EXECUTION_TIME=$((END_TIME - START_TIME)) echo "Script: {} executed in $EXECUTION_TIME seconds" diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run01.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run01.sql similarity index 100% rename from docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run01.sql rename to docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run01.sql diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run02.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run02.sql similarity index 100% rename from docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run02.sql rename to docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run02.sql diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run03.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run03.sql similarity index 100% rename from docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run03.sql rename to docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run03.sql diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run04.sql 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run04.sql similarity index 100% rename from docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run04.sql rename to docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run04.sql diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run05.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run05.sql similarity index 100% rename from docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run05.sql rename to docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run05.sql diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run06.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run06.sql new file mode 100644 index 00000000000000..3ac97c50099e10 --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run06.sql @@ -0,0 +1,21 @@ +use demo.test_db; + +drop table if exists dangling_delete_after_write; +create table dangling_delete_after_write ( + id BIGINT NOT NULL, + val STRING) +USING iceberg +TBLPROPERTIES ( + 'format' = 'iceberg/parquet', + 'format-version' = '2', + 'identifier-fields' = '[id]', + 'upsert-enabled' = 'true', + 'write.delete.mode' = 'merge-on-read', + 'write.parquet.compression-codec' = 'zstd', + 'write.update.mode' = 'merge-on-read', + 'write.upsert.enabled' = 'true'); + +insert into dangling_delete_after_write values(1, 'abd'); +update dangling_delete_after_write set val = 'def' where id = 1; +call demo.system.rewrite_data_files(table => 'demo.test_db.dangling_delete_after_write', options => map('min-input-files', '1')); +insert into dangling_delete_after_write values(2, 'xyz'); \ No newline at end of file diff --git 
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run06.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run01.sql similarity index 100% rename from docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run06.sql rename to docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run01.sql diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 310bae01510278..f7b58158d1a72c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -384,6 +384,9 @@ private long getCountFromSnapshot() { return 0; } + // `TOTAL_POSITION_DELETES` must also be 0, + // to prevent the 'dangling delete' problem after `rewrite_data_files` + // ref: https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_position_delete_files Map summary = snapshot.summary(); if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0") || !summary.get(IcebergUtils.TOTAL_POSITION_DELETES).equals("0")) { diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out index f2e945f9cec5d7..ec9129a00d2708 100644 --- a/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out @@ -23,3 +23,6 @@ -- !q08 -- 1000 +-- !q09 -- +2 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy index 6c7789a6ca180c..306af3b2cb2852 100644 --- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy @@ -69,7 +69,7 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external } explain { sql("""${sqlstr4}""") - contains """pushdown agg=COUNT (1000)""" + contains """pushdown agg=COUNT (-1)""" } // don't use push down count @@ -97,6 +97,17 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external contains """pushdown agg=NONE""" } + // There has `dangling delete` after rewrite + sql """ set enable_count_push_down_for_external_table=true; """ + sqlstr5 = """ select count(*) from ${catalog_name}.test_db.dangling_delete_after_write; """ + + qt_q09 """${sqlstr5}""" + + explain { + sql("""${sqlstr5}""") + contains """pushdown agg=COUNT (-1)""" + } + } finally { sql """ set enable_count_push_down_for_external_table=true; """ sql """drop catalog if exists ${catalog_name}"""