From 4c48c70d6294daddde38d47d7343d546599cedde Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 15:31:35 +0800 Subject: [PATCH 01/21] 1 --- .../InsertOverwriteManager.java | 53 +++++++++++++++++++ .../doris/job/extensions/mtmv/MTMVTask.java | 9 +++- .../insert/InsertOverwriteTableCommand.java | 48 ++++++++++++++--- 3 files changed, 103 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index 81524ae020810e..325344d5dbff9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -24,10 +24,12 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType; +import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -40,7 +42,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class InsertOverwriteManager extends MasterDaemon implements Writable { private static final Logger LOG = LogManager.getLogger(InsertOverwriteManager.class); @@ -62,6 +66,11 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { @SerializedName(value = "partitionPairs") private Map> partitionPairs = Maps.newConcurrentMap(); + // TableId running insert overwrite + // dbId ==> Set + private Map> runningTables = Maps.newHashMap(); + private ReentrantReadWriteLock runningLock = new ReentrantReadWriteLock(true); + public InsertOverwriteManager() { super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 1000); } @@ -270,6 +279,50 @@ private boolean rollback(long taskId) { return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames()); } + /** + * If the current table id has a running insert overwrite, throw an exception. + * If not, record it in runningTables + * + * @param dbId Run the dbId for insert overwrite + * @param tableId Run the tableId for insert overwrite + */ + public void recordRunningTableOrException(long dbId, long tableId) { + runningLock.writeLock().lock(); + try { + if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) { + throw new AnalysisException( + String.format("insert overwrite is running on db: {}, table: {}", dbId, tableId)); + } + if (runningTables.containsKey(dbId)) { + runningTables.get(dbId).add(tableId); + } else { + runningTables.put(dbId, Sets.newHashSet(tableId)); + } + } finally { + runningLock.writeLock().unlock(); + } + } + + /** + * Remove from running records + * + * @param dbId Run the dbId for insert overwrite + * @param tableId Run the tableId for insert overwrite + */ + public void dropRunningRecord(long dbId, long tableId) { + runningLock.writeLock().lock(); + try { + if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) { + runningTables.get(dbId).remove(tableId); + if (runningTables.get(dbId).size() == 0) { + runningTables.remove(dbId); + } + } + } finally { + runningLock.writeLock().unlock(); + } + } + /** * replay logs * diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 0fe60a94e56b57..b9b626061218b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -136,6 +136,7 @@ public enum MTMVTaskRefreshMode { private MTMV mtmv; private MTMVRelation relation; private StmtExecutor executor; + private UpdateMvByPartitionCommand command; private Map partitionSnapshots; public MTMVTask() { @@ -221,7 +222,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionNames, TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set - UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand + command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey); executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); @@ -253,6 +254,9 @@ public synchronized void onSuccess() throws JobException { @Override protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); + if (command != null) { + command.cancel(); + } if (executor != null) { executor.cancel(); } @@ -403,6 +407,9 @@ protected void closeOrReleaseResources() { if (null != partitionSnapshots) { partitionSnapshots = null; } + if (null != command) { + command = null; + } } private Map getIncrementalTableMap() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index afcb5ee81d2958..44516b944a1b4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -28,6 +28,7 @@ import org.apache.doris.common.util.InternalDatabaseUtil; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.insertoverwrite.InsertOverwriteManager; import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -81,6 +82,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS private LogicalPlan logicalQuery; private Optional labelName; private final Optional cte; + private volatile boolean isCancelled = false; /** * constructor @@ -157,38 +159,72 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // Do not create temp partition on FE partitionNames = new ArrayList<>(); } + InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); long taskId = 0; try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + taskId = insertOverwriteManager.registerTaskGroup(); // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); + insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - taskId = Env.getCurrentEnv().getInsertOverwriteManager() + if (isCancelled) { + LOG.info("insert overwrite isCancelled before registerTask"); + return; + } + taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before addTempPartitions"); + // not need deal temp partition + insertOverwriteManager.taskSuccess(taskId); + return; + } InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before insertInto"); + insertOverwriteManager.taskFail(taskId); + return; + } insertInto(ctx, executor, tempPartitionNames); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before replacePartition"); + insertOverwriteManager.taskFail(taskId); + return; + } InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); - Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); + if (isCancelled) { + LOG.info("insert overwrite isCancelled before taskSuccess, do nothing"); + } + insertOverwriteManager.taskSuccess(taskId); } } catch (Exception e) { LOG.warn("insert into overwrite failed with task(or group) id " + taskId); if (isAutoDetectOverwrite()) { - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId); + insertOverwriteManager.taskGroupFail(taskId); } else { - Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId); + insertOverwriteManager.taskFail(taskId); } throw e; } finally { ConnectContext.get().setSkipAuth(false); + insertOverwriteManager + .dropRunningRecord(targetTable.getDatabase().getId(), targetTable.getId()); } } + /** + * cancel insert overwrite + */ + public void cancel() { + this.isCancelled = true; + } + private boolean allowInsertOverwrite(TableIf targetTable) { if (targetTable instanceof OlapTable) { return true; From 0b64785dd2f6f4ec00d425729397d364f25274c8 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 15:34:59 +0800 Subject: [PATCH 02/21] 1 --- .../trees/plans/commands/insert/InsertOverwriteTableCommand.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 44516b944a1b4f..42c42d1d4472ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -179,6 +179,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + Thread.sleep(10000L); if (isCancelled) { LOG.info("insert overwrite isCancelled before addTempPartitions"); // not need deal temp partition From 9c38f5771dacac3b97fefd15287def6899af0453 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 16:00:29 +0800 Subject: [PATCH 03/21] 1 --- .../doris/insertoverwrite/InsertOverwriteManager.java | 2 +- .../commands/insert/InsertOverwriteTableCommand.java | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index 325344d5dbff9a..ff5d8ec59a4568 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -291,7 +291,7 @@ public void recordRunningTableOrException(long dbId, long tableId) { try { if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) { throw new AnalysisException( - String.format("insert overwrite is running on db: {}, table: {}", dbId, tableId)); + String.format("insert overwrite is running on db: %s, table: %s", dbId, tableId)); } if (runningTables.containsKey(dbId)) { runningTables.get(dbId).add(tableId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 42c42d1d4472ca..80a5866590c1f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -83,6 +83,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS private Optional labelName; private final Optional cte; private volatile boolean isCancelled = false; + private volatile boolean isRunning = false; /** * constructor @@ -161,6 +162,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = true; long taskId = 0; try { if (isAutoDetectOverwrite()) { @@ -216,6 +218,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { ConnectContext.get().setSkipAuth(false); insertOverwriteManager .dropRunningRecord(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning = false; } } @@ -224,6 +227,11 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { */ public void cancel() { this.isCancelled = true; + while (true) { + if (!isRunning) { + return; + } + } } private boolean allowInsertOverwrite(TableIf targetTable) { From dcb24c3ff2164dd961d3040642221dcd0d6c7002 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 16:39:48 +0800 Subject: [PATCH 04/21] 1 --- .../plans/commands/insert/InsertOverwriteTableCommand.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 80a5866590c1f2..342b04ed14f159 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -230,6 +230,8 @@ public void cancel() { while (true) { if (!isRunning) { return; + } else { + LOG.warn("================wait stop running"); } } } From af8def9066164b6612620c868c9a91474a42a8f8 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 17:01:14 +0800 Subject: [PATCH 05/21] 1 --- .../java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index b9b626061218b7..df7a4eba15891d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -230,6 +230,9 @@ private void exec(ConnectContext ctx, Set refreshPartitionNames, ctx.setQueryId(queryId); ctx.getState().setNereids(true); command.run(ctx, executor); + if (getStatus() == TaskStatus.CANCELED) { + throw new JobException("task is CANCELED"); + } if (ctx.getState().getStateType() != MysqlStateType.OK) { throw new JobException(ctx.getState().getErrorMessage()); } From 2df4044d26a788f547dedc52e8c4c991bd912be2 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 17:07:44 +0800 Subject: [PATCH 06/21] 1 --- .../insert/InsertOverwriteTableCommand.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 342b04ed14f159..9d9cd358a1a0c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -227,11 +227,21 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { */ public void cancel() { this.isCancelled = true; + waitNotRunning(); + } + + private void waitNotRunning() { while (true) { if (!isRunning) { - return; - } else { - LOG.warn("================wait stop running"); + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.warn(e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("insert overwrite waitNotRunning"); } } } From f993a59f8084d94bf39f4342d0b064cda0068a81 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 17:23:10 +0800 Subject: [PATCH 07/21] 1 --- .../trees/plans/commands/insert/InsertOverwriteTableCommand.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 9d9cd358a1a0c6..d9f73a027d5be9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -181,7 +181,6 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); - Thread.sleep(10000L); if (isCancelled) { LOG.info("insert overwrite isCancelled before addTempPartitions"); // not need deal temp partition From cc2f221f5df2e3f39d0ed584d961caad2199bc12 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 17:48:27 +0800 Subject: [PATCH 08/21] 1 --- .../suites/mtmv_p0/test_alter_job_mtmv.groovy | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy diff --git a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy new file mode 100644 index 00000000000000..808fde7336bf6e --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert; + +suite("test_alter_job_mtmv") { + String suiteName = "test_truncate_table_mtmv" + String tableName = "${suiteName}_table" + String mvName = "${suiteName}_mv" + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE TABLE `${tableName}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + PARTITION BY RANGE(`date`) + (PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')), + PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')), + PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01'))) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"2017-01-15",1),(1,"2017-02-15",2),(1,"2017-03-15",3); + """ + + //This is an immediately built materialized view that cancels running tasks and creates new ones after updating job information. + // Due to the uncertainty of the case, there may be several situations here: + // 1. The task has not been created yet, so it has not been cancelled + // 2. The task has been completed, so there was no cancellation + // 3. The task has been created but not yet completed + // But regardless of the status of the previous case, + // this case is used to ensure that the newly launched task can run successfully after modifying the materialized view + sql """ + CREATE MATERIALIZED VIEW ${mvName} + REFRESH COMPLETE ON MANUAL + partition by(`date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableNameNum}; + """ + sql """alter MATERIALIZED VIEW mv1 refresh COMPLETE on commit; """ + waitingMTMVTaskFinishedByMvName(mvName) + + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" +} From ffb4c59deae5c856e29831d9145a8a1f400823ad Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 17:50:47 +0800 Subject: [PATCH 09/21] 1 --- regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy index 808fde7336bf6e..6b50e396bed13a 100644 --- a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy @@ -57,7 +57,7 @@ suite("test_alter_job_mtmv") { DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') AS - SELECT * FROM ${tableNameNum}; + SELECT * FROM ${tableName}; """ sql """alter MATERIALIZED VIEW mv1 refresh COMPLETE on commit; """ waitingMTMVTaskFinishedByMvName(mvName) From 8d35ec48eb12e6278fd6ecad1c0acc9cafc278f2 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 17:52:26 +0800 Subject: [PATCH 10/21] 1 --- regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy index 6b50e396bed13a..51249d38bfc930 100644 --- a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy @@ -59,7 +59,7 @@ suite("test_alter_job_mtmv") { AS SELECT * FROM ${tableName}; """ - sql """alter MATERIALIZED VIEW mv1 refresh COMPLETE on commit; """ + sql """alter MATERIALIZED VIEW ${mvName} refresh COMPLETE on commit; """ waitingMTMVTaskFinishedByMvName(mvName) sql """drop table if exists `${tableName}`""" From 4337cbf24e04a7f07d10f660310e3c0910214039 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 17:59:31 +0800 Subject: [PATCH 11/21] 1 --- .../main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index df7a4eba15891d..0ba4bd14fec24d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -231,6 +231,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionNames, ctx.getState().setNereids(true); command.run(ctx, executor); if (getStatus() == TaskStatus.CANCELED) { + // Throwing an exception to interrupt subsequent partition update tasks throw new JobException("task is CANCELED"); } if (ctx.getState().getStateType() != MysqlStateType.OK) { From 7b04297337f88bd06869152fe2889d204eabbe1b Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 9 Sep 2024 18:21:44 +0800 Subject: [PATCH 12/21] 1 --- .../java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 6 ++++++ .../plans/commands/insert/InsertOverwriteTableCommand.java | 6 ++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 0ba4bd14fec24d..07878cbf72f2ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -258,12 +258,18 @@ public synchronized void onSuccess() throws JobException { @Override protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); + // If the be scheduling has not been triggered yet, cancel the scheduling first if (command != null) { command.cancel(); } + // Cancel the ongoing scheduling if (executor != null) { executor.cancel(); } + // Wait for the command to run or cancel completion + if (command != null) { + command.waitNotRunning(); + } after(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index d9f73a027d5be9..01f78b4a4b9028 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -226,10 +226,12 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { */ public void cancel() { this.isCancelled = true; - waitNotRunning(); } - private void waitNotRunning() { + /** + * wait insert overwrite not running + */ + public void waitNotRunning() { while (true) { if (!isRunning) { break; From 7fabd24bf5eef2a9ed621a500a45ac88d6c4613a Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Tue, 10 Sep 2024 17:41:56 +0800 Subject: [PATCH 13/21] comment --- .../doris/job/extensions/mtmv/MTMVTask.java | 15 +------------ .../insert/InsertOverwriteTableCommand.java | 13 +++++++----- .../org/apache/doris/qe/StmtExecutor.java | 21 +++++++++++++++++++ 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 07878cbf72f2ed..59a421509d9c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -136,7 +136,6 @@ public enum MTMVTaskRefreshMode { private MTMV mtmv; private MTMVRelation relation; private StmtExecutor executor; - private UpdateMvByPartitionCommand command; private Map partitionSnapshots; public MTMVTask() { @@ -222,7 +221,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionNames, TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set - command = UpdateMvByPartitionCommand + UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey); executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); @@ -258,18 +257,9 @@ public synchronized void onSuccess() throws JobException { @Override protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); - // If the be scheduling has not been triggered yet, cancel the scheduling first - if (command != null) { - command.cancel(); - } - // Cancel the ongoing scheduling if (executor != null) { executor.cancel(); } - // Wait for the command to run or cancel completion - if (command != null) { - command.waitNotRunning(); - } after(); } @@ -417,9 +407,6 @@ protected void closeOrReleaseResources() { if (null != partitionSnapshots) { partitionSnapshots = null; } - if (null != command) { - command = null; - } } private Map getIncrementalTableMap() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 01f78b4a4b9028..cf17ee8c1b1100 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -176,32 +176,35 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } else { List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); if (isCancelled) { - LOG.info("insert overwrite isCancelled before registerTask"); + LOG.info("insert overwrite isCancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); return; } taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); if (isCancelled) { - LOG.info("insert overwrite isCancelled before addTempPartitions"); + LOG.info("insert overwrite isCancelled before addTempPartitions, queryId: {}", + ctx.getQueryIdentifier()); // not need deal temp partition insertOverwriteManager.taskSuccess(taskId); return; } InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); if (isCancelled) { - LOG.info("insert overwrite isCancelled before insertInto"); + LOG.info("insert overwrite isCancelled before insertInto, queryId: {}", ctx.getQueryIdentifier()); insertOverwriteManager.taskFail(taskId); return; } insertInto(ctx, executor, tempPartitionNames); if (isCancelled) { - LOG.info("insert overwrite isCancelled before replacePartition"); + LOG.info("insert overwrite isCancelled before replacePartition, queryId: {}", + ctx.getQueryIdentifier()); insertOverwriteManager.taskFail(taskId); return; } InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); if (isCancelled) { - LOG.info("insert overwrite isCancelled before taskSuccess, do nothing"); + LOG.info("insert overwrite isCancelled before taskSuccess, do nothing, queryId: {}", + ctx.getQueryIdentifier()); } insertOverwriteManager.taskSuccess(taskId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 6bf09f2229bd8d..3c1cf71bcf026d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1531,6 +1531,11 @@ public void cancel() { } return; } + Optional insertOverwriteTableCommand = getInsertOverwriteTableCommand(); + if (insertOverwriteTableCommand.isPresent()) { + // If the be scheduling has not been triggered yet, cancel the scheduling first + insertOverwriteTableCommand.get().cancel(); + } Coordinator coordRef = coord; if (coordRef != null) { coordRef.cancel(); @@ -1541,6 +1546,22 @@ public void cancel() { if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) { Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context); } + if (insertOverwriteTableCommand.isPresent()) { + // Wait for the command to run or cancel completion + insertOverwriteTableCommand.get().waitNotRunning(); + } + } + + private Optional getInsertOverwriteTableCommand() { + if (parsedStmt instanceof LogicalPlanAdapter) { + LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt; + LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan(); + if (logicalPlan instanceof InsertOverwriteTableCommand) { + InsertOverwriteTableCommand insertOverwriteTableCommand = (InsertOverwriteTableCommand) logicalPlan; + return Optional.of(insertOverwriteTableCommand); + } + } + return Optional.empty(); } // Because this is called by other thread From 01069dcb7af059a265e17edee288753fd22dae69 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Thu, 12 Sep 2024 11:04:03 +0800 Subject: [PATCH 14/21] test pipeline fail --- .../doris/job/extensions/mtmv/MTMVTask.java | 15 ++++++++++++- .../org/apache/doris/qe/StmtExecutor.java | 21 ------------------- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 59a421509d9c75..07878cbf72f2ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -136,6 +136,7 @@ public enum MTMVTaskRefreshMode { private MTMV mtmv; private MTMVRelation relation; private StmtExecutor executor; + private UpdateMvByPartitionCommand command; private Map partitionSnapshots; public MTMVTask() { @@ -221,7 +222,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionNames, TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set - UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand + command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey); executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); @@ -257,9 +258,18 @@ public synchronized void onSuccess() throws JobException { @Override protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); + // If the be scheduling has not been triggered yet, cancel the scheduling first + if (command != null) { + command.cancel(); + } + // Cancel the ongoing scheduling if (executor != null) { executor.cancel(); } + // Wait for the command to run or cancel completion + if (command != null) { + command.waitNotRunning(); + } after(); } @@ -407,6 +417,9 @@ protected void closeOrReleaseResources() { if (null != partitionSnapshots) { partitionSnapshots = null; } + if (null != command) { + command = null; + } } private Map getIncrementalTableMap() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 3c1cf71bcf026d..6bf09f2229bd8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1531,11 +1531,6 @@ public void cancel() { } return; } - Optional insertOverwriteTableCommand = getInsertOverwriteTableCommand(); - if (insertOverwriteTableCommand.isPresent()) { - // If the be scheduling has not been triggered yet, cancel the scheduling first - insertOverwriteTableCommand.get().cancel(); - } Coordinator coordRef = coord; if (coordRef != null) { coordRef.cancel(); @@ -1546,22 +1541,6 @@ public void cancel() { if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) { Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context); } - if (insertOverwriteTableCommand.isPresent()) { - // Wait for the command to run or cancel completion - insertOverwriteTableCommand.get().waitNotRunning(); - } - } - - private Optional getInsertOverwriteTableCommand() { - if (parsedStmt instanceof LogicalPlanAdapter) { - LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt; - LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan(); - if (logicalPlan instanceof InsertOverwriteTableCommand) { - InsertOverwriteTableCommand insertOverwriteTableCommand = (InsertOverwriteTableCommand) logicalPlan; - return Optional.of(insertOverwriteTableCommand); - } - } - return Optional.empty(); } // Because this is called by other thread From 61fe2baf2e0b705af104007c251123facae02e47 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Thu, 12 Sep 2024 16:02:03 +0800 Subject: [PATCH 15/21] 1 --- .../org/apache/doris/job/extensions/mtmv/MTMVTask.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 07878cbf72f2ed..006ac5cd6dcc07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -258,18 +258,10 @@ public synchronized void onSuccess() throws JobException { @Override protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); - // If the be scheduling has not been triggered yet, cancel the scheduling first - if (command != null) { - command.cancel(); - } // Cancel the ongoing scheduling if (executor != null) { executor.cancel(); } - // Wait for the command to run or cancel completion - if (command != null) { - command.waitNotRunning(); - } after(); } From d615f54e476766e14577ea01b567de7bfe3306b3 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Thu, 12 Sep 2024 20:17:21 +0800 Subject: [PATCH 16/21] 1 --- .../java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 006ac5cd6dcc07..a8028f6ddb187e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -258,6 +258,10 @@ public synchronized void onSuccess() throws JobException { @Override protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); + // If the be scheduling has not been triggered yet, cancel the scheduling first + if (command != null) { + command.cancel(); + } // Cancel the ongoing scheduling if (executor != null) { executor.cancel(); From 8bcbe0c48dd50fa1efc8e9e1202d83777c380c55 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Fri, 13 Sep 2024 14:49:38 +0800 Subject: [PATCH 17/21] wait 10s --- .../doris/job/extensions/mtmv/MTMVTask.java | 11 +-------- .../insert/InsertOverwriteTableCommand.java | 12 +++++++++- .../org/apache/doris/qe/StmtExecutor.java | 21 ++++++++++++++++ .../doris/regression/suite/Suite.groovy | 24 ++++++++++++++++++- .../suites/mtmv_p0/test_alter_job_mtmv.groovy | 2 +- 5 files changed, 57 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index a8028f6ddb187e..59a421509d9c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -136,7 +136,6 @@ public enum MTMVTaskRefreshMode { private MTMV mtmv; private MTMVRelation relation; private StmtExecutor executor; - private UpdateMvByPartitionCommand command; private Map partitionSnapshots; public MTMVTask() { @@ -222,7 +221,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionNames, TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set - command = UpdateMvByPartitionCommand + UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey); executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); @@ -258,11 +257,6 @@ public synchronized void onSuccess() throws JobException { @Override protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); - // If the be scheduling has not been triggered yet, cancel the scheduling first - if (command != null) { - command.cancel(); - } - // Cancel the ongoing scheduling if (executor != null) { executor.cancel(); } @@ -413,9 +407,6 @@ protected void closeOrReleaseResources() { if (null != partitionSnapshots) { partitionSnapshots = null; } - if (null != command) { - command = null; - } } private Map getIncrementalTableMap() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index cf17ee8c1b1100..262fbe8d909f16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -235,17 +235,27 @@ public void cancel() { * wait insert overwrite not running */ public void waitNotRunning() { + long waitMaxTimeMills = 10 * 1000L; + long waitTime = 0; while (true) { if (!isRunning) { break; } + if (waitTime >= waitMaxTimeMills) { + LOG.warn("waiting time exceeds {} ms, stop wait, labelName: {}", waitMaxTimeMills, + labelName.isPresent() ? labelName.get() : ""); + break; + } try { Thread.sleep(100); + waitTime += 100; } catch (InterruptedException e) { LOG.warn(e); + break; } if (LOG.isDebugEnabled()) { - LOG.debug("insert overwrite waitNotRunning"); + LOG.debug("insert overwrite waitNotRunning, labelName: {}", + labelName.isPresent() ? labelName.get() : ""); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 6bf09f2229bd8d..3c1cf71bcf026d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1531,6 +1531,11 @@ public void cancel() { } return; } + Optional insertOverwriteTableCommand = getInsertOverwriteTableCommand(); + if (insertOverwriteTableCommand.isPresent()) { + // If the be scheduling has not been triggered yet, cancel the scheduling first + insertOverwriteTableCommand.get().cancel(); + } Coordinator coordRef = coord; if (coordRef != null) { coordRef.cancel(); @@ -1541,6 +1546,22 @@ public void cancel() { if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) { Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context); } + if (insertOverwriteTableCommand.isPresent()) { + // Wait for the command to run or cancel completion + insertOverwriteTableCommand.get().waitNotRunning(); + } + } + + private Optional getInsertOverwriteTableCommand() { + if (parsedStmt instanceof LogicalPlanAdapter) { + LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt; + LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan(); + if (logicalPlan instanceof InsertOverwriteTableCommand) { + InsertOverwriteTableCommand insertOverwriteTableCommand = (InsertOverwriteTableCommand) logicalPlan; + return Optional.of(insertOverwriteTableCommand); + } + } + return Optional.empty(); } // Because this is called by other thread diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 49e53cd94e3f99..96663905b9ddcf 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1275,7 +1275,29 @@ class Suite implements GroovyInterceptable { } logger.info("The state of ${showTasks} is ${status}") Thread.sleep(1000); - } while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL')) + } while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL')) + if (status != "SUCCESS") { + logger.info("status is not success") + } + Assert.assertEquals("SUCCESS", status) + } + + void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) { + Thread.sleep(2000); + String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC" + String status = "NULL" + List> result + long startTime = System.currentTimeMillis() + long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min + do { + result = sql(showTasks) + logger.info("result: " + result.toString()) + if (!result.isEmpty()) { + status = result.last().get(4) + } + logger.info("The state of ${showTasks} is ${status}") + Thread.sleep(1000); + } while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL' || status == 'CANCELED')) if (status != "SUCCESS") { logger.info("status is not success") } diff --git a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy index 51249d38bfc930..d832db8bec0889 100644 --- a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy @@ -60,7 +60,7 @@ suite("test_alter_job_mtmv") { SELECT * FROM ${tableName}; """ sql """alter MATERIALIZED VIEW ${mvName} refresh COMPLETE on commit; """ - waitingMTMVTaskFinishedByMvName(mvName) + waitingMTMVTaskFinishedByMvNameAllowCancel(mvName) sql """drop table if exists `${tableName}`""" sql """drop materialized view if exists ${mvName};""" From 5bc4bda1977f57cdbd88fc7bb507d9771eea0d64 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Fri, 13 Sep 2024 16:37:44 +0800 Subject: [PATCH 18/21] 1 --- regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy index d832db8bec0889..fa1618d5bf58f5 100644 --- a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy @@ -18,7 +18,7 @@ import org.junit.Assert; suite("test_alter_job_mtmv") { - String suiteName = "test_truncate_table_mtmv" + String suiteName = "test_alter_job_mtmv" String tableName = "${suiteName}_table" String mvName = "${suiteName}_mv" sql """drop table if exists `${tableName}`""" From 8c960441072096a4300a361b15014383c5cde60b Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Sat, 14 Sep 2024 12:00:33 +0800 Subject: [PATCH 19/21] 1 --- .../InsertOverwriteManager.java | 13 ++-- .../insert/InsertOverwriteTableCommand.java | 63 ++++++++----------- 2 files changed, 34 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index ff5d8ec59a4568..a00107c76a74a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -17,8 +17,10 @@ package org.apache.doris.insertoverwrite; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -283,15 +285,18 @@ private boolean rollback(long taskId) { * If the current table id has a running insert overwrite, throw an exception. * If not, record it in runningTables * - * @param dbId Run the dbId for insert overwrite - * @param tableId Run the tableId for insert overwrite + * @param db Run the db for insert overwrite + * @param table Run the table for insert overwrite */ - public void recordRunningTableOrException(long dbId, long tableId) { + public void recordRunningTableOrException(DatabaseIf db, TableIf table) { + long dbId = db.getId(); + long tableId = table.getId(); runningLock.writeLock().lock(); try { if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) { throw new AnalysisException( - String.format("insert overwrite is running on db: %s, table: %s", dbId, tableId)); + String.format("Not allowed running Insert Overwrite on same table: %s.%s", db.getFullName(), + table.getName())); } if (runningTables.containsKey(dbId)) { runningTables.get(dbId).add(tableId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 262fbe8d909f16..e829beebf134da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -61,11 +61,14 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.awaitility.Awaitility; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * insert into select command implementation @@ -82,8 +85,8 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS private LogicalPlan logicalQuery; private Optional labelName; private final Optional cte; - private volatile boolean isCancelled = false; - private volatile boolean isRunning = false; + private AtomicBoolean isCancelled = new AtomicBoolean(false); + private AtomicBoolean isRunning = new AtomicBoolean(false); /** * constructor @@ -161,8 +164,8 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { partitionNames = new ArrayList<>(); } InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); - insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(), targetTable.getId()); - isRunning = true; + insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase(), targetTable); + isRunning.set(true); long taskId = 0; try { if (isAutoDetectOverwrite()) { @@ -175,35 +178,35 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - if (isCancelled) { - LOG.info("insert overwrite isCancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); return; } taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); - if (isCancelled) { - LOG.info("insert overwrite isCancelled before addTempPartitions, queryId: {}", + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before addTempPartitions, queryId: {}", ctx.getQueryIdentifier()); // not need deal temp partition insertOverwriteManager.taskSuccess(taskId); return; } InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); - if (isCancelled) { - LOG.info("insert overwrite isCancelled before insertInto, queryId: {}", ctx.getQueryIdentifier()); + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before insertInto, queryId: {}", ctx.getQueryIdentifier()); insertOverwriteManager.taskFail(taskId); return; } insertInto(ctx, executor, tempPartitionNames); - if (isCancelled) { - LOG.info("insert overwrite isCancelled before replacePartition, queryId: {}", + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before replacePartition, queryId: {}", ctx.getQueryIdentifier()); insertOverwriteManager.taskFail(taskId); return; } InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); - if (isCancelled) { - LOG.info("insert overwrite isCancelled before taskSuccess, do nothing, queryId: {}", + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before taskSuccess, do nothing, queryId: {}", ctx.getQueryIdentifier()); } insertOverwriteManager.taskSuccess(taskId); @@ -220,7 +223,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { ConnectContext.get().setSkipAuth(false); insertOverwriteManager .dropRunningRecord(targetTable.getDatabase().getId(), targetTable.getId()); - isRunning = false; + isRunning.set(false); } } @@ -228,35 +231,19 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { * cancel insert overwrite */ public void cancel() { - this.isCancelled = true; + this.isCancelled.set(true); } /** * wait insert overwrite not running */ public void waitNotRunning() { - long waitMaxTimeMills = 10 * 1000L; - long waitTime = 0; - while (true) { - if (!isRunning) { - break; - } - if (waitTime >= waitMaxTimeMills) { - LOG.warn("waiting time exceeds {} ms, stop wait, labelName: {}", waitMaxTimeMills, - labelName.isPresent() ? labelName.get() : ""); - break; - } - try { - Thread.sleep(100); - waitTime += 100; - } catch (InterruptedException e) { - LOG.warn(e); - break; - } - if (LOG.isDebugEnabled()) { - LOG.debug("insert overwrite waitNotRunning, labelName: {}", - labelName.isPresent() ? labelName.get() : ""); - } + long waitMaxTimeSecond = 10L; + try { + Awaitility.await().atMost(waitMaxTimeSecond, TimeUnit.SECONDS).untilFalse(isRunning); + } catch (Exception e) { + LOG.warn("waiting time exceeds {} second, stop wait, labelName: {}", waitMaxTimeSecond, + labelName.isPresent() ? labelName.get() : "", e); } } From e830f97e9fb9ca0dd7d990fcf0183e0f1b08b180 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Sat, 14 Sep 2024 12:34:19 +0800 Subject: [PATCH 20/21] 1 --- .../InsertOverwriteManagerTest.java | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java new file mode 100644 index 00000000000000..4bf6c9f12d564b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.insertoverwrite; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; + +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class InsertOverwriteManagerTest { + @Mocked + private DatabaseIf db; + + @Mocked + private TableIf table; + + @Before + public void setUp() + throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException { + + new Expectations() { + { + db.getId(); + minTimes = 0; + result = 1L; + + db.getFullName(); + minTimes = 0; + result = "db1"; + + table.getId(); + minTimes = 0; + result = 2L; + + table.getName(); + minTimes = 0; + result = "table1"; + } + }; + } + + @Test + public void testParallel() { + InsertOverwriteManager manager = new InsertOverwriteManager(); + manager.recordRunningTableOrException(db, table); + try { + manager.recordRunningTableOrException(db, table); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Not allowed")); + } + manager.dropRunningRecord(db.getId(), table.getId()); + manager.recordRunningTableOrException(db, table); + } + +} From 041e0cba480a59fb690d3943017532d57c3f5682 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Sat, 14 Sep 2024 15:26:16 +0800 Subject: [PATCH 21/21] 1 --- .../plans/commands/insert/InsertOverwriteTableCommand.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index e829beebf134da..064fccaf521029 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -179,7 +179,8 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } else { List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); if (isCancelled.get()) { - LOG.info("insert overwrite is cancelled before registerTask, queryId: {}", ctx.getQueryIdentifier()); + LOG.info("insert overwrite is cancelled before registerTask, queryId: {}", + ctx.getQueryIdentifier()); return; } taskId = insertOverwriteManager