From ff6e3b36894546f50b9d96153ca4dae2ffe6526c Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 30 Dec 2024 10:03:31 +0800 Subject: [PATCH 1/6] update --- .../segment_v2/vertical_segment_writer.cpp | 8 ++ be/src/olap/schema_change.cpp | 1 + .../apache/doris/alter/SchemaChangeJobV2.java | 13 +++ .../org/apache/doris/catalog/Partition.java | 12 ++ .../test_add_key_partial_update.out | 9 ++ .../test_add_key_partial_update.groovy | 110 ++++++++++++++++++ 6 files changed, 153 insertions(+) create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out create mode 100644 regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 0846b0fc1186a8..61621d920eb45f 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -60,6 +60,7 @@ #include "service/point_query_executor.h" #include "util/coding.h" #include "util/crc32c.h" +#include "util/debug_points.h" #include "util/faststring.h" #include "util/key_util.h" #include "vec/columns/column_nullable.h" @@ -472,6 +473,8 @@ Status VerticalSegmentWriter::_partial_update_preconditions_check(size_t row_pos // 3. set columns to data convertor and then write all columns Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block) { + DBUG_EXECUTE_IF("_append_block_with_partial_content.block", DBUG_BLOCK); + RETURN_IF_ERROR(_partial_update_preconditions_check(data.row_pos, false)); // create full block and fill with input columns full_block = _tablet_schema->create_block(); @@ -1340,6 +1343,11 @@ void VerticalSegmentWriter::_encode_rowid(const uint32_t rowid, string* encoded_ std::string VerticalSegmentWriter::_full_encode_keys( const std::vector& key_columns, size_t pos) { assert(_key_index_size.size() == _num_sort_key_columns); + if (!(key_columns.size() == _num_sort_key_columns && + _key_coders.size() == _num_sort_key_columns)) { + LOG_INFO("key_columns.size()={}, _key_coders.size()={}, _num_sort_key_columns={}, ", + key_columns.size(), _key_coders.size(), _num_sort_key_columns); + } assert(key_columns.size() == _num_sort_key_columns && _key_coders.size() == _num_sort_key_columns); return _full_encode_keys(_key_coders, key_columns, pos); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 063e5e9af5ddc4..12a689f3718da4 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -805,6 +805,7 @@ Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) { Status res = _do_process_alter_tablet(request); LOG(INFO) << "finished alter tablet process, res=" << res; + DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.block", DBUG_BLOCK); return res; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 54bcc35262760c..e0febe9ffcf2b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -48,6 +48,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.AgentBatchTask; @@ -612,6 +613,18 @@ protected void runRunningJob() throws AlterCancelException { Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId); Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId); Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); + + if (DebugPointUtil.isEnable("SchemaChangeJobV2.runRunningJob.beforeTableLock.block")) { + while (DebugPointUtil.isEnable("SchemaChangeJobV2.runRunningJob.beforeTableLock.block")) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.info("error ", e); + } + } + LOG.info("debug point: leave SchemaChangeJobV2.runRunningJob.beforeTableLock.block"); + } + /* * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 83bb8a18cbd827..f388cf587e125e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -24,6 +24,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.rpc.RpcException; import com.google.common.base.Preconditions; @@ -333,6 +334,17 @@ public boolean hasData() { * Also move it to idToVisibleRollupIndex if it is not the base index. */ public boolean visualiseShadowIndex(long shadowIndexId, boolean isBaseIndex) { + if (DebugPointUtil.isEnable("visualiseShadowIndex.block")) { + while (DebugPointUtil.isEnable("visualiseShadowIndex.block")) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.info("error ", e); + } + } + LOG.info("debug point: leave visualiseShadowIndex.block"); + } + MaterializedIndex shadowIdx = idToShadowIndex.remove(shadowIndexId); if (shadowIdx == null) { return false; diff --git a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out new file mode 100644 index 00000000000000..24e80ff54832b5 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 + diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy new file mode 100644 index 00000000000000..33e498fe02ccdc --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_add_key_partial_update", "nonConcurrent") { + + def table1 = "test_add_key_partial_update" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_mow_light_delete" = "false", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1),(2,2,2),(3,3,3);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // // block the schema change process before it change the shadow index to base index + // GetDebugPoint().enableDebugPointForAllFEs("visualiseShadowIndex.block") + // GetDebugPoint().enableDebugPointForAllFEs("SchemaChangeJobV2.runRunningJob.beforeTableLock.block") + // GetDebugPoint().enableDebugPointForAllBEs("_append_block_with_partial_content.block") + + // sql "alter table ${table1} ADD COLUMN k2 int key;" + + // def t1 = Thread.start { + // // wait util alter process finish on BE + // Thread.sleep(7000) + // GetDebugPoint().disableDebugPointForAllFEs("visualiseShadowIndex.block") + // } + + // Thread.sleep(1200) + // def t2 = Thread.start { + // sql "delete from ${table1} where k1<=3;" + // } + // Thread.sleep(1000) + // GetDebugPoint().disableDebugPointForAllFEs("SchemaChangeJobV2.runRunningJob.beforeTableLock.block") + // Thread.sleep(500) + // GetDebugPoint().disableDebugPointForAllBEs("_append_block_with_partial_content.block") + // t2.join() + + // t1.join() + // waitForSchemaChangeDone { + // sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + // time 1000 + // } + + // qt_sql "select * from ${table1} order by k1,k2;" + + + // block the schema change process before it change the shadow index to base index + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::process_alter_tablet.leave.block") + + sql "alter table ${table1} ADD COLUMN k2 int key;" + + def t1 = Thread.start { + // wait util alter process finish on BE + Thread.sleep(6000) + GetDebugPoint().disableDebugPointForAllFEs("SchemaChangeJob::process_alter_tablet.leave.block") + } + + Thread.sleep(1000) + sql "delete from ${table1} where k1<=3;" + + t1.join() + + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 1000 + } + + qt_sql "select * from ${table1} order by k1,k2;" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +} From ff21106f695b577c96c9e6d0ac964ae04b5ce5c7 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 2 Jan 2025 20:09:33 +0800 Subject: [PATCH 2/6] re produce case --- be/src/olap/schema_change.cpp | 2 +- .../test_add_key_partial_update.out | 11 +++++++++++ .../test_add_key_partial_update.groovy | 18 ++++++++++-------- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 12a689f3718da4..4a737a1316e0cd 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -805,7 +805,7 @@ Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) { Status res = _do_process_alter_tablet(request); LOG(INFO) << "finished alter tablet process, res=" << res; - DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.block", DBUG_BLOCK); + DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.sleep", { sleep(5); }); return res; } diff --git a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out index 24e80ff54832b5..ffbfdce8b4794a 100644 --- a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out +++ b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out @@ -7,3 +7,14 @@ 5 5 5 6 6 6 +-- !sql -- +1 \N 1 1 2 0 +1 \N \N \N 6 1 +2 \N 2 2 2 0 +2 \N \N \N 6 1 +3 \N 3 3 2 0 +3 \N \N \N 6 1 +4 \N 4 4 5 0 +5 \N 5 5 5 0 +6 \N 6 6 5 0 + diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy index 33e498fe02ccdc..ace05596aeb42c 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy @@ -78,27 +78,29 @@ suite("test_add_key_partial_update", "nonConcurrent") { // block the schema change process before it change the shadow index to base index - GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::process_alter_tablet.leave.block") + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::process_alter_tablet.leave.sleep") sql "alter table ${table1} ADD COLUMN k2 int key;" - def t1 = Thread.start { - // wait util alter process finish on BE - Thread.sleep(6000) - GetDebugPoint().disableDebugPointForAllFEs("SchemaChangeJob::process_alter_tablet.leave.block") - } + // def t1 = Thread.start { + // // wait util alter process finish on BE + // Thread.sleep(4000) + // GetDebugPoint().disableDebugPointForAllFEs("SchemaChangeJob::process_alter_tablet.leave.block") + // } Thread.sleep(1000) sql "delete from ${table1} where k1<=3;" - t1.join() + // t1.join() waitForSchemaChangeDone { sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ time 1000 } - qt_sql "select * from ${table1} order by k1,k2;" + sql "set skip_delete_sign=true;" + sql "sync;" + qt_sql "select k1,k2,c1,c2,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${table1} order by k1,k2,__DORIS_VERSION_COL__;" } catch(Exception e) { logger.info(e.getMessage()) From d2de7e009b0a93dda99f033fcdced06f86a31e5f Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 3 Jan 2025 11:11:23 +0800 Subject: [PATCH 3/6] update --- .../test_add_key_partial_update.groovy | 41 ------------------- 1 file changed, 41 deletions(-) diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy index ace05596aeb42c..eb7d951610288e 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy @@ -45,54 +45,14 @@ suite("test_add_key_partial_update", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllFEs() GetDebugPoint().clearDebugPointsForAllBEs() - // // block the schema change process before it change the shadow index to base index - // GetDebugPoint().enableDebugPointForAllFEs("visualiseShadowIndex.block") - // GetDebugPoint().enableDebugPointForAllFEs("SchemaChangeJobV2.runRunningJob.beforeTableLock.block") - // GetDebugPoint().enableDebugPointForAllBEs("_append_block_with_partial_content.block") - - // sql "alter table ${table1} ADD COLUMN k2 int key;" - - // def t1 = Thread.start { - // // wait util alter process finish on BE - // Thread.sleep(7000) - // GetDebugPoint().disableDebugPointForAllFEs("visualiseShadowIndex.block") - // } - - // Thread.sleep(1200) - // def t2 = Thread.start { - // sql "delete from ${table1} where k1<=3;" - // } - // Thread.sleep(1000) - // GetDebugPoint().disableDebugPointForAllFEs("SchemaChangeJobV2.runRunningJob.beforeTableLock.block") - // Thread.sleep(500) - // GetDebugPoint().disableDebugPointForAllBEs("_append_block_with_partial_content.block") - // t2.join() - - // t1.join() - // waitForSchemaChangeDone { - // sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ - // time 1000 - // } - - // qt_sql "select * from ${table1} order by k1,k2;" - - // block the schema change process before it change the shadow index to base index GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::process_alter_tablet.leave.sleep") sql "alter table ${table1} ADD COLUMN k2 int key;" - // def t1 = Thread.start { - // // wait util alter process finish on BE - // Thread.sleep(4000) - // GetDebugPoint().disableDebugPointForAllFEs("SchemaChangeJob::process_alter_tablet.leave.block") - // } - Thread.sleep(1000) sql "delete from ${table1} where k1<=3;" - // t1.join() - waitForSchemaChangeDone { sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ time 1000 @@ -101,7 +61,6 @@ suite("test_add_key_partial_update", "nonConcurrent") { sql "set skip_delete_sign=true;" sql "sync;" qt_sql "select k1,k2,c1,c2,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${table1} order by k1,k2,__DORIS_VERSION_COL__;" - } catch(Exception e) { logger.info(e.getMessage()) throw e From 22a7a46b2150878396b3e47c18d16a1a5f0beee9 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 3 Jan 2025 11:29:31 +0800 Subject: [PATCH 4/6] add fix --- be/src/cloud/cloud_rowset_builder.cpp | 4 +-- be/src/olap/delta_writer_v2.cpp | 24 ++++++++-------- be/src/olap/delta_writer_v2.h | 6 ++-- be/src/olap/partial_update_info.cpp | 40 +++++++++++++++++++++++---- be/src/olap/partial_update_info.h | 11 ++++---- be/src/olap/rowset_builder.cpp | 18 ++++++------ be/src/olap/rowset_builder.h | 2 +- 7 files changed, 69 insertions(+), 36 deletions(-) diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index 2e6764b33aa79c..9466dd1062803e 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -51,8 +51,8 @@ Status CloudRowsetBuilder::init() { duration_cast(system_clock::now().time_since_epoch()).count(); // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_tablet->tablet_schema()); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_tablet->tablet_schema())); RowsetWriterContext context; context.txn_id = _req.txn_id; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index a6fb0154489042..f098d94a389333 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -102,8 +102,8 @@ Status DeltaWriterV2::init() { if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) { return Status::InternalError("failed to find tablet schema for {}", _req.index_id); } - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_streams[0]->tablet_schema(_req.index_id)); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_streams[0]->tablet_schema(_req.index_id))); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -210,9 +210,9 @@ Status DeltaWriterV2::cancel_with_status(const Status& st) { return Status::OK(); } -void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { _tablet_schema->copy_from(ori_tablet_schema); // find the right index id int i = 0; @@ -236,12 +236,14 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init(*_tablet_schema, table_schema_param->unique_key_update_mode(), - table_schema_param->partial_update_input_columns(), - table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), - table_schema_param->nano_seconds(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn()); + RETURN_IF_ERROR(_partial_update_info->init( + _req.tablet_id, _req.txn_id, *_tablet_schema, + table_schema_param->unique_key_update_mode(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), + table_schema_param->nano_seconds(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn())); + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index f9c2800a68f499..e4506ea0d2339d 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -85,9 +85,9 @@ class DeltaWriterV2 { Status cancel_with_status(const Status& st); private: - void _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); + Status _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); void _update_profile(RuntimeProfile* profile); diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index c0e83eab005e7e..e751e98b812997 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -20,6 +20,7 @@ #include #include "common/consts.h" +#include "common/logging.h" #include "olap/base_tablet.h" #include "olap/olap_common.h" #include "olap/rowset/rowset.h" @@ -32,12 +33,13 @@ namespace doris { -void PartialUpdateInfo::init(const TabletSchema& tablet_schema, - UniqueKeyUpdateModePB unique_key_update_mode, - const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, int32_t nano_seconds, - const std::string& timezone, const std::string& auto_increment_column, - int32_t sequence_map_col_uid, int64_t cur_max_version) { +Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, + UniqueKeyUpdateModePB unique_key_update_mode, + const std::set& partial_update_cols, bool is_strict_mode, + int64_t timestamp_ms, int32_t nano_seconds, + const std::string& timezone, + const std::string& auto_increment_column, + int32_t sequence_map_col_uid, int64_t cur_max_version) { partial_update_mode = unique_key_update_mode; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; @@ -48,6 +50,31 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, missing_cids.clear(); update_cids.clear(); + // partial_update_cols should include all key columns + for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) { + const auto key_col = tablet_schema.column(i); + if (!partial_update_cols.contains(key_col.name())) { + auto msg = fmt::format( + "Unable to do partial update on shadow index's tablet, tablet_id={}, " + "txn_id={}. Missing key column {}.", + tablet_id, txn_id, key_col.name()); + LOG_WARNING(msg); + return Status::Aborted(msg); + } + } + + // every including columns should be in tablet_schema + for (const auto& col : partial_update_cols) { + if (-1 == tablet_schema.field_index(col)) { + auto msg = fmt::format( + "Unable to do partial update on shadow index's tablet, tablet_id={}, " + "txn_id={}. Can't find column {} in tablet's schema.", + tablet_id, txn_id, col); + LOG_WARNING(msg); + return Status::Aborted(msg); + } + } + for (auto i = 0; i < tablet_schema.num_columns(); ++i) { if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { auto tablet_column = tablet_schema.column(i); @@ -75,6 +102,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, is_fixed_partial_update() && partial_update_input_columns.contains(auto_increment_column); _generate_default_values_for_missing_cids(tablet_schema); + return Status::OK(); } void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 1078a6eb0dddf4..f50937d014c57f 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -42,11 +42,12 @@ struct RowsetId; class BitmapValue; struct PartialUpdateInfo { - void init(const TabletSchema& tablet_schema, UniqueKeyUpdateModePB unique_key_update_mode, - const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, - const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1, - int64_t cur_max_version = -1); + Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, + UniqueKeyUpdateModePB unique_key_update_mode, + const std::set& partial_update_cols, bool is_strict_mode, + int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, + const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1, + int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema, diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 1bbacc0b1236f6..ab4fa09f34c961 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -221,8 +221,8 @@ Status RowsetBuilder::init() { }; }) // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_tablet->tablet_schema()); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_tablet->tablet_schema())); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -396,9 +396,9 @@ Status BaseRowsetBuilder::cancel() { return Status::OK(); } -void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +Status BaseRowsetBuilder::_build_current_tablet_schema( + int64_t index_id, const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { // find the right index id int i = 0; auto indexes = table_schema_param->indexes(); @@ -438,13 +438,15 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init( - *_tablet_schema, table_schema_param->unique_key_update_mode(), + RETURN_IF_ERROR(_partial_update_info->init( + tablet()->tablet_id(), _req.txn_id, *_tablet_schema, + table_schema_param->unique_key_update_mode(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), table_schema_param->auto_increment_coulumn(), - table_schema_param->sequence_map_col_uid(), _max_version_in_flush_phase); + table_schema_param->sequence_map_col_uid(), _max_version_in_flush_phase)); + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index fb2294d1770cc4..bc69277390f0d4 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -84,7 +84,7 @@ class BaseRowsetBuilder { Status init_mow_context(std::shared_ptr& mow_context); protected: - void _build_current_tablet_schema(int64_t index_id, + Status _build_current_tablet_schema(int64_t index_id, const OlapTableSchemaParam* table_schema_param, const TabletSchema& ori_tablet_schema); From fba14f290c257cfe2362b1b3160fbe9b9b076bee Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 3 Jan 2025 11:50:04 +0800 Subject: [PATCH 5/6] update --- be/src/olap/partial_update_info.cpp | 34 +++++++------------ .../test_add_key_partial_update.out | 3 -- .../test_add_key_partial_update.groovy | 5 ++- 3 files changed, 16 insertions(+), 26 deletions(-) diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index e751e98b812997..1ca269a0f317b7 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -50,28 +50,18 @@ Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSc missing_cids.clear(); update_cids.clear(); - // partial_update_cols should include all key columns - for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) { - const auto key_col = tablet_schema.column(i); - if (!partial_update_cols.contains(key_col.name())) { - auto msg = fmt::format( - "Unable to do partial update on shadow index's tablet, tablet_id={}, " - "txn_id={}. Missing key column {}.", - tablet_id, txn_id, key_col.name()); - LOG_WARNING(msg); - return Status::Aborted(msg); - } - } - - // every including columns should be in tablet_schema - for (const auto& col : partial_update_cols) { - if (-1 == tablet_schema.field_index(col)) { - auto msg = fmt::format( - "Unable to do partial update on shadow index's tablet, tablet_id={}, " - "txn_id={}. Can't find column {} in tablet's schema.", - tablet_id, txn_id, col); - LOG_WARNING(msg); - return Status::Aborted(msg); + if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { + // partial_update_cols should include all key columns + for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) { + const auto key_col = tablet_schema.column(i); + if (!partial_update_cols.contains(key_col.name())) { + auto msg = fmt::format( + "Unable to do partial update on shadow index's tablet, tablet_id={}, " + "txn_id={}. Missing key column {}.", + tablet_id, txn_id, key_col.name()); + LOG_WARNING(msg); + return Status::Aborted(msg); + } } } diff --git a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out index ffbfdce8b4794a..87d44dac59e4f4 100644 --- a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out +++ b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out @@ -9,11 +9,8 @@ -- !sql -- 1 \N 1 1 2 0 -1 \N \N \N 6 1 2 \N 2 2 2 0 -2 \N \N \N 6 1 3 \N 3 3 2 0 -3 \N \N \N 6 1 4 \N 4 4 5 0 5 \N 5 5 5 0 6 \N 6 6 5 0 diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy index eb7d951610288e..61ba9d60ea8371 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy @@ -51,7 +51,10 @@ suite("test_add_key_partial_update", "nonConcurrent") { sql "alter table ${table1} ADD COLUMN k2 int key;" Thread.sleep(1000) - sql "delete from ${table1} where k1<=3;" + test { + sql "delete from ${table1} where k1<=3;" + exception "Unable to do partial update on shadow index's tablet" + } waitForSchemaChangeDone { sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ From 1a307ef27f3965acea78bd739f0d68d78af82e7c Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 3 Jan 2025 11:55:50 +0800 Subject: [PATCH 6/6] remove --- be/src/olap/rowset_builder.h | 4 ++-- .../org/apache/doris/alter/SchemaChangeJobV2.java | 12 ------------ .../java/org/apache/doris/catalog/Partition.java | 12 ------------ 3 files changed, 2 insertions(+), 26 deletions(-) diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index bc69277390f0d4..d87e2a9efa4a8f 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -85,8 +85,8 @@ class BaseRowsetBuilder { protected: Status _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); virtual void _init_profile(RuntimeProfile* profile); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index e0febe9ffcf2b9..f4e2b93d603da9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -48,7 +48,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; -import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.AgentBatchTask; @@ -614,17 +613,6 @@ protected void runRunningJob() throws AlterCancelException { Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId); Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); - if (DebugPointUtil.isEnable("SchemaChangeJobV2.runRunningJob.beforeTableLock.block")) { - while (DebugPointUtil.isEnable("SchemaChangeJobV2.runRunningJob.beforeTableLock.block")) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - LOG.info("error ", e); - } - } - LOG.info("debug point: leave SchemaChangeJobV2.runRunningJob.beforeTableLock.block"); - } - /* * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index f388cf587e125e..83bb8a18cbd827 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -24,7 +24,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; -import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.rpc.RpcException; import com.google.common.base.Preconditions; @@ -334,17 +333,6 @@ public boolean hasData() { * Also move it to idToVisibleRollupIndex if it is not the base index. */ public boolean visualiseShadowIndex(long shadowIndexId, boolean isBaseIndex) { - if (DebugPointUtil.isEnable("visualiseShadowIndex.block")) { - while (DebugPointUtil.isEnable("visualiseShadowIndex.block")) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - LOG.info("error ", e); - } - } - LOG.info("debug point: leave visualiseShadowIndex.block"); - } - MaterializedIndex shadowIdx = idToShadowIndex.remove(shadowIndexId); if (shadowIdx == null) { return false;