From 85ed9af02dbfb633c6e6247fcf35887e62dd9a5c Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 2 Sep 2024 19:35:08 +0800 Subject: [PATCH 1/6] remove restriction --- .../java/org/apache/doris/alter/SchemaChangeHandler.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 43857b2e898d40..694611cce4bab3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -741,12 +741,6 @@ private boolean processModifyColumn(ModifyColumnClause alterClause, OlapTable ol } // end for handling other indices if (typeChanged && !lightSchemaChange) { - Optional autoIncCol = olapTable.getBaseSchema(true).stream() - .filter(col -> col.isAutoInc()).findFirst(); - if (autoIncCol.isPresent()) { - throw new DdlException("Can not modify column " + modColumn.getName() + " becasue table " - + olapTable.getName() + " has auto-increment column " + autoIncCol.get().getName()); - } /* * In new alter table process (AlterJobV2), any modified columns are treated as new columns. * But the modified columns' name does not changed. So in order to distinguish this, we will add From a68f6fde6d1d57acee485d3823d876bbd8aed93a Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 2 Sep 2024 19:57:26 +0800 Subject: [PATCH 2/6] fix --- be/src/olap/schema_change.cpp | 2 + ...auto_inc_schema_change_double_write.groovy | 97 +++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index f6cfdf3dbde8d2..d40a44fd710936 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1134,6 +1134,8 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc changer, sc_sorting, sc_directly, _local_storage_engine.memory_limitation_bytes_per_thread_for_schema_change()); + DBUG_EXECUTE_IF("SchemaChangeJob::_convert_historical_rowsets.block", DBUG_BLOCK); + // c.Convert historical data bool have_failure_rowset = false; for (const auto& rs_reader : sc_params.ref_rowset_readers) { diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy new file mode 100644 index 00000000000000..57d3c9af40b491 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { + + def table1 = "test_auto_inc_schema_change_double_write" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` BIGINT NOT NULL AUTO_INCREMENT, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_mow_light_delete" = "false", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1"); """ + + sql """insert into ${table1} select number,number,number,number,number from numbers("number"="5000"); """ + sql "sync;" + qt_sql "select count(*) from ${table1} group by k1 having count(*) > 1;" + + def run_test = {thread_num, rows, iters -> + def threads = [] + (1..thread_num).each { id1 -> + threads.add(Thread.start { + (1..iters).each { id2 -> + sql """insert into ${table1}(value) select number from numbers("number" = "${rows}");""" + } + }) + } + + threads.each { thread -> thread.join() } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block") + + AtomicBoolean stopped = new AtomicBoolean(false) + def t1 = Thread.start { + while (!stopped.get()) { + run_test(5, 1000, 3) + Thread.sleep(100) + } + } + + Thread.sleep(3000) + + sql "alter table ${table1} modify column c3 varchar(100) null;" + + Thread.sleep(3000); + + GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block") + + waitForSchemaChangeDone { + sql """SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1""" + time 20000 + } + + stopped.set(true) + t1.join() + + qt_sql "select count(*) from ${table1} group by k1 having count(*) > 1;" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" +} From ac36693d543d265dd3aeee672141f62d7fef99c6 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 2 Sep 2024 20:13:56 +0800 Subject: [PATCH 3/6] fix --- .../test_auto_inc_schema_change_double_write.out | 5 +++++ .../test_auto_inc_schema_change_double_write.groovy | 11 ++++------- 2 files changed, 9 insertions(+), 7 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/test_auto_inc_schema_change_double_write.out diff --git a/regression-test/data/fault_injection_p0/test_auto_inc_schema_change_double_write.out b/regression-test/data/fault_injection_p0/test_auto_inc_schema_change_double_write.out new file mode 100644 index 00000000000000..29f296cab8b8fe --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_auto_inc_schema_change_double_write.out @@ -0,0 +1,5 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- + +-- !sql -- + diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy index 57d3c9af40b491..0ea9736e2d415c 100644 --- a/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy +++ b/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy @@ -32,12 +32,11 @@ suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { )UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 1 PROPERTIES ( - "enable_mow_light_delete" = "false", "disable_auto_compaction" = "true", "enable_unique_key_merge_on_write" = "true", "replication_num" = "1"); """ - sql """insert into ${table1} select number,number,number,number,number from numbers("number"="5000"); """ + sql """insert into ${table1}(c1,c2,c3,c4) select number,number,number,number from numbers("number"="5000"); """ sql "sync;" qt_sql "select count(*) from ${table1} group by k1 having count(*) > 1;" @@ -46,7 +45,7 @@ suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { (1..thread_num).each { id1 -> threads.add(Thread.start { (1..iters).each { id2 -> - sql """insert into ${table1}(value) select number from numbers("number" = "${rows}");""" + sql """insert into ${table1}(c1,c2,c3,c4) select number,number,number,number from numbers("number"="${rows}");""" } }) } @@ -62,8 +61,8 @@ suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { AtomicBoolean stopped = new AtomicBoolean(false) def t1 = Thread.start { while (!stopped.get()) { - run_test(5, 1000, 3) - Thread.sleep(100) + run_test(2, 500, 3) + Thread.sleep(200) } } @@ -92,6 +91,4 @@ suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllFEs() GetDebugPoint().clearDebugPointsForAllBEs() } - - sql "DROP TABLE IF EXISTS ${table1};" } From c684b1a84091a07ae54a88b60eb892f2a1889247 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 3 Sep 2024 09:02:16 +0800 Subject: [PATCH 4/6] fix --- .../schema_change_p0/test_schema_change_auto_inc.groovy | 6 ------ 1 file changed, 6 deletions(-) diff --git a/regression-test/suites/schema_change_p0/test_schema_change_auto_inc.groovy b/regression-test/suites/schema_change_p0/test_schema_change_auto_inc.groovy index eeae07bd247606..d249d3c5632f14 100644 --- a/regression-test/suites/schema_change_p0/test_schema_change_auto_inc.groovy +++ b/regression-test/suites/schema_change_p0/test_schema_change_auto_inc.groovy @@ -53,12 +53,6 @@ suite("test_schema_change_auto_inc") { exception "Can't modify the column[id2]'s auto-increment attribute." } - // schema change that invoke double write on a table which has auto-increment column is forbidden - test { - sql "alter table ${table1} modify column value VARCHAR(20) NOT NULL" - exception "Can not modify column value becasue table ${table1} has auto-increment column id" - } - sql """ insert into ${table1}(name, value, id2) values("A", 999, 1), ("B", 888, 2), ("C", 777, 3);""" qt_sql "select count(distinct id) from ${table1};" From bd110e8b972302f74355d47aef559f2c835431b9 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 3 Sep 2024 10:59:04 +0800 Subject: [PATCH 5/6] add cloud --- be/src/cloud/cloud_schema_change_job.cpp | 2 + ...st_auto_inc_schema_change_double_write.out | 4 + ...auto_inc_schema_change_double_write.groovy | 132 +++++++++++------- 3 files changed, 90 insertions(+), 48 deletions(-) diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 614202b02583f2..b7e3be93e853bb 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -252,6 +252,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam changer, sc_sorting, _cloud_storage_engine.memory_limitation_bytes_per_thread_for_schema_change()); + DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.block", DBUG_BLOCK); + // 3. Convert historical data bool already_exist_any_version = false; for (const auto& rs_reader : sc_params.ref_rowset_readers) { diff --git a/regression-test/data/fault_injection_p0/test_auto_inc_schema_change_double_write.out b/regression-test/data/fault_injection_p0/test_auto_inc_schema_change_double_write.out index 29f296cab8b8fe..d0405083b994b0 100644 --- a/regression-test/data/fault_injection_p0/test_auto_inc_schema_change_double_write.out +++ b/regression-test/data/fault_injection_p0/test_auto_inc_schema_change_double_write.out @@ -3,3 +3,7 @@ -- !sql -- +-- !sql -- + +-- !sql -- + diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy index 0ea9736e2d415c..d853f965f12c1c 100644 --- a/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy +++ b/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy @@ -20,27 +20,22 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + for (def be : backends) { + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() + if (alive && !decommissioned) { + replicaNum++ + } + } + assertTrue(replicaNum > 0) + if (isCloudMode()) { + replicaNum = 1 + } - def table1 = "test_auto_inc_schema_change_double_write" - sql "DROP TABLE IF EXISTS ${table1} FORCE;" - sql """ CREATE TABLE IF NOT EXISTS ${table1} ( - `k1` BIGINT NOT NULL AUTO_INCREMENT, - `c1` int, - `c2` int, - `c3` int, - `c4` int - )UNIQUE KEY(k1) - DISTRIBUTED BY HASH(k1) BUCKETS 1 - PROPERTIES ( - "disable_auto_compaction" = "true", - "enable_unique_key_merge_on_write" = "true", - "replication_num" = "1"); """ - - sql """insert into ${table1}(c1,c2,c3,c4) select number,number,number,number from numbers("number"="5000"); """ - sql "sync;" - qt_sql "select count(*) from ${table1} group by k1 having count(*) > 1;" - - def run_test = {thread_num, rows, iters -> + def tableName = "test_auto_inc_schema_change_double_write" + def run_test = {table1, thread_num, rows, iters -> def threads = [] (1..thread_num).each { id1 -> threads.add(Thread.start { @@ -51,44 +46,85 @@ suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { } threads.each { thread -> thread.join() } + } + + def block_convert_historical_rowsets = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block") + } } - try { - GetDebugPoint().clearDebugPointsForAllFEs() - GetDebugPoint().clearDebugPointsForAllBEs() - GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block") - - AtomicBoolean stopped = new AtomicBoolean(false) - def t1 = Thread.start { - while (!stopped.get()) { - run_test(2, 500, 3) - Thread.sleep(200) - } + def unblock = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block") } + } + + for (def model : ["UNIQUE", "DUPLICATE"]) { + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() - Thread.sleep(3000) + def table1 = "${tableName}_${model}" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` BIGINT NOT NULL AUTO_INCREMENT, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )${model} KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "${replicaNum}"); """ - sql "alter table ${table1} modify column c3 varchar(100) null;" + sql """insert into ${table1}(c1,c2,c3,c4) select number,number,number,number from numbers("number"="5000"); """ + sql "sync;" + qt_sql "select count(*) from ${table1} group by k1 having count(*) > 1;" - Thread.sleep(3000); + block_convert_historical_rowsets() + + AtomicBoolean stopped = new AtomicBoolean(false) + def t1 = Thread.start { + while (!stopped.get()) { + run_test(table1, 2, 500, 3) + Thread.sleep(200) + } + } - GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block") + Thread.sleep(3000) - waitForSchemaChangeDone { - sql """SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1""" - time 20000 - } + sql "alter table ${table1} modify column c3 varchar(100) null;" + + Thread.sleep(3000); - stopped.set(true) - t1.join() + unblock() - qt_sql "select count(*) from ${table1} group by k1 having count(*) > 1;" + def t2 = Thread.start { + waitForSchemaChangeDone { + sql """SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1""" + time 20000 + } + } + + Thread.sleep(5000); + stopped.set(true) + t1.join() + t2.join() - } catch(Exception e) { - logger.info(e.getMessage()) - throw e - } finally { - GetDebugPoint().clearDebugPointsForAllFEs() - GetDebugPoint().clearDebugPointsForAllBEs() + qt_sql "select count(*) from ${table1} group by k1 having count(*) > 1;" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } } } From 7da99ad355efc1baa0914898620fb9bb4dc23929 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Sat, 21 Sep 2024 10:29:42 +0800 Subject: [PATCH 6/6] fix reache connection limit exception --- ...auto_inc_schema_change_double_write.groovy | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy index d853f965f12c1c..98a99f4fdf9561 100644 --- a/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy +++ b/regression-test/suites/fault_injection_p0/test_auto_inc_schema_change_double_write.groovy @@ -34,20 +34,6 @@ suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { replicaNum = 1 } - def tableName = "test_auto_inc_schema_change_double_write" - def run_test = {table1, thread_num, rows, iters -> - def threads = [] - (1..thread_num).each { id1 -> - threads.add(Thread.start { - (1..iters).each { id2 -> - sql """insert into ${table1}(c1,c2,c3,c4) select number,number,number,number from numbers("number"="${rows}");""" - } - }) - } - - threads.each { thread -> thread.join() } - } - def block_convert_historical_rowsets = { if (isCloudMode()) { GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") @@ -68,7 +54,7 @@ suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { try { GetDebugPoint().clearDebugPointsForAllFEs() GetDebugPoint().clearDebugPointsForAllBEs() - + def tableName = "test_auto_inc_schema_change_double_write" def table1 = "${tableName}_${model}" sql "DROP TABLE IF EXISTS ${table1} FORCE;" sql """ CREATE TABLE IF NOT EXISTS ${table1} ( @@ -90,11 +76,24 @@ suite("test_auto_inc_schema_change_double_write", "nonConcurrent") { block_convert_historical_rowsets() AtomicBoolean stopped = new AtomicBoolean(false) + + def iters = 3 + def rows = 500 + def thread_num = 4 def t1 = Thread.start { - while (!stopped.get()) { - run_test(table1, 2, 500, 3) - Thread.sleep(200) + def threads = [] + (1..thread_num).each { id1 -> + threads.add(Thread.start { + while (!stopped.get()) { + (1..iters).each { id2 -> + sql """insert into ${table1}(c1,c2,c3,c4) select number,number,number,number from numbers("number"="${rows}");""" + } + Thread.sleep(200) + } + }) } + + threads.each { thread -> thread.join() } } Thread.sleep(3000)