diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index aaa35a0c2a949a..cb84134658d262 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -102,8 +102,8 @@ Status DeltaWriterV2::init() { if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) { return Status::InternalError("failed to find tablet schema for {}", _req.index_id); } - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_streams[0]->tablet_schema(_req.index_id)); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_streams[0]->tablet_schema(_req.index_id))); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -209,9 +209,9 @@ Status DeltaWriterV2::cancel_with_status(const Status& st) { return Status::OK(); } -void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { _tablet_schema->copy_from(ori_tablet_schema); // find the right index id int i = 0; @@ -235,12 +235,13 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns(), - table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), - table_schema_param->nano_seconds(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn()); + RETURN_IF_ERROR(_partial_update_info->init( + _req.tablet_id, _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), + table_schema_param->nano_seconds(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn())); + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 0ef564be393762..418ad098e57ae4 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -85,9 +85,9 @@ class DeltaWriterV2 { Status cancel_with_status(const Status& st); private: - void _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); + Status _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); void _update_profile(RuntimeProfile* profile); diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index c75e6c554ea11c..aa27f6b680c668 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -27,11 +27,11 @@ namespace doris { -void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update, - const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, int32_t nano_seconds, - const std::string& timezone, const std::string& auto_increment_column, - int64_t cur_max_version) { +Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, + bool partial_update, const std::set& partial_update_cols, + bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, + const std::string& timezone, + const std::string& auto_increment_column, int64_t cur_max_version) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; @@ -40,6 +40,22 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_upd this->timezone = timezone; missing_cids.clear(); update_cids.clear(); + + if (is_partial_update) { + // partial_update_cols should include all key columns + for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) { + const auto key_col = tablet_schema.column(i); + if (!partial_update_cols.contains(key_col.name())) { + auto msg = fmt::format( + "Unable to do partial update on shadow index's tablet, tablet_id={}, " + "txn_id={}. Missing key column {}.", + tablet_id, txn_id, key_col.name()); + LOG_WARNING(msg); + return Status::Aborted(msg); + } + } + } + for (auto i = 0; i < tablet_schema.num_columns(); ++i) { auto tablet_column = tablet_schema.column(i); if (!partial_update_input_columns.contains(tablet_column.name())) { @@ -59,6 +75,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_upd is_input_columns_contains_auto_inc_column = is_partial_update && partial_update_input_columns.contains(auto_increment_column); _generate_default_values_for_missing_cids(tablet_schema); + return Status::OK(); } void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 2d94361f503d79..e63e92cd19b8ce 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -26,10 +26,11 @@ class TabletSchema; class PartialUpdateInfoPB; struct PartialUpdateInfo { - void init(const TabletSchema& tablet_schema, bool partial_update, - const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, - const std::string& auto_increment_column, int64_t cur_max_version = -1); + Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, + bool partial_update, const std::set& partial_update_cols, + bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, + const std::string& timezone, const std::string& auto_increment_column, + int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); std::string summary() const; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 1be1f1ec180bda..348dec4835d779 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -54,6 +54,7 @@ #include "service/point_query_executor.h" #include "util/coding.h" #include "util/crc32c.h" +#include "util/debug_points.h" #include "util/faststring.h" #include "util/key_util.h" #include "vec/columns/column_nullable.h" @@ -346,10 +347,6 @@ Status VerticalSegmentWriter::_partial_update_preconditions_check(size_t row_pos // 3. set columns to data convertor and then write all columns Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block) { - if constexpr (!std::is_same_v) { - // TODO(plat1ko): CloudStorageEngine - return Status::NotSupported("append_block_with_partial_content"); - } RETURN_IF_ERROR(_partial_update_preconditions_check(data.row_pos)); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index c668df4bd33141..e9da68bf9cc2e5 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -214,8 +214,8 @@ Status RowsetBuilder::init() { }; }) // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_tablet->tablet_schema()); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_tablet->tablet_schema())); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -377,9 +377,9 @@ Status BaseRowsetBuilder::cancel() { return Status::OK(); } -void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +Status BaseRowsetBuilder::_build_current_tablet_schema( + int64_t index_id, const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { // find the right index id int i = 0; auto indexes = table_schema_param->indexes(); @@ -419,12 +419,14 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init( - *_tablet_schema, table_schema_param->is_partial_update(), + RETURN_IF_ERROR(_partial_update_info->init( + tablet()->tablet_id(), _req.txn_id, *_tablet_schema, + table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase); + table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase)); + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index 362e976da71976..3e29ee7f5e9f65 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -85,9 +85,9 @@ class BaseRowsetBuilder { } protected: - void _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); + Status _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); void _init_profile(RuntimeProfile* profile); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index b5cc293bdff387..49c3cc8e188081 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -723,6 +723,7 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req Status res = _do_process_alter_tablet_v2(request); LOG(INFO) << "finished alter tablet process, res=" << res; + DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.sleep", { sleep(5); }); return res; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index e98d8066a1135c..3e945e06e8b8e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -537,6 +537,7 @@ protected void runRunningJob() throws AlterCancelException { Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId); Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId); Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); + /* * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. diff --git a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out new file mode 100644 index 00000000000000..87d44dac59e4f4 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 + +-- !sql -- +1 \N 1 1 2 0 +2 \N 2 2 2 0 +3 \N 3 3 2 0 +4 \N 4 4 5 0 +5 \N 5 5 5 0 +6 \N 6 6 5 0 + diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy new file mode 100644 index 00000000000000..61ba9d60ea8371 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_add_key_partial_update", "nonConcurrent") { + + def table1 = "test_add_key_partial_update" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_mow_light_delete" = "false", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1),(2,2,2),(3,3,3);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the schema change process before it change the shadow index to base index + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::process_alter_tablet.leave.sleep") + + sql "alter table ${table1} ADD COLUMN k2 int key;" + + Thread.sleep(1000) + test { + sql "delete from ${table1} where k1<=3;" + exception "Unable to do partial update on shadow index's tablet" + } + + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 1000 + } + + sql "set skip_delete_sign=true;" + sql "sync;" + qt_sql "select k1,k2,c1,c2,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${table1} order by k1,k2,__DORIS_VERSION_COL__;" + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +}