From 3792710b75073f23645e2a60d5227777973358e1 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 10 Apr 2025 17:43:24 +0800 Subject: [PATCH] [enhance](mtmv)Only restrict MTMV to not allow concurrent insert overwrite execution (#48673) ### What problem does this PR solve? Related PR: #40558 Problem Summary: Previously, on #40558, concurrent execution of `INSERT OVERWRITE` was restricted for all OLAP tables to address potential issues caused by concurrent execution. In this current PR, the restriction has been modified to only apply to Materialized Views (MTMV), as it appears that users can tolerate the uncertainty in execution results caused by concurrency for OLAP tables. ### Release note Only restrict MTMV to not allow concurrent insert overwrite execution --- .../InsertOverwriteManager.java | 3 +- .../insertoverwrite/InsertOverwriteUtil.java | 9 ++++- .../InsertOverwriteManagerTest.java | 30 +++++++++++++--- .../InsertOverwriteUtilTest.java | 35 +++++++++++++++++++ .../test_iot_auto_detect_concurrent.groovy | 14 +++----- 5 files changed, 74 insertions(+), 17 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index df16b8f1be205c..cb01ff90339c12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; @@ -294,7 +295,7 @@ public void recordRunningTableOrException(DatabaseIf db, TableIf table) { // If executed in parallel, it may cause problems such as not being able to find temporary partitions. // But in terms of external table, we don't care the internal logic of execution, // so there's no need to keep records - if (!(table instanceof OlapTable)) { + if (!(table instanceof MTMV)) { return; } long dbId = db.getId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java index a773f3a99bb234..66e01810c73bcf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java @@ -98,9 +98,16 @@ public static void replacePartition(TableIf olapTable, List partitionNam * @return */ public static List generateTempPartitionNames(List partitionNames) { + long threadId = Thread.currentThread().getId(); + // Adding thread ID as a prefix is to avoid mutual interference + // when different threads perform insert overwrite on the same partition simultaneously. + // Even if the insert overwrite execution fails/cancels, + // the generated temporary partition will be deleted, + // so there will be no problem generating temporary partitions with the same name in a single thread + String prefix = "iot_temp_" + threadId + "_"; List tempPartitionNames = new ArrayList(partitionNames.size()); for (String partitionName : partitionNames) { - String tempPartitionName = "iot_temp_" + partitionName; + String tempPartitionName = prefix + partitionName; if (tempPartitionName.length() > 50) { tempPartitionName = tempPartitionName.substring(0, 30) + Math.abs(Objects.hash(tempPartitionName)) + "_" + System.currentTimeMillis(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java index 026f821352246e..607d79b38caeac 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java @@ -18,6 +18,7 @@ package org.apache.doris.insertoverwrite; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; @@ -40,6 +41,9 @@ public class InsertOverwriteManagerTest { @Mocked private HMSExternalTable hmsExternalTable; + @Mocked + private MTMV mtmv; + @Before public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException { @@ -69,18 +73,26 @@ public void setUp() hmsExternalTable.getName(); minTimes = 0; result = "hmsTable"; + + mtmv.getId(); + minTimes = 0; + result = 4L; + + mtmv.getName(); + minTimes = 0; + result = "mtmv1"; } }; } @Test - public void testParallel() { + public void testMTMVParallel() { InsertOverwriteManager manager = new InsertOverwriteManager(); - manager.recordRunningTableOrException(db, table); + manager.recordRunningTableOrException(db, mtmv); Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class, - () -> manager.recordRunningTableOrException(db, table)); - manager.dropRunningRecord(db.getId(), table.getId()); - Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, table)); + () -> manager.recordRunningTableOrException(db, mtmv)); + manager.dropRunningRecord(db.getId(), mtmv.getId()); + Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, mtmv)); } @Test @@ -90,4 +102,12 @@ public void testHmsTableParallel() { Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, hmsExternalTable)); manager.dropRunningRecord(db.getId(), hmsExternalTable.getId()); } + + @Test + public void testOlapTableParallel() { + InsertOverwriteManager manager = new InsertOverwriteManager(); + manager.recordRunningTableOrException(db, table); + Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, table)); + manager.dropRunningRecord(db.getId(), table.getId()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java new file mode 100644 index 00000000000000..947e876c53c8b5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.insertoverwrite; + +import com.google.common.collect.Lists; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.List; + +public class InsertOverwriteUtilTest { + + @Test + public void testGenerateTempPartitionNames() { + String regex = "^iot_temp_[0-9]+_p1$"; + List res = InsertOverwriteUtil.generateTempPartitionNames(Lists.newArrayList("p1")); + String tempP1Name = res.get(0); + Assertions.assertTrue(tempP1Name.matches(regex)); + } +} diff --git a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy index 7a55d4af622441..96b285ea4cad2e 100644 --- a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy +++ b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy @@ -30,7 +30,6 @@ suite("test_iot_auto_detect_concurrent") { sql new File("""${context.file.parent}/ddl/test_iot_auto_detect_concurrent.sql""").text def success_status = true - def err_msg = "" def load_data = { range, offset, expect_success -> try { sql " use test_iot_auto_detect_concurrent; " @@ -45,7 +44,6 @@ suite("test_iot_auto_detect_concurrent") { success_status = false log.info("fails one") } - err_msg = e.getMessage() log.info("successfully catch the failed insert") return } @@ -107,14 +105,10 @@ suite("test_iot_auto_detect_concurrent") { thread6.join() thread7.join() // suppose result: Success to overwrite with a multiple of ten values - if (!success_status) { - // Not allowed running Insert Overwrite on same table - assertTrue(err_msg.contains('same table')) - } else { - // The execution was fast, resulting in no concurrent execution - qt_sql3 " select count(k0) from test_concurrent_write; " - qt_sql4 " select count(distinct k0) from test_concurrent_write; " - } + assertTrue(success_status) + qt_sql3 " select count(k0) from test_concurrent_write; " + qt_sql4 " select count(distinct k0) from test_concurrent_write; " + /// with drop partition concurrently success_status = true