From 845092b0b10d5050ed24195ef03716601fb3fc7b Mon Sep 17 00:00:00 2001 From: shee <13843187+qzsee@users.noreply.github.com> Date: Fri, 3 Jan 2025 19:48:52 +0800 Subject: [PATCH] [Improve](mtmv) skip the generation of invalid task for refresh mtmv (#46280) ### What problem does this PR solve? We specified the `excluded_trigger_tables = 'a'` attribute when creating the materialized view. If table `a` is updated frequently, many invalid tasks will be generated, and these tasks do not really refresh the mv, which is unreasonable, too many invalid tasks will wash away useful task information Co-authored-by: garenshi --- .../org/apache/doris/mtmv/MTMVService.java | 11 ++- .../data/mtmv_p0/test_commit_mtmv.out | 20 ++++++ .../suites/mtmv_p0/test_commit_mtmv.groovy | 72 +++++++++++++++++++ 3 files changed, 102 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index 278811d3a991f7..26c6bfb10e9876 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -198,7 +198,7 @@ public void processEvent(Event event) throws EventException { try { // check if mtmv should trigger by event MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo); - if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) { + if (canRefresh(mtmv, table)) { jobManager.onCommit(mtmv); } } catch (Exception e) { @@ -206,4 +206,13 @@ public void processEvent(Event event) throws EventException { } } } + + private boolean canRefresh(MTMV mtmv, TableIf table) { + if (mtmv.getExcludedTriggerTables().contains(table.getName())) { + LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}", + mtmv.getName(), table.getName()); + return false; + } + return mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT); + } } diff --git a/regression-test/data/mtmv_p0/test_commit_mtmv.out b/regression-test/data/mtmv_p0/test_commit_mtmv.out index fafb8f883a4c25..8c96733d2141d4 100644 --- a/regression-test/data/mtmv_p0/test_commit_mtmv.out +++ b/regression-test/data/mtmv_p0/test_commit_mtmv.out @@ -38,3 +38,23 @@ -- !mv1_replace -- 3 2017-03-15 3 +-- !mv_sag -- +1 1 60 + +-- !task_sag -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + +-- !mv_sag1 -- +1 1 60 + +-- !task_sag1 -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + +-- !mv_sag2 -- +1 1 60 +1 2 70 +2 1 70 + +-- !task_sag2 -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + diff --git a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy index cd02dcd57d7fa5..dac1d5c22cda13 100644 --- a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy @@ -127,4 +127,76 @@ suite("test_commit_mtmv") { sql """drop materialized view if exists ${mvName2};""" sql """drop table if exists `${tableName}`""" + //===========test excluded_trigger_tables=========== + def tblStu = "test_commit_mtmv_tbl_stu" + def tblGrade = "test_commit_mtmv_tbl_grade" + def mvSag = "test_commit_mv_sag" + sql """drop materialized view if exists ${mvSag};""" + sql """drop table if exists `${tblStu}`""" + sql """drop table if exists `${tblGrade}`""" + sql """ + CREATE TABLE `${tblStu}` ( + `sid` int(32) NULL, + `sname` varchar(32) NULL, + ) ENGINE=OLAP + DUPLICATE KEY(`sid`) + DISTRIBUTED BY HASH(`sid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + CREATE TABLE `${tblGrade}` ( + `sid` int(32) NULL, + `cid` int(32) NULL, + `score` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`sid`) + DISTRIBUTED BY HASH(`sid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvSag} + BUILD DEFERRED + REFRESH COMPLETE ON commit + DISTRIBUTED BY HASH(`sid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "excluded_trigger_tables" = "${tblGrade}" + ) + AS select a.sid,b.cid,b.score from ${tblStu} a join ${tblGrade} b on a.sid = b.sid; + """ + + sql """ + insert into ${tblGrade} values(1, 1, 60); + insert into ${tblStu} values(1, 'sam'); + """ + def sagJobName = getJobName(dbName, mvSag); + waitingMTMVTaskFinished(sagJobName) + order_qt_mv_sag "SELECT * FROM ${mvSag} order by sid,cid" + order_qt_task_sag "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1" + + sql """ + insert into ${tblGrade} values(1, 2, 70); + """ + waitingMTMVTaskFinished(sagJobName) + order_qt_mv_sag1 "SELECT * FROM ${mvSag} order by sid,cid" + order_qt_task_sag1 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1" + + sql """ + insert into ${tblGrade} values(2, 1, 70); + insert into ${tblStu} values(2, 'jack'); + """ + + waitingMTMVTaskFinished(sagJobName) + order_qt_mv_sag2 "SELECT * FROM ${mvSag} order by sid,cid" + order_qt_task_sag2 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1" + + sql """drop materialized view if exists ${mvSag};""" + sql """drop table if exists `${tblStu}`""" + sql """drop table if exists `${tblGrade}`""" }